forked from boyska/circolog
refactor: hub reorganized a little
This commit is contained in:
parent
5b7ddb62a6
commit
34593d380a
3 changed files with 30 additions and 26 deletions
|
@ -9,7 +9,7 @@ import (
|
||||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupHttp(hub circolog.Hub) {
|
func setupHTTP(hub circolog.Hub) {
|
||||||
http.HandleFunc("/", getHTTPHandler(hub))
|
http.HandleFunc("/", getHTTPHandler(hub))
|
||||||
http.HandleFunc("/ws", getWSHandler(hub))
|
http.HandleFunc("/ws", getWSHandler(hub))
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ func main() {
|
||||||
}
|
}
|
||||||
go hub.Run()
|
go hub.Run()
|
||||||
|
|
||||||
setupHttp(hub)
|
setupHTTP(hub)
|
||||||
if *querySocket != "" {
|
if *querySocket != "" {
|
||||||
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
|
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
|
||||||
unixListener, err := net.Listen("unix", *querySocket)
|
unixListener, err := net.Listen("unix", *querySocket)
|
||||||
|
|
52
hub.go
52
hub.go
|
@ -38,35 +38,39 @@ func NewHub(ringBufSize int) Hub {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Hub) register(cl Client) {
|
||||||
|
if _, ok := h.clients[cl]; !ok {
|
||||||
|
if !cl.Nofollow { // we won't need it in future
|
||||||
|
h.clients[cl] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
circbufDoExit := false
|
||||||
|
h.circbuf.Do(func(x interface{}) {
|
||||||
|
if circbufDoExit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if x != nil {
|
||||||
|
select { // send with short timeout
|
||||||
|
case cl.Messages <- x.(format.LogParts):
|
||||||
|
break
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
circbufDoExit = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if cl.Nofollow {
|
||||||
|
close(cl.Messages)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run is hub main loop; keeps everything going
|
// Run is hub main loop; keeps everything going
|
||||||
func (h *Hub) Run() {
|
func (h *Hub) Run() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case cl := <-h.Register:
|
case cl := <-h.Register:
|
||||||
if _, ok := h.clients[cl]; !ok {
|
h.register(cl)
|
||||||
if !cl.Nofollow { // we won't need it in future
|
|
||||||
h.clients[cl] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
circbuf_do_exit := false
|
|
||||||
h.circbuf.Do(func(x interface{}) {
|
|
||||||
if circbuf_do_exit {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if x != nil {
|
|
||||||
select { // send with short timeout
|
|
||||||
case cl.Messages <- x.(format.LogParts):
|
|
||||||
break
|
|
||||||
case <-time.After(500 * time.Millisecond):
|
|
||||||
circbuf_do_exit = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if cl.Nofollow {
|
|
||||||
close(cl.Messages)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case cl := <-h.Unregister:
|
case cl := <-h.Unregister:
|
||||||
_, ok := h.clients[cl]
|
_, ok := h.clients[cl]
|
||||||
if ok {
|
if ok {
|
||||||
|
|
Loading…
Reference in a new issue