From ed985a92e9e36c5f2fd084ac76aee7c8ca5d3fe5 Mon Sep 17 00:00:00 2001 From: boyska Date: Sun, 23 Jul 2017 20:37:20 +0200 Subject: [PATCH] expose state on a socket, for UI decoupling The idea is that the UI can run in a separate process, so there is now a way for another process to know the state and to display an interface accordingly --- cmd/direttoforo/main.go | 71 ++++++++++++++++---- liquidsoap/telnet.go | 5 ++ uiserver/doc.go | 37 +++++++++++ uiserver/server.go | 141 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 242 insertions(+), 12 deletions(-) create mode 100644 uiserver/doc.go create mode 100644 uiserver/server.go diff --git a/cmd/direttoforo/main.go b/cmd/direttoforo/main.go index 97a5d30..a35452c 100644 --- a/cmd/direttoforo/main.go +++ b/cmd/direttoforo/main.go @@ -8,22 +8,42 @@ import ( "os/signal" "time" - "git.lattuga.net/boyska/direttoforo.git/liquidsoap" - "git.lattuga.net/boyska/direttoforo.git/uiserver" + "git.lattuga.net/boyska/direttoforo/liquidsoap" + "git.lattuga.net/boyska/direttoforo/uiserver" ) -func outUI(output <-chan liquidsoap.Output) { - for msg := range output { - if msg.Level <= 2 { - fmt.Println(msg) - } - } +type State struct { + Streams map[string]liquidsoap.Stream +} + +func NewState() State { + s := State{} + s.Streams = make(map[string]liquidsoap.Stream) + return s } func main() { liqfile := flag.String("liq", "foo.liq", "Path to liquidsoap script to run") + bindpath := flag.String("bindpath", "/var/lib/direttoforo/ui.sock", "UNIX domain socket path for UIs") flag.Parse() + state := NewState() + + netUIsock, err := net.Listen("unix", *bindpath) + if err != nil { + fmt.Fprintln(os.Stderr, "error binding UI socket!") + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + return + } + netUI := uiserver.NewNetUI(&state) + go func() { + err := netUI.Run(netUIsock) + if err != nil { + fmt.Fprintln(os.Stderr, "NetUI error", err) + } + }() + killLs := make(chan struct{}) // when it is closed, liquidsoap will die killed := make(chan os.Signal, 1) signal.Notify(killed, os.Interrupt) // ctrl-c @@ -33,7 +53,15 @@ func main() { os.Exit(1) } - go outUI(output) + go func(log <-chan liquidsoap.Output) { + for { + msg := <-log + if msg.Msg != "" && msg.Level < 3 { + fmt.Println("msg", msg) + } + } + }(output) + go func() { tick := time.Tick(3 * time.Second) for { @@ -44,13 +72,29 @@ func main() { continue } t.Conn.SetDeadline(time.Now().Add(3 * time.Second)) - out, err := t.Outputs() + outs, err := t.Outputs() if err != nil { fmt.Println("telnet cmd errored", err) continue } + changed := false + for name, enabled := range outs { + if stream, exists := state.Streams[name]; exists { + if stream.State != enabled { + stream.State = enabled + changed = true + } + state.Streams[name] = stream + } else { + state.Streams[name] = liquidsoap.Stream{State: enabled} + changed = true + } + } t.Close() - fmt.Println("list=", out) + fmt.Println(changed, "state=", state) + if changed { + netUI.Update() + } } }() @@ -59,13 +103,16 @@ func main() { case how := <-exit: // liquidsoap exits if !how.Success() { fmt.Fprintln(os.Stderr, "liquidsoap terminated,", how.Err) + netUI.Close() os.Exit(1) } os.Exit(0) case <-killed: // we receive a SIGINT: ask liquidsoap to die is enough + netUI.Close() close(killLs) fmt.Println("Closed by user interaction, waiting for liquidsoap to exit") - // TODO: schedule a more aggressive SIGKILL if liquidsoap doesn't exit soon + // TODO: schedule a more aggressive SIGKILL if liquidsoap doesn't + // exit soon } } diff --git a/liquidsoap/telnet.go b/liquidsoap/telnet.go index 31ca301..8f1f254 100644 --- a/liquidsoap/telnet.go +++ b/liquidsoap/telnet.go @@ -14,6 +14,11 @@ type Client struct { Conn net.Conn } +// Stream represents a liquidsoap stream. It contains information about the status and the nature of it +type Stream struct { + State bool +} + // NewTelnet returns a liquidsoap.Client created using telnet on the given parameters func NewTelnet(host string, port int) (Client, error) { conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port)) diff --git a/uiserver/doc.go b/uiserver/doc.go new file mode 100644 index 0000000..eca2b0b --- /dev/null +++ b/uiserver/doc.go @@ -0,0 +1,37 @@ +/* +Package uiserver creates a server to share an object (readonly) with clients. + +Background + +We want to deal with a user interface that is output-only: we only care about +providing a representation of the internal state to users, not getting their +input (that is, we'll do that part as a separate component). + +We also want the UI to be completely detached from the core, ie running as a +separate process: to do so, bind a socket and exchange informations over it. + +This is especially reasonable when dealing with "embedded" boards where the +outputs might be LED or OLED displays and the inputs might be pushbuttons so +privilege separation is especially needed, but can prove useful in any other +context. + +It must be easy to write a UI with a different programming language: to do +so, gob is excluded, JSON is preferred. +We assume that the "state" is pretty small, so caring about diffs will only +be a waste of time. + +Protocol + +Bind a socket. Every UI is a client, which connects to our socket. +Upon connection, send some "hello" just to check versions etcetera. Then +send a serialization of the complete state. Upon change, just send it all +again. +Conversely, the client will read the socket and set the UI according to the +state that has just been read. + +Usage + +The uiserver package implements a generic server specifically designed to +send updates upon connection and whenever asked to (with NetUI.Update()) +*/ +package uiserver diff --git a/uiserver/server.go b/uiserver/server.go new file mode 100644 index 0000000..095ae72 --- /dev/null +++ b/uiserver/server.go @@ -0,0 +1,141 @@ +package uiserver + +import ( + "encoding/json" + "fmt" + "net" + "os" +) + +// Worker handles a single connection, providing updates to the clients +type Worker struct { + Update chan interface{} // this is called whenever a worker should update + Exit chan interface{} // this is called to force the worker to Close + Conn net.Conn +} + +// NewWorker creates a new Worker from an ongoing connection +func NewWorker(conn net.Conn) Worker { + w := Worker{Conn: conn} + w.Update = make(chan interface{}, 10) + return w +} + +func (w *Worker) send(obj interface{}) error { + marshalled, err := json.Marshal(obj) + if err != nil { + return err + } + marshalled = append(marshalled, byte('\n')) + _, err = w.Conn.Write(marshalled) + return err +} + +// Work loops on Worker channel to do what is needed +func (w *Worker) Work() { + for { + select { + case <-w.Exit: + w.Conn.Close() + return + case obj := <-w.Update: + err := w.send(obj) + if err != nil { + w.Conn.Close() + close(w.Update) + close(w.Exit) + return + } + } + } +} + +// NetUI handles a server for the protocol documented in this package +// the API is designed so to make it possible to attach it to multiple +// listeners, despite this is NOT possible at the moment +type NetUI struct { + Object interface{} + workers []Worker + sock *net.Listener + exit chan interface{} +} + +// protocolHello is the hello message that the server will give to clients +// a server doesn't expect any introduction from clients +// the hello message is useful to clients to recognize protocol version cleanly +// the clients are guaranteed that the hello message will always be present in +// future versions of the protocol, always terminated by a newline and of +// exactly 15bytes +var protocolHello = "DIRETTOFORO V1\n" + +// NewNetUI creates a NetUI object. Don't create it otherwise! +// the parameter "obj" must be a pointer; if it's not a pointer, no error will +// be raised but bad things could happen later +func NewNetUI(obj interface{}) NetUI { + exitchan := make(chan interface{}) + return NetUI{exit: exitchan, Object: obj} +} + +// Update sends the new state to every connected client +func (n *NetUI) Update() { + for _, w := range n.workers { + select { + case w.Update <- n.Object: + default: + fmt.Fprintln(os.Stderr, "error pushing message to worker on", w.Conn.RemoteAddr()) + // TODO: shall we remove w from the pool on first error? + } + } +} + +// Close shuts the TCPserver down synchronously +func (n *NetUI) Close() { + if n.sock != nil { + (*n.sock).Close() + } + for _, w := range n.workers { + close(w.Exit) + close(w.Update) + } +} + +// Run does the heavy work, and is blocking. It expects a Listener (typically +// got with net.Listener) +// It will exit when NetUI.Close() is called or when an error on the socket +// happens +func (n *NetUI) Run(sock net.Listener) error { + if n.sock != nil { + return fmt.Errorf("More than one Run for a single NetUI is currently unsupported") + } + n.sock = &sock + defer sock.Close() + for { + conn, err := sock.Accept() + if err != nil { + return nil + } + // FIXME: performance problem here: we're waiting each client to be + // validated before passing to sth else; we'd better run this in a + // goroutine, but then the access to n.workers would be concurrent, + // which is not allowed + err = validateRequest(conn) + if err != nil { + continue + } + connworker := NewWorker(conn) + connworker.Update <- n.Object + n.workers = append(n.workers, connworker) + go connworker.Work() + } +} + +func validateRequest(conn net.Conn) error { + _, err := conn.Write([]byte(protocolHello)) + if err != nil { + conn.Close() + return err + } + + return nil + +}