hub.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package circolog
  2. import (
  3. "container/ring"
  4. "time"
  5. "gopkg.in/mcuadros/go-syslog.v2/format"
  6. )
  7. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  8. // new messages are sent.
  9. type Client struct {
  10. Messages chan format.LogParts // only hub should write/close this
  11. Options ClientOptions
  12. }
  13. type ClientOptions struct {
  14. BacklogLength int // how many past messages the client wants to receive upon connection
  15. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  16. }
  17. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  18. //
  19. // The channel "register" and "unregister" can be seen as "command"
  20. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  21. // has "options", such as Nofollow, to explain the Hub what should be given
  22. // An HubCommand is an "enum" of different commands
  23. type HubCommand int
  24. const (
  25. CommandClear = iota
  26. CommandPauseToggle = iota
  27. )
  28. // An HubFullCommand is a Command, complete with arguments
  29. type HubFullCommand struct {
  30. Command HubCommand
  31. }
  32. type CommandResponse struct {
  33. Value interface{}
  34. }
  35. type Hub struct {
  36. Register chan Client
  37. Unregister chan Client
  38. LogMessages chan format.LogParts
  39. Commands chan HubFullCommand
  40. Responses chan CommandResponse
  41. clients map[Client]bool
  42. circbuf *ring.Ring
  43. }
  44. // NewHub creates an empty hub
  45. func NewHub(ringBufSize int) Hub {
  46. return Hub{clients: make(map[Client]bool),
  47. Register: make(chan Client),
  48. Unregister: make(chan Client),
  49. LogMessages: make(chan format.LogParts),
  50. Commands: make(chan HubFullCommand),
  51. Responses: make(chan CommandResponse),
  52. circbuf: ring.New(ringBufSize),
  53. }
  54. }
  55. func (h *Hub) register(cl Client) {
  56. if _, ok := h.clients[cl]; !ok {
  57. if !cl.Options.Nofollow { // we won't need it in future
  58. h.clients[cl] = true
  59. }
  60. howmany := cl.Options.BacklogLength
  61. if howmany > h.circbuf.Len() || howmany == -1 {
  62. howmany = h.circbuf.Len()
  63. }
  64. buf := h.circbuf.Move(-howmany)
  65. for i := 0; i < howmany; i++ {
  66. item := buf.Value
  67. if item != nil {
  68. select { // send with short timeout
  69. case cl.Messages <- item.(format.LogParts):
  70. break
  71. case <-time.After(500 * time.Millisecond):
  72. close(cl.Messages)
  73. return
  74. }
  75. }
  76. buf = buf.Next()
  77. }
  78. if cl.Options.Nofollow {
  79. close(cl.Messages)
  80. }
  81. }
  82. }
  83. // Run is hub main loop; keeps everything going
  84. func (h *Hub) Run() {
  85. active := true
  86. for {
  87. select {
  88. case cl := <-h.Register:
  89. h.register(cl)
  90. case cl := <-h.Unregister:
  91. _, ok := h.clients[cl]
  92. if ok {
  93. close(cl.Messages)
  94. delete(h.clients, cl)
  95. }
  96. case msg := <-h.LogMessages:
  97. if active == true {
  98. h.circbuf.Value = msg
  99. h.circbuf = h.circbuf.Next()
  100. for client := range h.clients {
  101. select { // send without blocking
  102. case client.Messages <- msg:
  103. break
  104. default:
  105. break
  106. }
  107. }
  108. }
  109. case cmd := <-h.Commands:
  110. if cmd.Command == CommandClear {
  111. h.clear()
  112. h.Responses <- CommandResponse{Value: true}
  113. }
  114. if cmd.Command == CommandPauseToggle {
  115. active = !active
  116. h.Responses <- CommandResponse{Value: active}
  117. }
  118. }
  119. }
  120. }
  121. // Clear removes every all elements from the buffer
  122. func (h *Hub) clear() {
  123. buf := h.circbuf
  124. for i := 0; i < buf.Len(); i++ {
  125. buf.Value = nil
  126. buf = buf.Next()
  127. }
  128. }