hub.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package circolog
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "os"
  6. "time"
  7. "gopkg.in/mcuadros/go-syslog.v2/format"
  8. )
  9. // Client represent a client connected via websocket. Its most important field is the messages channel, where
  10. // new messages are sent.
  11. type Client struct {
  12. Messages chan format.LogParts // only hub should write/close this
  13. Options ClientOptions
  14. }
  15. type ClientOptions struct {
  16. BacklogLength int // how many past messages the client wants to receive upon connection
  17. Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
  18. }
  19. // The Hub is the central "registry"; it keeps both the data storage and clients notifications
  20. //
  21. // The channel "register" and "unregister" can be seen as "command"
  22. // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
  23. // has "options", such as Nofollow, to explain the Hub what should be given
  24. // An HubCommand is an "enum" of different commands
  25. type HubCommand int
  26. const (
  27. CommandClear = iota
  28. CommandPauseToggle = iota
  29. CommandStatus = iota
  30. )
  31. // An HubFullCommand is a Command, complete with arguments
  32. type HubFullCommand struct {
  33. Command HubCommand
  34. Parameters map[string]interface{}
  35. Response chan CommandResponse
  36. }
  37. type CommandResponse struct {
  38. Value interface{}
  39. }
  40. type Hub struct {
  41. Register chan Client
  42. Unregister chan Client
  43. LogMessages chan format.LogParts
  44. Commands chan HubFullCommand
  45. clients map[Client]bool
  46. circbuf *ring.Ring
  47. }
  48. // NewHub creates an empty hub
  49. func NewHub(ringBufSize int) Hub {
  50. return Hub{clients: make(map[Client]bool),
  51. Register: make(chan Client),
  52. Unregister: make(chan Client),
  53. LogMessages: make(chan format.LogParts),
  54. Commands: make(chan HubFullCommand),
  55. circbuf: ring.New(ringBufSize),
  56. }
  57. }
  58. func (h *Hub) register(cl Client) {
  59. if _, ok := h.clients[cl]; !ok {
  60. if !cl.Options.Nofollow { // we won't need it in future
  61. h.clients[cl] = true
  62. }
  63. howmany := cl.Options.BacklogLength
  64. if howmany > h.circbuf.Len() || howmany == -1 {
  65. howmany = h.circbuf.Len()
  66. }
  67. buf := h.circbuf.Move(-howmany)
  68. for i := 0; i < howmany; i++ {
  69. item := buf.Value
  70. if item != nil {
  71. select { // send with short timeout
  72. case cl.Messages <- item.(format.LogParts):
  73. break
  74. case <-time.After(500 * time.Millisecond):
  75. close(cl.Messages)
  76. return
  77. }
  78. }
  79. buf = buf.Next()
  80. }
  81. if cl.Options.Nofollow {
  82. close(cl.Messages)
  83. }
  84. }
  85. }
  86. // Run is hub main loop; keeps everything going
  87. func (h *Hub) Run() {
  88. active := true
  89. for {
  90. select {
  91. case cl := <-h.Register:
  92. h.register(cl)
  93. case cl := <-h.Unregister:
  94. _, ok := h.clients[cl]
  95. if ok {
  96. close(cl.Messages)
  97. delete(h.clients, cl)
  98. }
  99. case msg := <-h.LogMessages:
  100. if active == true {
  101. h.circbuf.Value = msg
  102. h.circbuf = h.circbuf.Next()
  103. for client := range h.clients {
  104. select { // send without blocking
  105. case client.Messages <- msg:
  106. break
  107. default:
  108. break
  109. }
  110. }
  111. }
  112. case cmd := <-h.Commands:
  113. switch cmd.Command {
  114. case CommandClear:
  115. h.clear()
  116. cmd.Response <- CommandResponse{Value: true}
  117. case CommandPauseToggle:
  118. togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
  119. if active {
  120. fmt.Print("un")
  121. }
  122. fmt.Println("paused")
  123. cmd.Response <- CommandResponse{Value: active}
  124. case CommandStatus:
  125. cmd.Response <- CommandResponse{Value: map[string]interface{}{
  126. "size": h.circbuf.Len(),
  127. "paused": !active,
  128. }}
  129. }
  130. }
  131. }
  132. }
  133. func togglePause(waitTime time.Duration, status *bool) {
  134. if waitTime != 0 {
  135. go func() {
  136. time.Sleep(waitTime)
  137. fmt.Fprintln(os.Stderr, "toggling again")
  138. togglePause(0, status)
  139. }()
  140. }
  141. *status = !*status
  142. }
  143. // Clear removes all elements from the buffer
  144. func (h *Hub) clear() {
  145. buf := h.circbuf
  146. for i := 0; i < buf.Len(); i++ {
  147. buf.Value = nil
  148. buf = buf.Next()
  149. }
  150. }