hub.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "git.lattuga.net/boyska/circolog/filtering"
  8. "gopkg.in/mcuadros/go-syslog.v2/format"
  9. )
  10. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  11. // new messages are sent.
  12. type Client struct {
  13. Messages chan format.LogParts // only hub should write/close this
  14. Options ClientOptions
  15. }
  16. type ClientOptions struct {
  17. BacklogLength int // how many past messages the client wants to receive upon connection
  18. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  19. }
  20. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  21. //
  22. // The channel "register" and "unregister" can be seen as "command"
  23. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  24. // has "options", such as Nofollow, to explain the Hub what should be given
  25. // An HubCommand is an "enum" of different commands
  26. type HubCommand int
  27. const (
  28. CommandClear = iota
  29. CommandPauseToggle = iota
  30. CommandStatus = iota
  31. CommandNewFilter = iota
  32. )
  33. // An HubFullCommand is a Command, complete with arguments
  34. type HubFullCommand struct {
  35. Command HubCommand
  36. Parameters map[string]interface{}
  37. Response chan CommandResponse
  38. }
  39. type CommandResponse struct {
  40. Value interface{}
  41. }
  42. type Hub struct {
  43. Register chan Client
  44. Unregister chan Client
  45. LogMessages chan format.LogParts
  46. Commands chan HubFullCommand
  47. clients map[Client]bool
  48. circbuf *ring.Ring
  49. }
  50. // NewHub creates an empty hub
  51. func NewHub(ringBufSize int) Hub {
  52. return Hub{clients: make(map[Client]bool),
  53. Register: make(chan Client),
  54. Unregister: make(chan Client),
  55. LogMessages: make(chan format.LogParts),
  56. Commands: make(chan HubFullCommand),
  57. circbuf: ring.New(ringBufSize),
  58. }
  59. }
  60. func (h *Hub) register(cl Client) {
  61. if _, ok := h.clients[cl]; !ok {
  62. if !cl.Options.Nofollow { // we won't need it in future
  63. h.clients[cl] = true
  64. }
  65. howmany := cl.Options.BacklogLength
  66. if howmany > h.circbuf.Len() || howmany == -1 {
  67. howmany = h.circbuf.Len()
  68. }
  69. buf := h.circbuf.Move(-howmany)
  70. for i := 0; i < howmany; i++ {
  71. item := buf.Value
  72. if item != nil {
  73. select { // send with short timeout
  74. case cl.Messages <- item.(format.LogParts):
  75. break
  76. case <-time.After(500 * time.Millisecond):
  77. close(cl.Messages)
  78. return
  79. }
  80. }
  81. buf = buf.Next()
  82. }
  83. if cl.Options.Nofollow {
  84. close(cl.Messages)
  85. }
  86. }
  87. }
  88. // Run is hub main loop; keeps everything going
  89. func (h *Hub) Run() {
  90. active := true
  91. var filter filtering.ExprValue
  92. for {
  93. select {
  94. case cl := <-h.Register:
  95. h.register(cl)
  96. case cl := <-h.Unregister:
  97. _, ok := h.clients[cl]
  98. if ok {
  99. close(cl.Messages)
  100. delete(h.clients, cl)
  101. }
  102. case msg := <-h.LogMessages:
  103. if active == true && filter.Validate(msg) {
  104. h.circbuf.Value = msg
  105. h.circbuf = h.circbuf.Next()
  106. for client := range h.clients {
  107. select { // send without blocking
  108. case client.Messages <- msg:
  109. break
  110. default:
  111. break
  112. }
  113. }
  114. }
  115. case cmd := <-h.Commands:
  116. switch cmd.Command {
  117. case CommandClear:
  118. h.clear()
  119. cmd.Response <- CommandResponse{Value: true}
  120. case CommandPauseToggle:
  121. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  122. if active {
  123. fmt.Print("un")
  124. }
  125. fmt.Println("paused")
  126. cmd.Response <- CommandResponse{Value: active}
  127. case CommandStatus:
  128. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  129. "size": h.circbuf.Len(),
  130. "paused": !active,
  131. "filter": filter.String(),
  132. }}
  133. case CommandNewFilter:
  134. if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
  135. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  136. "success": false,
  137. "error": err.Error(),
  138. }}
  139. } else {
  140. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  141. "success": true,
  142. "error": "",
  143. }}
  144. }
  145. }
  146. }
  147. }
  148. }
  149. func togglePause(waitTime time.Duration, status *bool) {
  150. if waitTime != 0 {
  151. go func() {
  152. time.Sleep(waitTime)
  153. fmt.Fprintln(os.Stderr, "toggling again")
  154. togglePause(0, status)
  155. }()
  156. }
  157. *status = !*status
  158. }
  159. // Clear removes all elements from the buffer
  160. func (h *Hub) clear() {
  161. buf := h.circbuf
  162. for i := 0; i < buf.Len(); i++ {
  163. buf.Value = nil
  164. buf = buf.Next()
  165. }
  166. }