From 0965847026f42141f2a4914063fcfc06795fc41a Mon Sep 17 00:00:00 2001 From: boyska Date: Mon, 24 Jul 2017 00:04:46 +0200 Subject: [PATCH] FIX remove workers (concurrently) --- uiserver/server.go | 57 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 11 deletions(-) diff --git a/uiserver/server.go b/uiserver/server.go index 28b9a94..0d0fc86 100644 --- a/uiserver/server.go +++ b/uiserver/server.go @@ -4,7 +4,7 @@ import ( "encoding/json" "fmt" "net" - "os" + "sync" ) // Worker handles a single connection, providing updates to the clients @@ -53,10 +53,11 @@ func (w *Worker) Work() { // the API is designed so to make it possible to attach it to multiple // listeners, despite this is NOT possible at the moment type NetUI struct { - Object interface{} - workers []Worker - sock *net.Listener - exit chan interface{} + Object interface{} + workers []Worker + workersMutex sync.Mutex + sock *net.Listener + exit chan interface{} } // protocolHello is the hello message that the server will give to clients @@ -77,14 +78,19 @@ func NewNetUI(obj interface{}) NetUI { // Update sends the new state to every connected client func (n *NetUI) Update() { + todelete := make([]Worker, 0) + n.workersMutex.Lock() for _, w := range n.workers { select { case w.Update <- n.Object: default: - fmt.Fprintln(os.Stderr, "error pushing message to worker on", w.Conn.RemoteAddr()) - // TODO: shall we remove w from the pool on first error? + todelete = append(todelete, w) } } + n.workersMutex.Unlock() + for _, w := range todelete { + n.removeWorker(w) + } } // Close shuts the TCPserver down synchronously @@ -92,10 +98,39 @@ func (n *NetUI) Close() { if n.sock != nil { (*n.sock).Close() } - for _, w := range n.workers { - close(w.Exit) - close(w.Update) + // for _, w := range n.workers { + // close(w.Exit) + // close(w.Update) + // } +} + +func (n *NetUI) removeWorker(worker Worker) error { + n.workersMutex.Lock() + err := n.removeWorkerNoLock(worker) + n.workersMutex.Unlock() + return err +} +func (n *NetUI) removeWorkerNoLock(worker Worker) error { + fmt.Println("avevo una lista lunga", len(n.workers)) + i := -1 + for pos, w := range n.workers { + if worker == w { + i = pos + break + } } + if i == -1 { + return fmt.Errorf("No such worker") + } + n.workers[i] = n.workers[0] + n.workers = n.workers[1:] + fmt.Println("ora ho una lista lunga", len(n.workers)) + return nil +} +func (n *NetUI) addWorker(connworker Worker) { + n.workersMutex.Lock() + n.workers = append(n.workers, connworker) + n.workersMutex.Unlock() } // Run does the heavy work, and is blocking. It expects a Listener (typically @@ -123,7 +158,7 @@ func (n *NetUI) Run(sock net.Listener) error { } connworker := NewWorker(conn) connworker.Update <- n.Object - n.workers = append(n.workers, connworker) + n.addWorker(connworker) go connworker.Work() } }