hub.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "git.lattuga.net/boyska/circolog/filtering"
  8. "gopkg.in/mcuadros/go-syslog.v2/format"
  9. )
// Message is the representation of a log entry that circolog sends to its "readers".
// It is currently a distinct type defined on top of format.LogParts (not a true alias), but this is only
// temporary; sooner or later, a real struct will be used.
// The advantage of having an explicit Message is to make clear what data we are sending to readers.
// This is not necessarily (and not in practice) the same structure that we receive from logging programs.
type Message format.LogParts
// Client represents a client connected via websocket. Its most important field is the Messages channel,
// where new messages are sent.
type Client struct {
	// Messages is the channel on which the hub delivers messages to this client.
	// Only the hub should write to or close it.
	Messages chan Message
	// Options holds the connection options requested by this client.
	Options ClientOptions
}
// ClientOptions is a struct containing connection options for every reader.
type ClientOptions struct {
	// BacklogLength is how many past messages the client wants to receive upon connection.
	// A value of -1, or any value larger than the buffer, is treated by the hub as "the whole buffer".
	BacklogLength int
	// Nofollow, if true, tells the hub not to keep this client permanently. Rather, the hub sends the
	// requested backlog to Messages and then closes the channel. Use this to get the messages one-shot.
	Nofollow bool
}
// The Hub is the central "registry"; it keeps both the data storage and the clients' notifications.
//
// The "Register" and "Unregister" channels can be seen as "commands".
// Keep in mind that "registering" is also what you do to get messages in a one-time fashion. In fact, Client
// has "Options", such as Nofollow, to tell the Hub what should be delivered.

// HubCommand is an "enum" of the different commands that can be sent to the Hub.
type HubCommand int
  32. const (
  33. CommandClear = iota
  34. CommandPauseToggle = iota
  35. CommandStatus = iota
  36. CommandNewFilter = iota
  37. )
// HubFullCommand is a HubCommand, complete with its arguments and the channel used to answer it.
type HubFullCommand struct {
	// Command selects which operation the hub must perform.
	Command HubCommand
	// Parameters carries the command arguments. The hub main loop reads "waitTime" (a time.Duration)
	// for CommandPauseToggle and "where" (a string) for CommandNewFilter.
	Parameters map[string]interface{}
	// Response is the channel on which the hub sends the outcome of the command.
	Response chan CommandResponse
}
// CommandResponse is what the hub sends back on HubFullCommand.Response after handling a command.
type CommandResponse struct {
	// Value is the command-specific payload: a bool for CommandClear and CommandPauseToggle,
	// a StatusResponse for CommandStatus, and a map with "success" and "error" keys for CommandNewFilter.
	Value interface{}
}
// StatusResponse is the payload carried in CommandResponse.Value when answering CommandStatus.
type StatusResponse struct {
	// Size is the length of the hub ring buffer.
	Size int `json:"size"`
	// IsRunning is true when the hub is not paused.
	IsRunning bool `json:"running"`
	// Filter is the string form of the filter currently applied to incoming messages.
	Filter string `json:"filter"`
}
  53. // Status return "paused/unpaused" based on isRunning value
  54. func (r StatusResponse) Status() string {
  55. if r.IsRunning {
  56. return "unpaused"
  57. }
  58. return "paused"
  59. }
// Hub is the central "registry": it stores recent messages in a ring buffer and forwards new
// ones to the registered clients. All its channels are served by Run.
type Hub struct {
	// Register receives clients that want the backlog and, unless Nofollow is set, new messages.
	Register chan Client
	// Unregister receives clients that must be removed; their Messages channel gets closed.
	Unregister chan Client
	// LogMessages receives the log entries coming from writers.
	LogMessages chan format.LogParts
	// Commands receives control commands (clear, pause toggle, status, new filter).
	Commands chan HubFullCommand

	clients map[Client]bool // set of clients that keep following new messages
	circbuf *ring.Ring      // ring buffer of stored Message values; points at the next slot to write
}
  68. // NewHub creates an empty hub
  69. func NewHub(ringBufSize int) Hub {
  70. return Hub{clients: make(map[Client]bool),
  71. Register: make(chan Client),
  72. Unregister: make(chan Client),
  73. LogMessages: make(chan format.LogParts),
  74. Commands: make(chan HubFullCommand),
  75. circbuf: ring.New(ringBufSize),
  76. }
  77. }
  78. func (h *Hub) register(cl Client) {
  79. if _, ok := h.clients[cl]; !ok {
  80. if !cl.Options.Nofollow { // we won't need it in future
  81. h.clients[cl] = true
  82. }
  83. howmany := cl.Options.BacklogLength
  84. if howmany > h.circbuf.Len() || howmany == -1 {
  85. howmany = h.circbuf.Len()
  86. }
  87. buf := h.circbuf.Move(-howmany)
  88. for i := 0; i < howmany; i++ {
  89. item := buf.Value
  90. if item != nil {
  91. select { // send with short timeout
  92. case cl.Messages <- item.(Message):
  93. break
  94. case <-time.After(500 * time.Millisecond):
  95. close(cl.Messages)
  96. return
  97. }
  98. }
  99. buf = buf.Next()
  100. }
  101. if cl.Options.Nofollow {
  102. close(cl.Messages)
  103. }
  104. }
  105. }
  106. // logEntryToMessage converts messages received from writers to the format we promise to readers
  107. func logEntryToMessage(orig format.LogParts) Message {
  108. m := Message{}
  109. // it currently only supports RFC5424. TODO: support RFC3164
  110. m["prog"] = orig["app_name"]
  111. m["client"] = orig["client"]
  112. m["hostname"] = orig["hostname"]
  113. m["proc_id"] = orig["proc_id"]
  114. m["msg"] = orig["message"]
  115. m["facility"] = orig["facility"]
  116. m["time"] = orig["timestamp"]
  117. m["sev"] = orig["severity"]
  118. return m
  119. }
  120. // Run is hub main loop; keeps everything going
  121. func (h *Hub) Run() {
  122. active := true
  123. var filter filtering.ExprValue
  124. for {
  125. select {
  126. case cl := <-h.Register:
  127. h.register(cl)
  128. case cl := <-h.Unregister:
  129. _, ok := h.clients[cl]
  130. if ok {
  131. close(cl.Messages)
  132. delete(h.clients, cl)
  133. }
  134. case msg := <-h.LogMessages:
  135. if active == true && filter.Validate(msg) {
  136. newmsg := logEntryToMessage(msg)
  137. h.circbuf.Value = newmsg
  138. h.circbuf = h.circbuf.Next()
  139. for client := range h.clients {
  140. select { // send without blocking
  141. case client.Messages <- newmsg:
  142. break
  143. default:
  144. break
  145. }
  146. }
  147. }
  148. case cmd := <-h.Commands:
  149. switch cmd.Command {
  150. case CommandClear:
  151. h.clear()
  152. cmd.Response <- CommandResponse{Value: true}
  153. case CommandPauseToggle:
  154. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  155. if active {
  156. fmt.Print("un")
  157. }
  158. fmt.Println("paused")
  159. cmd.Response <- CommandResponse{Value: active}
  160. case CommandStatus:
  161. var resp = StatusResponse{
  162. Size: h.circbuf.Len(),
  163. IsRunning: active,
  164. Filter: filter.String(),
  165. }
  166. cmd.Response <- CommandResponse{Value: resp}
  167. case CommandNewFilter:
  168. if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
  169. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  170. "success": false,
  171. "error": err.Error(),
  172. }}
  173. } else {
  174. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  175. "success": true,
  176. "error": "",
  177. }}
  178. }
  179. }
  180. }
  181. }
  182. }
  183. func togglePause(waitTime time.Duration, status *bool) {
  184. if waitTime != 0 {
  185. go func() {
  186. time.Sleep(waitTime)
  187. fmt.Fprintln(os.Stderr, "toggling again")
  188. togglePause(0, status)
  189. }()
  190. }
  191. *status = !*status
  192. }
  193. // Clear removes all elements from the buffer
  194. func (h *Hub) clear() {
  195. buf := h.circbuf
  196. for i := 0; i < buf.Len(); i++ {
  197. buf.Value = nil
  198. buf = buf.Next()
  199. }
  200. }