hub.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "git.lattuga.net/boyska/circolog/filtering"
  8. "gopkg.in/mcuadros/go-syslog.v2/format"
  9. )
  10. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  11. // new messages are sent.
  12. type Client struct {
  13. Messages chan format.LogParts // only hub should write/close this
  14. Options ClientOptions
  15. }
  16. type ClientOptions struct {
  17. BacklogLength int // how many past messages the client wants to receive upon connection
  18. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  19. }
  20. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  21. //
  22. // The channel "register" and "unregister" can be seen as "command"
  23. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  24. // has "options", such as Nofollow, to explain the Hub what should be given
  25. // An HubCommand is an "enum" of different commands
  26. type HubCommand int
  27. const (
  28. CommandClear = iota
  29. CommandPauseToggle = iota
  30. CommandStatus = iota
  31. CommandNewFilter = iota
  32. )
  33. // An HubFullCommand is a Command, complete with arguments
  34. type HubFullCommand struct {
  35. Command HubCommand
  36. Parameters map[string]interface{}
  37. Response chan CommandResponse
  38. }
  39. type CommandResponse struct {
  40. Value interface{}
  41. }
  42. // StatusResponse is an implementation of a CommandResponse
  43. type StatusResponse struct {
  44. Size int `json:"size"`
  45. IsRunning bool `json:"running"`
  46. Filter string `json:"filter"`
  47. }
  48. // Status return "paused/unpaused" based on isRunning value
  49. func (r StatusResponse) Status() string {
  50. if r.IsRunning {
  51. return "unpaused"
  52. }
  53. return "paused"
  54. }
  55. type Hub struct {
  56. Register chan Client
  57. Unregister chan Client
  58. LogMessages chan format.LogParts
  59. Commands chan HubFullCommand
  60. clients map[Client]bool
  61. circbuf *ring.Ring
  62. }
  63. // NewHub creates an empty hub
  64. func NewHub(ringBufSize int) Hub {
  65. return Hub{clients: make(map[Client]bool),
  66. Register: make(chan Client),
  67. Unregister: make(chan Client),
  68. LogMessages: make(chan format.LogParts),
  69. Commands: make(chan HubFullCommand),
  70. circbuf: ring.New(ringBufSize),
  71. }
  72. }
  73. func (h *Hub) register(cl Client) {
  74. if _, ok := h.clients[cl]; !ok {
  75. if !cl.Options.Nofollow { // we won't need it in future
  76. h.clients[cl] = true
  77. }
  78. howmany := cl.Options.BacklogLength
  79. if howmany > h.circbuf.Len() || howmany == -1 {
  80. howmany = h.circbuf.Len()
  81. }
  82. buf := h.circbuf.Move(-howmany)
  83. for i := 0; i < howmany; i++ {
  84. item := buf.Value
  85. if item != nil {
  86. select { // send with short timeout
  87. case cl.Messages <- item.(format.LogParts):
  88. break
  89. case <-time.After(500 * time.Millisecond):
  90. close(cl.Messages)
  91. return
  92. }
  93. }
  94. buf = buf.Next()
  95. }
  96. if cl.Options.Nofollow {
  97. close(cl.Messages)
  98. }
  99. }
  100. }
  101. // Run is hub main loop; keeps everything going
  102. func (h *Hub) Run() {
  103. active := true
  104. var filter filtering.ExprValue
  105. for {
  106. select {
  107. case cl := <-h.Register:
  108. h.register(cl)
  109. case cl := <-h.Unregister:
  110. _, ok := h.clients[cl]
  111. if ok {
  112. close(cl.Messages)
  113. delete(h.clients, cl)
  114. }
  115. case msg := <-h.LogMessages:
  116. if active == true && filter.Validate(msg) {
  117. h.circbuf.Value = msg
  118. h.circbuf = h.circbuf.Next()
  119. for client := range h.clients {
  120. select { // send without blocking
  121. case client.Messages <- msg:
  122. break
  123. default:
  124. break
  125. }
  126. }
  127. }
  128. case cmd := <-h.Commands:
  129. switch cmd.Command {
  130. case CommandClear:
  131. h.clear()
  132. cmd.Response <- CommandResponse{Value: true}
  133. case CommandPauseToggle:
  134. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  135. if active {
  136. fmt.Print("un")
  137. }
  138. fmt.Println("paused")
  139. cmd.Response <- CommandResponse{Value: active}
  140. case CommandStatus:
  141. var resp = StatusResponse{
  142. Size: h.circbuf.Len(),
  143. IsRunning: active,
  144. Filter: filter.String(),
  145. }
  146. cmd.Response <- CommandResponse{Value: resp}
  147. case CommandNewFilter:
  148. if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
  149. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  150. "success": false,
  151. "error": err.Error(),
  152. }}
  153. } else {
  154. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  155. "success": true,
  156. "error": "",
  157. }}
  158. }
  159. }
  160. }
  161. }
  162. }
  163. func togglePause(waitTime time.Duration, status *bool) {
  164. if waitTime != 0 {
  165. go func() {
  166. time.Sleep(waitTime)
  167. fmt.Fprintln(os.Stderr, "toggling again")
  168. togglePause(0, status)
  169. }()
  170. }
  171. *status = !*status
  172. }
  173. // Clear removes all elements from the buffer
  174. func (h *Hub) clear() {
  175. buf := h.circbuf
  176. for i := 0; i < buf.Len(); i++ {
  177. buf.Value = nil
  178. buf = buf.Next()
  179. }
  180. }