2018-08-23 01:21:53 +02:00
package circolog
2018-08-22 23:51:59 +02:00
import (
2018-08-23 00:56:27 +02:00
"container/ring"
2018-08-23 02:05:50 +02:00
"time"
2018-08-22 23:51:59 +02:00
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Client represent a client connected via websocket. Its most important field is the messages channel, where
2018-08-23 00:56:27 +02:00
// new messages are sent.
2018-08-22 23:51:59 +02:00
type Client struct {
2018-08-23 00:56:27 +02:00
Messages chan format . LogParts // only hub should write/close this
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
2018-08-22 23:51:59 +02:00
}
2018-08-23 00:56:27 +02:00
// The Hub is the central "registry"; it keeps both the data storage and clients notifications
//
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
2018-08-22 23:51:59 +02:00
type Hub struct {
Register chan Client
Unregister chan Client
2018-08-23 00:56:27 +02:00
LogMessages chan format . LogParts
clients map [ Client ] bool
circbuf * ring . Ring
2018-08-22 23:51:59 +02:00
}
// NewHub creates an empty hub
2018-08-23 00:56:27 +02:00
func NewHub ( ringBufSize int ) Hub {
2018-08-22 23:51:59 +02:00
return Hub { clients : make ( map [ Client ] bool ) ,
Register : make ( chan Client ) ,
Unregister : make ( chan Client ) ,
2018-08-23 00:56:27 +02:00
LogMessages : make ( chan format . LogParts ) ,
circbuf : ring . New ( ringBufSize ) ,
2018-08-22 23:51:59 +02:00
}
}
2018-08-23 00:56:27 +02:00
// Run is hub main loop; keeps everything going
2018-08-22 23:51:59 +02:00
func ( h * Hub ) Run ( ) {
for {
select {
case cl := <- h . Register :
2018-08-23 00:56:27 +02:00
if _ , ok := h . clients [ cl ] ; ! ok {
if ! cl . Nofollow { // we won't need it in future
h . clients [ cl ] = true
}
2018-08-23 02:05:50 +02:00
circbuf_do_exit := false
2018-08-23 00:56:27 +02:00
h . circbuf . Do ( func ( x interface { } ) {
2018-08-23 02:05:50 +02:00
if circbuf_do_exit {
return
}
2018-08-23 00:56:27 +02:00
if x != nil {
2018-08-23 02:05:50 +02:00
select { // send with short timeout
2018-08-23 01:14:08 +02:00
case cl . Messages <- x . ( format . LogParts ) :
break
2018-08-23 02:05:50 +02:00
case <- time . After ( 500 * time . Millisecond ) :
circbuf_do_exit = true
2018-08-23 01:14:08 +02:00
break
}
2018-08-23 00:56:27 +02:00
}
} )
if cl . Nofollow {
close ( cl . Messages )
}
}
2018-08-22 23:51:59 +02:00
case cl := <- h . Unregister :
_ , ok := h . clients [ cl ]
if ok {
2018-08-23 00:56:27 +02:00
close ( cl . Messages )
2018-08-22 23:51:59 +02:00
delete ( h . clients , cl )
}
2018-08-23 00:56:27 +02:00
case msg := <- h . LogMessages :
h . circbuf . Value = msg
h . circbuf = h . circbuf . Next ( )
2018-08-22 23:51:59 +02:00
for client := range h . clients {
2018-08-23 01:14:08 +02:00
select { // send without blocking
case client . Messages <- msg :
break
default :
break
}
2018-08-22 23:51:59 +02:00
}
}
}
}