forked from boyska/circolog
FIX timeout issues between channels
we introduced non-blocking writes; however, being non-blocking when the goroutine has no buffer means that many messages will be considered lost. This commit change from "non blocking" to "max 500ms; first timeout means stop it all"; it also put a little buffer on client messages.
This commit is contained in:
parent
bf145240c2
commit
8568280dd1
2 changed files with 11 additions and 4 deletions
|
@ -17,7 +17,7 @@ func setupHttp(hub circolog.Hub) {
|
||||||
func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
|
func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
client := circolog.Client{
|
client := circolog.Client{
|
||||||
Messages: make(chan format.LogParts),
|
Messages: make(chan format.LogParts, 20),
|
||||||
Nofollow: true}
|
Nofollow: true}
|
||||||
hub.Register <- client
|
hub.Register <- client
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
client := circolog.Client{Messages: make(chan format.LogParts)}
|
client := circolog.Client{Messages: make(chan format.LogParts, 20)}
|
||||||
hub.Register <- client
|
hub.Register <- client
|
||||||
|
|
||||||
// Allow collection of memory referenced by the caller by doing all work in
|
// Allow collection of memory referenced by the caller by doing all work in
|
||||||
|
|
11
hub.go
11
hub.go
|
@ -2,6 +2,7 @@ package circolog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/ring"
|
"container/ring"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||||
)
|
)
|
||||||
|
@ -46,12 +47,18 @@ func (h *Hub) Run() {
|
||||||
if !cl.Nofollow { // we won't need it in future
|
if !cl.Nofollow { // we won't need it in future
|
||||||
h.clients[cl] = true
|
h.clients[cl] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
circbuf_do_exit := false
|
||||||
h.circbuf.Do(func(x interface{}) {
|
h.circbuf.Do(func(x interface{}) {
|
||||||
|
if circbuf_do_exit {
|
||||||
|
return
|
||||||
|
}
|
||||||
if x != nil {
|
if x != nil {
|
||||||
select { // send without blocking
|
select { // send with short timeout
|
||||||
case cl.Messages <- x.(format.LogParts):
|
case cl.Messages <- x.(format.LogParts):
|
||||||
break
|
break
|
||||||
default:
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
circbuf_do_exit = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue