diff --git a/hub.go b/hub.go new file mode 100644 index 0000000..aba24da --- /dev/null +++ b/hub.go @@ -0,0 +1,50 @@ +package main + +import ( + "fmt" + + "gopkg.in/mcuadros/go-syslog.v2/format" +) + +// Client represent a client connected via websocket. Its most important field is the messages channel, where +// new messages are sent. It is a struct so that it can later be "expanded" to contain other fields (ie: +// filters) +type Client struct { + messages chan format.LogParts +} + +// The Hub is the central "registry" +type Hub struct { + clients map[Client]bool + Register chan Client + Unregister chan Client + logMessages chan format.LogParts +} + +// NewHub creates an empty hub +func NewHub() Hub { + return Hub{clients: make(map[Client]bool), + Register: make(chan Client), + Unregister: make(chan Client), + logMessages: make(chan format.LogParts), + } +} + +func (h *Hub) Run() { + for { + select { + case cl := <-h.Register: + h.clients[cl] = true + case cl := <-h.Unregister: + _, ok := h.clients[cl] + if ok { + delete(h.clients, cl) + } + case msg := <-h.logMessages: + fmt.Println("ricevuto", msg["message"], len(h.clients)) + for client := range h.clients { + client.messages <- msg + } + } + } +} diff --git a/main.go b/main.go index e0e5f1b..63310e6 100644 --- a/main.go +++ b/main.go @@ -5,15 +5,28 @@ import ( "flag" "fmt" "net/http" + "os" + "time" + "github.com/gorilla/websocket" syslog "gopkg.in/mcuadros/go-syslog.v2" "gopkg.in/mcuadros/go-syslog.v2/format" ) var circbuf *ring.Ring +var hub Hub +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func init() { + hub = NewHub() +} func syslogdHandler(channel syslog.LogPartsChannel) { for logParts := range channel { + hub.logMessages <- logParts fmt.Println(logParts) circbuf.Value = logParts circbuf = circbuf.Next() @@ -34,7 +47,52 @@ func httpHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte("\n")) }) } + +func wsHandler(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + httpHandler(w, r) + client := Client{messages: make(chan format.LogParts)} + hub.Register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go func(conn *websocket.Conn, c Client) { + defer func() { + hub.Unregister <- c + conn.Close() + }() + for { + select { + case message, ok := <-c.messages: + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + // The hub closed the channel. + conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + if msg, ok := message["message"]; ok { + w.Write([]byte(msg.(string))) + } + + if err := w.Close(); err != nil { + return + } + // TODO: ticker/ping + } + } + }(conn, client) +} + func main() { + var err error syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses") // dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs") bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep") @@ -49,17 +107,28 @@ func main() { server.SetFormat(syslog.RFC5424) server.SetHandler(handler) if *syslogSocketPath != "" { - server.ListenUnixgram(*syslogSocketPath) + if err = server.ListenUnixgram(*syslogSocketPath); err != nil { + fmt.Fprintln(os.Stderr, "argh", err) + os.Exit(1) + } fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath) } else { fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr) - server.ListenUDP(*syslogAddr) + if err = server.ListenUDP(*syslogAddr); err != nil { + fmt.Fprintln(os.Stderr, "argh", err) + os.Exit(1) + } } circbuf = ring.New(*bufsize) - server.Boot() + if err = server.Boot(); err != nil { + fmt.Fprintln(os.Stderr, "argh", err) + os.Exit(1) + } + go hub.Run() go syslogdHandler(channel) http.HandleFunc("/", httpHandler) + http.HandleFunc("/ws", wsHandler) fmt.Printf("Binding address `%s` [http]\n", *queryAddr) http.ListenAndServe(*queryAddr, nil)