package circolog

import (
	"container/ring"
	"time"

	"gopkg.in/mcuadros/go-syslog.v2/format"
)

// Client represents a client connected via websocket. Its most important
// field is the Messages channel, on which the hub delivers messages.
type Client struct {
	Messages chan format.LogParts // only the hub should write to or close this
	Options  ClientOptions
}

// ClientOptions describes what a Client asks of the Hub when registering.
type ClientOptions struct {
	// BacklogLength is how many past messages the client wants to receive
	// upon connection. -1, or a value larger than the buffer, selects the
	// whole buffer.
	BacklogLength int
	// Nofollow, when true, makes the hub send the requested backlog to
	// Messages and then close the channel instead of keeping the client
	// registered. Use it to fetch messages in a one-shot fashion.
	Nofollow bool
}

// A HubCommand is an "enum" of the commands the Hub accepts.
type HubCommand int

// Commands handled by Hub.Run. They are deliberately left as untyped
// constants so that existing callers using them as plain integers keep
// compiling.
const (
	// CommandClear empties the buffer; the hub replies with true.
	CommandClear = iota
	// CommandPauseToggle flips whether incoming messages are processed; the
	// hub replies with the new state (true means messages are processed).
	CommandPauseToggle
)

// A HubFullCommand is a command complete with its arguments.
type HubFullCommand struct {
	Command HubCommand
}

// CommandResponse is what the hub sends on Responses after handling a command.
type CommandResponse struct {
	Value interface{}
}

// The Hub is the central "registry"; it keeps both the data storage and the
// client notifications.
//
// The Register and Unregister channels can be seen as commands. Keep in mind
// that registering is also how messages are fetched in a one-time fashion:
// Client carries options, such as Nofollow, that tell the Hub what should be
// delivered.
type Hub struct {
	Register    chan Client
	Unregister  chan Client
	LogMessages chan format.LogParts
	Commands    chan HubFullCommand
	Responses   chan CommandResponse

	clients map[Client]bool // clients that receive every new message
	circbuf *ring.Ring      // stored messages; points at the next slot to write
}

// NewHub creates an empty hub whose buffer holds ringBufSize messages.
//
// ringBufSize must be positive: the buffer comes from ring.New, which returns
// nil for n <= 0, and Run dereferences the buffer on every incoming message.
// All channels are unbuffered.
func NewHub(ringBufSize int) Hub {
	return Hub{
		clients:     make(map[Client]bool),
		Register:    make(chan Client),
		Unregister:  make(chan Client),
		LogMessages: make(chan format.LogParts),
		Commands:    make(chan HubFullCommand),
		Responses:   make(chan CommandResponse),
		circbuf:     ring.New(ringBufSize),
	}
}

// register handles a registration request: it sends the requested backlog to
// cl and, unless cl.Options.Nofollow is set, keeps cl subscribed to new
// messages. A client that is already registered is ignored.
//
// Each backlog message is sent with a short timeout. If the client does not
// read in time, its channel is closed and it is dropped from the hub.
func (h *Hub) register(cl Client) {
	if _, known := h.clients[cl]; known {
		return
	}
	if !cl.Options.Nofollow { // one-shot clients are never stored
		h.clients[cl] = true
	}

	const sendTimeout = 500 * time.Millisecond

	size := h.circbuf.Len()
	howmany := cl.Options.BacklogLength
	if howmany > size || howmany == -1 {
		howmany = size
	}

	// circbuf points at the next slot to be written, so stepping back by
	// howmany lands on the oldest of the howmany most recent slots.
	buf := h.circbuf.Move(-howmany)
	for i := 0; i < howmany; i++ {
		// Slots that were never written (or were cleared) hold nil.
		if item := buf.Value; item != nil {
			select {
			case cl.Messages <- item.(format.LogParts):
			case <-time.After(sendTimeout):
				// The client must leave the registry before its channel is
				// closed: otherwise the next broadcast would send on a
				// closed channel (which panics) and a later Unregister
				// would close it a second time.
				delete(h.clients, cl)
				close(cl.Messages)
				return
			}
		}
		buf = buf.Next()
	}

	if cl.Options.Nofollow {
		close(cl.Messages)
	}
}

// Run is the hub main loop; it keeps everything going and never returns.
//
// It serves registrations, unregistrations, incoming log messages and
// commands, one at a time. While paused (see CommandPauseToggle) incoming
// messages are neither stored nor forwarded.
func (h *Hub) Run() {
	active := true
	for {
		select {
		case cl := <-h.Register:
			h.register(cl)

		case cl := <-h.Unregister:
			if _, ok := h.clients[cl]; ok {
				close(cl.Messages)
				delete(h.clients, cl)
			}

		case msg := <-h.LogMessages:
			if !active {
				continue
			}
			h.circbuf.Value = msg
			h.circbuf = h.circbuf.Next()
			for client := range h.clients {
				// Send without blocking: a client that is not ready to
				// receive simply misses this message.
				select {
				case client.Messages <- msg:
				default:
				}
			}

		case cmd := <-h.Commands:
			// Unknown commands are ignored and produce no response.
			switch cmd.Command {
			case CommandClear:
				h.clear()
				h.Responses <- CommandResponse{Value: true}
			case CommandPauseToggle:
				active = !active
				h.Responses <- CommandResponse{Value: active}
			}
		}
	}
}

// clear removes all elements from the buffer by setting every slot to nil.
func (h *Hub) clear() {
	buf := h.circbuf
	// Ring.Len walks the whole ring, so compute it once rather than on every
	// iteration of the loop.
	n := buf.Len()
	for i := 0; i < n; i++ {
		buf.Value = nil
		buf = buf.Next()
	}
}