hub.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "gopkg.in/mcuadros/go-syslog.v2/format"
  8. )
  9. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  10. // new messages are sent.
  11. type Client struct {
  12. Messages chan format.LogParts // only hub should write/close this
  13. Options ClientOptions
  14. }
  15. type ClientOptions struct {
  16. BacklogLength int // how many past messages the client wants to receive upon connection
  17. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  18. }
  19. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  20. //
  21. // The channel "register" and "unregister" can be seen as "command"
  22. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  23. // has "options", such as Nofollow, to explain the Hub what should be given
  24. // An HubCommand is an "enum" of different commands
  25. type HubCommand int
  26. const (
  27. CommandClear = iota
  28. CommandPauseToggle = iota
  29. )
  30. // An HubFullCommand is a Command, complete with arguments
  31. type HubFullCommand struct {
  32. Command HubCommand
  33. Parameters map[string]interface{}
  34. Response chan CommandResponse
  35. }
  36. type CommandResponse struct {
  37. Value interface{}
  38. }
  39. type Hub struct {
  40. Register chan Client
  41. Unregister chan Client
  42. LogMessages chan format.LogParts
  43. Commands chan HubFullCommand
  44. clients map[Client]bool
  45. circbuf *ring.Ring
  46. }
  47. // NewHub creates an empty hub
  48. func NewHub(ringBufSize int) Hub {
  49. return Hub{clients: make(map[Client]bool),
  50. Register: make(chan Client),
  51. Unregister: make(chan Client),
  52. LogMessages: make(chan format.LogParts),
  53. Commands: make(chan HubFullCommand),
  54. circbuf: ring.New(ringBufSize),
  55. }
  56. }
  57. func (h *Hub) register(cl Client) {
  58. if _, ok := h.clients[cl]; !ok {
  59. if !cl.Options.Nofollow { // we won't need it in future
  60. h.clients[cl] = true
  61. }
  62. howmany := cl.Options.BacklogLength
  63. if howmany > h.circbuf.Len() || howmany == -1 {
  64. howmany = h.circbuf.Len()
  65. }
  66. buf := h.circbuf.Move(-howmany)
  67. for i := 0; i < howmany; i++ {
  68. item := buf.Value
  69. if item != nil {
  70. select { // send with short timeout
  71. case cl.Messages <- item.(format.LogParts):
  72. break
  73. case <-time.After(500 * time.Millisecond):
  74. close(cl.Messages)
  75. return
  76. }
  77. }
  78. buf = buf.Next()
  79. }
  80. if cl.Options.Nofollow {
  81. close(cl.Messages)
  82. }
  83. }
  84. }
  85. // Run is hub main loop; keeps everything going
  86. func (h *Hub) Run() {
  87. active := true
  88. for {
  89. select {
  90. case cl := <-h.Register:
  91. h.register(cl)
  92. case cl := <-h.Unregister:
  93. _, ok := h.clients[cl]
  94. if ok {
  95. close(cl.Messages)
  96. delete(h.clients, cl)
  97. }
  98. case msg := <-h.LogMessages:
  99. if active == true {
  100. h.circbuf.Value = msg
  101. h.circbuf = h.circbuf.Next()
  102. for client := range h.clients {
  103. select { // send without blocking
  104. case client.Messages <- msg:
  105. break
  106. default:
  107. break
  108. }
  109. }
  110. }
  111. case cmd := <-h.Commands:
  112. if cmd.Command == CommandClear {
  113. h.clear()
  114. cmd.Response <- CommandResponse{Value: true}
  115. }
  116. if cmd.Command == CommandPauseToggle {
  117. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  118. if active {
  119. fmt.Print("un")
  120. }
  121. fmt.Println("paused")
  122. cmd.Response <- CommandResponse{Value: active}
  123. }
  124. }
  125. }
  126. }
  127. func togglePause(waitTime time.Duration, status *bool) {
  128. if waitTime != 0 {
  129. go func() {
  130. time.Sleep(waitTime)
  131. fmt.Fprintln(os.Stderr, "toggling again")
  132. togglePause(0, status)
  133. }()
  134. }
  135. *status = !*status
  136. }
  137. // Clear removes all elements from the buffer
  138. func (h *Hub) clear() {
  139. buf := h.circbuf
  140. for i := 0; i < buf.Len(); i++ {
  141. buf.Value = nil
  142. buf = buf.Next()
  143. }
  144. }