FIX remove workers (concurrently)

This commit is contained in:
boyska 2017-07-24 00:04:46 +02:00
parent 044696b7a0
commit 0965847026

View file

@ -4,7 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net" "net"
"os" "sync"
) )
// Worker handles a single connection, providing updates to the clients // Worker handles a single connection, providing updates to the clients
@ -55,6 +55,7 @@ func (w *Worker) Work() {
type NetUI struct { type NetUI struct {
Object interface{} Object interface{}
workers []Worker workers []Worker
workersMutex sync.Mutex
sock *net.Listener sock *net.Listener
exit chan interface{} exit chan interface{}
} }
@ -77,14 +78,19 @@ func NewNetUI(obj interface{}) NetUI {
// Update sends the new state to every connected client // Update sends the new state to every connected client
func (n *NetUI) Update() { func (n *NetUI) Update() {
todelete := make([]Worker, 0)
n.workersMutex.Lock()
for _, w := range n.workers { for _, w := range n.workers {
select { select {
case w.Update <- n.Object: case w.Update <- n.Object:
default: default:
fmt.Fprintln(os.Stderr, "error pushing message to worker on", w.Conn.RemoteAddr()) todelete = append(todelete, w)
// TODO: shall we remove w from the pool on first error?
} }
} }
n.workersMutex.Unlock()
for _, w := range todelete {
n.removeWorker(w)
}
} }
// Close shuts the TCPserver down synchronously // Close shuts the TCPserver down synchronously
@ -92,10 +98,39 @@ func (n *NetUI) Close() {
if n.sock != nil { if n.sock != nil {
(*n.sock).Close() (*n.sock).Close()
} }
for _, w := range n.workers { // for _, w := range n.workers {
close(w.Exit) // close(w.Exit)
close(w.Update) // close(w.Update)
// }
}
func (n *NetUI) removeWorker(worker Worker) error {
n.workersMutex.Lock()
err := n.removeWorkerNoLock(worker)
n.workersMutex.Unlock()
return err
}
func (n *NetUI) removeWorkerNoLock(worker Worker) error {
fmt.Println("avevo una lista lunga", len(n.workers))
i := -1
for pos, w := range n.workers {
if worker == w {
i = pos
break
} }
}
if i == -1 {
return fmt.Errorf("No such worker")
}
n.workers[i] = n.workers[0]
n.workers = n.workers[1:]
fmt.Println("ora ho una lista lunga", len(n.workers))
return nil
}
func (n *NetUI) addWorker(connworker Worker) {
n.workersMutex.Lock()
n.workers = append(n.workers, connworker)
n.workersMutex.Unlock()
} }
// Run does the heavy work, and is blocking. It expects a Listener (typically // Run does the heavy work, and is blocking. It expects a Listener (typically
@ -123,7 +158,7 @@ func (n *NetUI) Run(sock net.Listener) error {
} }
connworker := NewWorker(conn) connworker := NewWorker(conn)
connworker.Update <- n.Object connworker.Update <- n.Object
n.workers = append(n.workers, connworker) n.addWorker(connworker)
go connworker.Work() go connworker.Work()
} }
} }