hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package circolog
  2. import (
  3. "container/ring"
  4. "time"
  5. "gopkg.in/mcuadros/go-syslog.v2/format"
  6. )
  7. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  8. // new messages are sent.
  9. type Client struct {
  10. Messages chan format.LogParts // only hub should write/close this
  11. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  12. }
  13. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  14. //
  15. // The channel "register" and "unregister" can be seen as "command"
  16. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  17. // has "options", such as Nofollow, to explain the Hub what should be given
  18. type Hub struct {
  19. Register chan Client
  20. Unregister chan Client
  21. LogMessages chan format.LogParts
  22. clients map[Client]bool
  23. circbuf *ring.Ring
  24. }
  25. // NewHub creates an empty hub
  26. func NewHub(ringBufSize int) Hub {
  27. return Hub{clients: make(map[Client]bool),
  28. Register: make(chan Client),
  29. Unregister: make(chan Client),
  30. LogMessages: make(chan format.LogParts),
  31. circbuf: ring.New(ringBufSize),
  32. }
  33. }
  34. func (h *Hub) register(cl Client) {
  35. if _, ok := h.clients[cl]; !ok {
  36. if !cl.Nofollow { // we won't need it in future
  37. h.clients[cl] = true
  38. }
  39. circbufDoExit := false
  40. h.circbuf.Do(func(x interface{}) {
  41. if circbufDoExit {
  42. return
  43. }
  44. if x != nil {
  45. select { // send with short timeout
  46. case cl.Messages <- x.(format.LogParts):
  47. break
  48. case <-time.After(500 * time.Millisecond):
  49. circbufDoExit = true
  50. break
  51. }
  52. }
  53. })
  54. if cl.Nofollow {
  55. close(cl.Messages)
  56. }
  57. }
  58. }
  59. // Run is hub main loop; keeps everything going
  60. func (h *Hub) Run() {
  61. for {
  62. select {
  63. case cl := <-h.Register:
  64. h.register(cl)
  65. case cl := <-h.Unregister:
  66. _, ok := h.clients[cl]
  67. if ok {
  68. close(cl.Messages)
  69. delete(h.clients, cl)
  70. }
  71. case msg := <-h.LogMessages:
  72. h.circbuf.Value = msg
  73. h.circbuf = h.circbuf.Next()
  74. for client := range h.clients {
  75. select { // send without blocking
  76. case client.Messages <- msg:
  77. break
  78. default:
  79. break
  80. }
  81. }
  82. }
  83. }
  84. }