1
0
Fork 0
forked from boyska/circolog

refactor: now Hub keeps everything

before this there was some hidden race condition because raceCondition
is not concurrent-safe, and there was some concurrent reading and
writing.
Now everything is handled safely by the Hub.

Client now have "options" which are understood by the Hub to handle
them differently.
This commit is contained in:
boyska 2018-08-23 00:56:27 +02:00
parent 97fd191f0e
commit 66f32d1c05
3 changed files with 52 additions and 45 deletions

27
http.go
View file

@ -1,7 +1,6 @@
package main package main
import ( import (
"container/ring"
"net/http" "net/http"
"time" "time"
@ -9,25 +8,27 @@ import (
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
func setupHttp(circbuf *ring.Ring, hub Hub) { func setupHttp(hub Hub) {
http.HandleFunc("/", getHTTPHandler(circbuf)) http.HandleFunc("/", getHTTPHandler(hub))
http.HandleFunc("/ws", getWSHandler(hub)) http.HandleFunc("/ws", getWSHandler(hub))
} }
func getHTTPHandler(circbuf *ring.Ring) http.HandlerFunc { func getHTTPHandler(hub Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
circbuf.Do(func(x interface{}) { client := Client{
if x == nil { Messages: make(chan format.LogParts),
return Nofollow: true}
} hub.Register <- client
logmsg := x.(format.LogParts)
for x := range client.Messages {
logmsg := x
if logmsg["message"] == nil { if logmsg["message"] == nil {
return continue
} }
c := logmsg["message"].(string) c := logmsg["message"].(string)
w.Write([]byte(c)) w.Write([]byte(c))
w.Write([]byte("\n")) w.Write([]byte("\n"))
}) }
} }
} }
@ -41,7 +42,7 @@ func getWSHandler(hub Hub) http.HandlerFunc {
if err != nil { if err != nil {
return return
} }
client := Client{messages: make(chan format.LogParts)} client := Client{Messages: make(chan format.LogParts)}
hub.Register <- client hub.Register <- client
// Allow collection of memory referenced by the caller by doing all work in // Allow collection of memory referenced by the caller by doing all work in
@ -53,7 +54,7 @@ func getWSHandler(hub Hub) http.HandlerFunc {
}() }()
for { for {
select { select {
case message, ok := <-c.messages: case message, ok := <-c.Messages:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok { if !ok {
// The hub closed the channel. // The hub closed the channel.

48
hub.go
View file

@ -1,49 +1,71 @@
package main package main
import ( import (
"fmt" "container/ring"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
// Client represent a client connected via websocket. Its most important field is the messages channel, where // Client represent a client connected via websocket. Its most important field is the messages channel, where
// new messages are sent. It is a struct so that it can later be "expanded" to contain other fields (ie: // new messages are sent.
// filters)
type Client struct { type Client struct {
messages chan format.LogParts Messages chan format.LogParts // only hub should write/close this
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
} }
// The Hub is the central "registry" // The Hub is the central "registry"; it keeps both the data storage and clients notifications
//
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
type Hub struct { type Hub struct {
clients map[Client]bool
Register chan Client Register chan Client
Unregister chan Client Unregister chan Client
logMessages chan format.LogParts LogMessages chan format.LogParts
clients map[Client]bool
circbuf *ring.Ring
} }
// NewHub creates an empty hub // NewHub creates an empty hub
func NewHub() Hub { func NewHub(ringBufSize int) Hub {
return Hub{clients: make(map[Client]bool), return Hub{clients: make(map[Client]bool),
Register: make(chan Client), Register: make(chan Client),
Unregister: make(chan Client), Unregister: make(chan Client),
logMessages: make(chan format.LogParts), LogMessages: make(chan format.LogParts),
circbuf: ring.New(ringBufSize),
} }
} }
// Run is hub main loop; keeps everything going
func (h *Hub) Run() { func (h *Hub) Run() {
for { for {
select { select {
case cl := <-h.Register: case cl := <-h.Register:
h.clients[cl] = true if _, ok := h.clients[cl]; !ok {
if !cl.Nofollow { // we won't need it in future
h.clients[cl] = true
}
h.circbuf.Do(func(x interface{}) {
if x != nil {
cl.Messages <- x.(format.LogParts)
}
})
if cl.Nofollow {
close(cl.Messages)
}
}
case cl := <-h.Unregister: case cl := <-h.Unregister:
_, ok := h.clients[cl] _, ok := h.clients[cl]
if ok { if ok {
close(cl.Messages)
delete(h.clients, cl) delete(h.clients, cl)
} }
case msg := <-h.logMessages: case msg := <-h.LogMessages:
fmt.Println("ricevuto", msg["message"], len(h.clients)) h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients { for client := range h.clients {
client.messages <- msg client.Messages <- msg
} }
} }
} }

22
main.go
View file

@ -1,27 +1,14 @@
package main package main
import ( import (
"container/ring"
"flag" "flag"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
syslog "gopkg.in/mcuadros/go-syslog.v2" syslog "gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format"
) )
func getSyslogdHandler(circbuf *ring.Ring, hub Hub) func(channel syslog.LogPartsChannel) {
return func(channel syslog.LogPartsChannel) {
for logParts := range channel {
hub.logMessages <- logParts
fmt.Println(logParts)
circbuf.Value = logParts
circbuf = circbuf.Next()
}
}
}
func main() { func main() {
var err error var err error
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses") syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
@ -32,9 +19,8 @@ func main() {
flag.Parse() flag.Parse()
var hub Hub var hub Hub
hub = NewHub() hub = NewHub(*bufsize)
channel := make(chan format.LogParts) handler := syslog.NewChannelHandler(hub.LogMessages)
handler := syslog.NewChannelHandler(channel)
server := syslog.NewServer() server := syslog.NewServer()
server.SetFormat(syslog.RFC5424) server.SetFormat(syslog.RFC5424)
@ -52,15 +38,13 @@ func main() {
os.Exit(1) os.Exit(1)
} }
} }
circbuf := ring.New(*bufsize)
if err = server.Boot(); err != nil { if err = server.Boot(); err != nil {
fmt.Fprintln(os.Stderr, "argh", err) fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1) os.Exit(1)
} }
go hub.Run() go hub.Run()
go getSyslogdHandler(circbuf, hub)(channel)
setupHttp(circbuf, hub) setupHttp(hub)
fmt.Printf("Binding address `%s` [http]\n", *queryAddr) fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil) http.ListenAndServe(*queryAddr, nil)
server.Wait() server.Wait()