1
0
Fork 0
forked from boyska/circolog

Websocket to follow logs

This commit is contained in:
boyska 2018-08-22 23:51:59 +02:00
parent 3bf88506be
commit b1b83f488e
2 changed files with 122 additions and 3 deletions

50
hub.go Normal file
View file

@ -0,0 +1,50 @@
package main
import (
"fmt"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Client represent a client connected via websocket. Its most important field is the messages channel, where
// new messages are sent. It is a struct so that it can later be "expanded" to contain other fields (ie:
// filters)
type Client struct {
messages chan format.LogParts
}
// The Hub is the central "registry"
type Hub struct {
clients map[Client]bool
Register chan Client
Unregister chan Client
logMessages chan format.LogParts
}
// NewHub creates an empty hub
func NewHub() Hub {
return Hub{clients: make(map[Client]bool),
Register: make(chan Client),
Unregister: make(chan Client),
logMessages: make(chan format.LogParts),
}
}
func (h *Hub) Run() {
for {
select {
case cl := <-h.Register:
h.clients[cl] = true
case cl := <-h.Unregister:
_, ok := h.clients[cl]
if ok {
delete(h.clients, cl)
}
case msg := <-h.logMessages:
fmt.Println("ricevuto", msg["message"], len(h.clients))
for client := range h.clients {
client.messages <- msg
}
}
}
}

75
main.go
View file

@ -5,15 +5,28 @@ import (
"flag"
"fmt"
"net/http"
"os"
"time"
"github.com/gorilla/websocket"
syslog "gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
var circbuf *ring.Ring
var hub Hub
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func init() {
hub = NewHub()
}
func syslogdHandler(channel syslog.LogPartsChannel) {
for logParts := range channel {
hub.logMessages <- logParts
fmt.Println(logParts)
circbuf.Value = logParts
circbuf = circbuf.Next()
@ -34,7 +47,52 @@ func httpHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("\n"))
})
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
httpHandler(w, r)
client := Client{messages: make(chan format.LogParts)}
hub.Register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go func(conn *websocket.Conn, c Client) {
defer func() {
hub.Unregister <- c
conn.Close()
}()
for {
select {
case message, ok := <-c.messages:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
// The hub closed the channel.
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
if msg, ok := message["message"]; ok {
w.Write([]byte(msg.(string)))
}
if err := w.Close(); err != nil {
return
}
// TODO: ticker/ping
}
}
}(conn, client)
}
func main() {
var err error
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
@ -49,17 +107,28 @@ func main() {
server.SetFormat(syslog.RFC5424)
server.SetHandler(handler)
if *syslogSocketPath != "" {
server.ListenUnixgram(*syslogSocketPath)
if err = server.ListenUnixgram(*syslogSocketPath); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
} else {
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
server.ListenUDP(*syslogAddr)
if err = server.ListenUDP(*syslogAddr); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
}
circbuf = ring.New(*bufsize)
server.Boot()
if err = server.Boot(); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
go hub.Run()
go syslogdHandler(channel)
http.HandleFunc("/", httpHandler)
http.HandleFunc("/ws", wsHandler)
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil)