hub.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "git.lattuga.net/boyska/circolog/data"
  8. "git.lattuga.net/boyska/circolog/filtering"
  9. "gopkg.in/mcuadros/go-syslog.v2/format"
  10. )
  11. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  12. // new messages are sent.
  13. type Client struct {
  14. Messages chan data.Message // only hub should write/close this
  15. Options ClientOptions
  16. }
  17. // ClientOptions is a struct containing connection options for every reader
  18. type ClientOptions struct {
  19. BacklogLength int // how many past messages the client wants to receive upon connection
  20. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  21. }
  22. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  23. //
  24. // The channel "register" and "unregister" can be seen as "command"
  25. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  26. // has "options", such as Nofollow, to explain the Hub what should be given
  27. // An HubCommand is an "enum" of different commands
  28. type HubCommand int
  29. const (
  30. CommandClear = iota
  31. CommandPauseToggle = iota
  32. CommandStatus = iota
  33. CommandNewFilter = iota
  34. )
  35. // An HubFullCommand is a Command, complete with arguments
  36. type HubFullCommand struct {
  37. Command HubCommand
  38. Parameters map[string]interface{}
  39. Response chan CommandResponse
  40. }
  41. type CommandResponse struct {
  42. Value interface{}
  43. }
  44. // StatusResponse is an implementation of a CommandResponse
  45. type StatusResponse struct {
  46. Size int `json:"size"`
  47. IsRunning bool `json:"running"`
  48. Filter string `json:"filter"`
  49. }
  50. // Status return "paused/unpaused" based on isRunning value
  51. func (r StatusResponse) Status() string {
  52. if r.IsRunning {
  53. return "unpaused"
  54. }
  55. return "paused"
  56. }
  57. type Hub struct {
  58. Register chan Client
  59. Unregister chan Client
  60. LogMessages chan format.LogParts
  61. Commands chan HubFullCommand
  62. clients map[Client]bool
  63. circbuf *ring.Ring
  64. }
  65. // NewHub creates an empty hub
  66. func NewHub(ringBufSize int) Hub {
  67. return Hub{clients: make(map[Client]bool),
  68. Register: make(chan Client),
  69. Unregister: make(chan Client),
  70. LogMessages: make(chan format.LogParts),
  71. Commands: make(chan HubFullCommand),
  72. circbuf: ring.New(ringBufSize),
  73. }
  74. }
  75. func (h *Hub) register(cl Client) {
  76. if _, ok := h.clients[cl]; !ok {
  77. if !cl.Options.Nofollow { // we won't need it in future
  78. h.clients[cl] = true
  79. }
  80. howmany := cl.Options.BacklogLength
  81. if howmany > h.circbuf.Len() || howmany == -1 {
  82. howmany = h.circbuf.Len()
  83. }
  84. buf := h.circbuf.Move(-howmany)
  85. for i := 0; i < howmany; i++ {
  86. item := buf.Value
  87. if item != nil {
  88. select { // send with short timeout
  89. case cl.Messages <- item.(data.Message):
  90. break
  91. case <-time.After(500 * time.Millisecond):
  92. close(cl.Messages)
  93. return
  94. }
  95. }
  96. buf = buf.Next()
  97. }
  98. if cl.Options.Nofollow {
  99. close(cl.Messages)
  100. }
  101. }
  102. }
  103. // Run is hub main loop; keeps everything going
  104. func (h *Hub) Run() {
  105. active := true
  106. var filter filtering.ExprValue
  107. for {
  108. select {
  109. case cl := <-h.Register:
  110. h.register(cl)
  111. case cl := <-h.Unregister:
  112. _, ok := h.clients[cl]
  113. if ok {
  114. close(cl.Messages)
  115. delete(h.clients, cl)
  116. }
  117. case msg := <-h.LogMessages:
  118. newmsg := data.LogEntryToMessage(msg)
  119. if active == true && filter.Validate(newmsg) {
  120. h.circbuf.Value = newmsg
  121. h.circbuf = h.circbuf.Next()
  122. for client := range h.clients {
  123. select { // send without blocking
  124. case client.Messages <- newmsg:
  125. break
  126. default:
  127. break
  128. }
  129. }
  130. }
  131. case cmd := <-h.Commands:
  132. switch cmd.Command {
  133. case CommandClear:
  134. h.clear()
  135. cmd.Response <- CommandResponse{Value: true}
  136. case CommandPauseToggle:
  137. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  138. if active {
  139. fmt.Print("un")
  140. }
  141. fmt.Println("paused")
  142. cmd.Response <- CommandResponse{Value: active}
  143. case CommandStatus:
  144. var resp = StatusResponse{
  145. Size: h.circbuf.Len(),
  146. IsRunning: active,
  147. Filter: filter.String(),
  148. }
  149. cmd.Response <- CommandResponse{Value: resp}
  150. case CommandNewFilter:
  151. if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
  152. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  153. "success": false,
  154. "error": err.Error(),
  155. }}
  156. } else {
  157. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  158. "success": true,
  159. "error": "",
  160. }}
  161. }
  162. }
  163. }
  164. }
  165. }
  166. func togglePause(waitTime time.Duration, status *bool) {
  167. if waitTime != 0 {
  168. go func() {
  169. time.Sleep(waitTime)
  170. fmt.Fprintln(os.Stderr, "toggling again")
  171. togglePause(0, status)
  172. }()
  173. }
  174. *status = !*status
  175. }
  176. // Clear removes all elements from the buffer
  177. func (h *Hub) clear() {
  178. buf := h.circbuf
  179. for i := 0; i < buf.Len(); i++ {
  180. buf.Value = nil
  181. buf = buf.Next()
  182. }
  183. }