diff --git a/http.go b/http.go new file mode 100644 index 0000000..67cf683 --- /dev/null +++ b/http.go @@ -0,0 +1,80 @@ +package main + +import ( + "container/ring" + "net/http" + "time" + + "github.com/gorilla/websocket" + "gopkg.in/mcuadros/go-syslog.v2/format" +) + +func setupHttp(circbuf *ring.Ring, hub Hub) { + http.HandleFunc("/", getHTTPHandler(circbuf)) + http.HandleFunc("/ws", getWSHandler(hub)) +} + +func getHTTPHandler(circbuf *ring.Ring) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + circbuf.Do(func(x interface{}) { + if x == nil { + return + } + logmsg := x.(format.LogParts) + if logmsg["message"] == nil { + return + } + c := logmsg["message"].(string) + w.Write([]byte(c)) + w.Write([]byte("\n")) + }) + } +} + +func getWSHandler(hub Hub) http.HandlerFunc { + var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + return func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + client := Client{messages: make(chan format.LogParts)} + hub.Register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go func(conn *websocket.Conn, c Client) { + defer func() { + hub.Unregister <- c + conn.Close() + }() + for { + select { + case message, ok := <-c.messages: + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + // The hub closed the channel. + conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + if msg, ok := message["message"]; ok { + w.Write([]byte(msg.(string))) + } + + if err := w.Close(); err != nil { + return + } + // TODO: ticker/ping + } + } + }(conn, client) + } +} diff --git a/main.go b/main.go index 63310e6..f96805d 100644 --- a/main.go +++ b/main.go @@ -6,91 +6,22 @@ import ( "fmt" "net/http" "os" - "time" - "github.com/gorilla/websocket" syslog "gopkg.in/mcuadros/go-syslog.v2" "gopkg.in/mcuadros/go-syslog.v2/format" ) -var circbuf *ring.Ring -var hub Hub -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, -} - -func init() { - hub = NewHub() -} - -func syslogdHandler(channel syslog.LogPartsChannel) { - for logParts := range channel { - hub.logMessages <- logParts - fmt.Println(logParts) - circbuf.Value = logParts - circbuf = circbuf.Next() +func getSyslogdHandler(circbuf *ring.Ring, hub Hub) func(channel syslog.LogPartsChannel) { + return func(channel syslog.LogPartsChannel) { + for logParts := range channel { + hub.logMessages <- logParts + fmt.Println(logParts) + circbuf.Value = logParts + circbuf = circbuf.Next() + } } } -func httpHandler(w http.ResponseWriter, r *http.Request) { - circbuf.Do(func(x interface{}) { - if x == nil { - return - } - logmsg := x.(format.LogParts) - if logmsg["message"] == nil { - return - } - c := logmsg["message"].(string) - w.Write([]byte(c)) - w.Write([]byte("\n")) - }) -} - -func wsHandler(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return - } - httpHandler(w, r) - client := Client{messages: make(chan format.LogParts)} - hub.Register <- client - - // Allow collection of memory referenced by the caller by doing all work in - // new goroutines. - go func(conn *websocket.Conn, c Client) { - defer func() { - hub.Unregister <- c - conn.Close() - }() - for { - select { - case message, ok := <-c.messages: - conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if !ok { - // The hub closed the channel. - conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - - w, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - return - } - if msg, ok := message["message"]; ok { - w.Write([]byte(msg.(string))) - } - - if err := w.Close(); err != nil { - return - } - // TODO: ticker/ping - } - } - }(conn, client) -} - func main() { var err error syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses") @@ -100,6 +31,8 @@ func main() { queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service") flag.Parse() + var hub Hub + hub = NewHub() channel := make(chan format.LogParts) handler := syslog.NewChannelHandler(channel) @@ -119,18 +52,16 @@ func main() { os.Exit(1) } } - circbuf = ring.New(*bufsize) + circbuf := ring.New(*bufsize) if err = server.Boot(); err != nil { fmt.Fprintln(os.Stderr, "argh", err) os.Exit(1) } go hub.Run() - go syslogdHandler(channel) + go getSyslogdHandler(circbuf, hub)(channel) - http.HandleFunc("/", httpHandler) - http.HandleFunc("/ws", wsHandler) + setupHttp(circbuf, hub) fmt.Printf("Binding address `%s` [http]\n", *queryAddr) http.ListenAndServe(*queryAddr, nil) - server.Wait() }