hub.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "gopkg.in/mcuadros/go-syslog.v2/format"
  8. )
  9. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  10. // new messages are sent.
  11. type Client struct {
  12. Messages chan format.LogParts // only hub should write/close this
  13. Options ClientOptions
  14. }
  15. type ClientOptions struct {
  16. BacklogLength int // how many past messages the client wants to receive upon connection
  17. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  18. }
// The Hub is the central "registry": it keeps both the data storage and the client notifications.
//
// The "Register" and "Unregister" channels can be seen as commands.
// Keep in mind that registering is also how a client gets messages in a one-shot fashion: Client
// has options, such as Nofollow, that tell the Hub what should be delivered.
  24. // An HubCommand is an "enum" of different commands
  25. type HubCommand int
  26. const (
  27. CommandClear = iota
  28. CommandPauseToggle = iota
  29. CommandStatus = iota
  30. )
  31. // An HubFullCommand is a Command, complete with arguments
  32. type HubFullCommand struct {
  33. Command HubCommand
  34. Parameters map[string]interface{}
  35. Response chan CommandResponse
  36. }
  37. type CommandResponse struct {
  38. Value interface{}
  39. }
  40. // StatusResponse is an implementation of a CommandResponse
  41. type StatusResponse struct {
  42. Size int `json:"size"`
  43. IsRunning bool `json:"running"`
  44. }
  45. // Status return "paused/unpaused" based on isRunning value
  46. func (r StatusResponse) Status() string {
  47. if r.IsRunning {
  48. return "unpaused"
  49. }
  50. return "paused"
  51. }
  52. type Hub struct {
  53. Register chan Client
  54. Unregister chan Client
  55. LogMessages chan format.LogParts
  56. Commands chan HubFullCommand
  57. clients map[Client]bool
  58. circbuf *ring.Ring
  59. }
  60. // NewHub creates an empty hub
  61. func NewHub(ringBufSize int) Hub {
  62. return Hub{clients: make(map[Client]bool),
  63. Register: make(chan Client),
  64. Unregister: make(chan Client),
  65. LogMessages: make(chan format.LogParts),
  66. Commands: make(chan HubFullCommand),
  67. circbuf: ring.New(ringBufSize),
  68. }
  69. }
  70. func (h *Hub) register(cl Client) {
  71. if _, ok := h.clients[cl]; !ok {
  72. if !cl.Options.Nofollow { // we won't need it in future
  73. h.clients[cl] = true
  74. }
  75. howmany := cl.Options.BacklogLength
  76. if howmany > h.circbuf.Len() || howmany == -1 {
  77. howmany = h.circbuf.Len()
  78. }
  79. buf := h.circbuf.Move(-howmany)
  80. for i := 0; i < howmany; i++ {
  81. item := buf.Value
  82. if item != nil {
  83. select { // send with short timeout
  84. case cl.Messages <- item.(format.LogParts):
  85. break
  86. case <-time.After(500 * time.Millisecond):
  87. close(cl.Messages)
  88. return
  89. }
  90. }
  91. buf = buf.Next()
  92. }
  93. if cl.Options.Nofollow {
  94. close(cl.Messages)
  95. }
  96. }
  97. }
  98. // Run is hub main loop; keeps everything going
  99. func (h *Hub) Run() {
  100. active := true
  101. for {
  102. select {
  103. case cl := <-h.Register:
  104. h.register(cl)
  105. case cl := <-h.Unregister:
  106. _, ok := h.clients[cl]
  107. if ok {
  108. close(cl.Messages)
  109. delete(h.clients, cl)
  110. }
  111. case msg := <-h.LogMessages:
  112. if active == true {
  113. h.circbuf.Value = msg
  114. h.circbuf = h.circbuf.Next()
  115. for client := range h.clients {
  116. select { // send without blocking
  117. case client.Messages <- msg:
  118. break
  119. default:
  120. break
  121. }
  122. }
  123. }
  124. case cmd := <-h.Commands:
  125. switch cmd.Command {
  126. case CommandClear:
  127. h.clear()
  128. cmd.Response <- CommandResponse{Value: true}
  129. case CommandPauseToggle:
  130. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  131. if active {
  132. fmt.Print("un")
  133. }
  134. fmt.Println("paused")
  135. cmd.Response <- CommandResponse{Value: active}
  136. case CommandStatus:
  137. var resp = StatusResponse{
  138. Size: h.circbuf.Len(),
  139. IsRunning: active,
  140. }
  141. cmd.Response <- CommandResponse{Value: resp}
  142. }
  143. }
  144. }
  145. }
  146. func togglePause(waitTime time.Duration, status *bool) {
  147. if waitTime != 0 {
  148. go func() {
  149. time.Sleep(waitTime)
  150. fmt.Fprintln(os.Stderr, "toggling again")
  151. togglePause(0, status)
  152. }()
  153. }
  154. *status = !*status
  155. }
  156. // Clear removes all elements from the buffer
  157. func (h *Hub) clear() {
  158. buf := h.circbuf
  159. for i := 0; i < buf.Len(); i++ {
  160. buf.Value = nil
  161. buf = buf.Next()
  162. }
  163. }