From 66f32d1c05e8997440ac742c204d0429f3b5bafa Mon Sep 17 00:00:00 2001 From: boyska Date: Thu, 23 Aug 2018 00:56:27 +0200 Subject: [PATCH] refactor: now Hub keeps everything before this there was some hidden race condition because raceCondition is not concurrent-safe, and there was some concurrent reading and writing. Now everything is handled safely by the Hub. Client now have "options" which are understood by the Hub to handle them differently. --- http.go | 27 ++++++++++++++------------- hub.go | 48 +++++++++++++++++++++++++++++++++++------------- main.go | 22 +++------------------- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/http.go b/http.go index 67cf683..5a5a056 100644 --- a/http.go +++ b/http.go @@ -1,7 +1,6 @@ package main import ( - "container/ring" "net/http" "time" @@ -9,25 +8,27 @@ import ( "gopkg.in/mcuadros/go-syslog.v2/format" ) -func setupHttp(circbuf *ring.Ring, hub Hub) { - http.HandleFunc("/", getHTTPHandler(circbuf)) +func setupHttp(hub Hub) { + http.HandleFunc("/", getHTTPHandler(hub)) http.HandleFunc("/ws", getWSHandler(hub)) } -func getHTTPHandler(circbuf *ring.Ring) http.HandlerFunc { +func getHTTPHandler(hub Hub) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - circbuf.Do(func(x interface{}) { - if x == nil { - return - } - logmsg := x.(format.LogParts) + client := Client{ + Messages: make(chan format.LogParts), + Nofollow: true} + hub.Register <- client + + for x := range client.Messages { + logmsg := x if logmsg["message"] == nil { - return + continue } c := logmsg["message"].(string) w.Write([]byte(c)) w.Write([]byte("\n")) - }) + } } } @@ -41,7 +42,7 @@ func getWSHandler(hub Hub) http.HandlerFunc { if err != nil { return } - client := Client{messages: make(chan format.LogParts)} + client := Client{Messages: make(chan format.LogParts)} hub.Register <- client // Allow collection of memory referenced by the caller by doing all work in @@ -53,7 +54,7 @@ func getWSHandler(hub Hub) http.HandlerFunc { }() for { select { - case message, ok := <-c.messages: + case message, ok := <-c.Messages: conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if !ok { // The hub closed the channel. diff --git a/hub.go b/hub.go index aba24da..e9efd27 100644 --- a/hub.go +++ b/hub.go @@ -1,49 +1,71 @@ package main import ( - "fmt" + "container/ring" "gopkg.in/mcuadros/go-syslog.v2/format" ) // Client represent a client connected via websocket. Its most important field is the messages channel, where -// new messages are sent. It is a struct so that it can later be "expanded" to contain other fields (ie: -// filters) +// new messages are sent. type Client struct { - messages chan format.LogParts + Messages chan format.LogParts // only hub should write/close this + Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot } -// The Hub is the central "registry" +// The Hub is the central "registry"; it keeps both the data storage and clients notifications +// +// The channel "register" and "unregister" can be seen as "command" +// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client +// has "options", such as Nofollow, to explain the Hub what should be given type Hub struct { - clients map[Client]bool Register chan Client Unregister chan Client - logMessages chan format.LogParts + LogMessages chan format.LogParts + + clients map[Client]bool + circbuf *ring.Ring } // NewHub creates an empty hub -func NewHub() Hub { +func NewHub(ringBufSize int) Hub { return Hub{clients: make(map[Client]bool), Register: make(chan Client), Unregister: make(chan Client), - logMessages: make(chan format.LogParts), + LogMessages: make(chan format.LogParts), + circbuf: ring.New(ringBufSize), } } +// Run is hub main loop; keeps everything going func (h *Hub) Run() { for { select { case cl := <-h.Register: - h.clients[cl] = true + if _, ok := h.clients[cl]; !ok { + if !cl.Nofollow { // we won't need it in future + h.clients[cl] = true + } + h.circbuf.Do(func(x interface{}) { + if x != nil { + cl.Messages <- x.(format.LogParts) + } + }) + if cl.Nofollow { + close(cl.Messages) + } + } case cl := <-h.Unregister: _, ok := h.clients[cl] if ok { + close(cl.Messages) delete(h.clients, cl) } - case msg := <-h.logMessages: - fmt.Println("ricevuto", msg["message"], len(h.clients)) + case msg := <-h.LogMessages: + h.circbuf.Value = msg + h.circbuf = h.circbuf.Next() for client := range h.clients { - client.messages <- msg + client.Messages <- msg } } } diff --git a/main.go b/main.go index f96805d..3847dbb 100644 --- a/main.go +++ b/main.go @@ -1,27 +1,14 @@ package main import ( - "container/ring" "flag" "fmt" "net/http" "os" syslog "gopkg.in/mcuadros/go-syslog.v2" - "gopkg.in/mcuadros/go-syslog.v2/format" ) -func getSyslogdHandler(circbuf *ring.Ring, hub Hub) func(channel syslog.LogPartsChannel) { - return func(channel syslog.LogPartsChannel) { - for logParts := range channel { - hub.logMessages <- logParts - fmt.Println(logParts) - circbuf.Value = logParts - circbuf = circbuf.Next() - } - } -} - func main() { var err error syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses") @@ -32,9 +19,8 @@ func main() { flag.Parse() var hub Hub - hub = NewHub() - channel := make(chan format.LogParts) - handler := syslog.NewChannelHandler(channel) + hub = NewHub(*bufsize) + handler := syslog.NewChannelHandler(hub.LogMessages) server := syslog.NewServer() server.SetFormat(syslog.RFC5424) @@ -52,15 +38,13 @@ func main() { os.Exit(1) } } - circbuf := ring.New(*bufsize) if err = server.Boot(); err != nil { fmt.Fprintln(os.Stderr, "argh", err) os.Exit(1) } go hub.Run() - go getSyslogdHandler(circbuf, hub)(channel) - setupHttp(circbuf, hub) + setupHttp(hub) fmt.Printf("Binding address `%s` [http]\n", *queryAddr) http.ListenAndServe(*queryAddr, nil) server.Wait()