expose state on a socket, for UI decoupling
The idea is that the UI can run in a separate process, so there is now a way for another process to know the state and to display an interface accordingly
This commit is contained in:
parent
bcdeaf8f8e
commit
ed985a92e9
4 changed files with 242 additions and 12 deletions
|
@ -8,22 +8,42 @@ import (
|
|||
"os/signal"
|
||||
"time"
|
||||
|
||||
"git.lattuga.net/boyska/direttoforo.git/liquidsoap"
|
||||
"git.lattuga.net/boyska/direttoforo.git/uiserver"
|
||||
"git.lattuga.net/boyska/direttoforo/liquidsoap"
|
||||
"git.lattuga.net/boyska/direttoforo/uiserver"
|
||||
)
|
||||
|
||||
func outUI(output <-chan liquidsoap.Output) {
|
||||
for msg := range output {
|
||||
if msg.Level <= 2 {
|
||||
fmt.Println(msg)
|
||||
}
|
||||
}
|
||||
type State struct {
|
||||
Streams map[string]liquidsoap.Stream
|
||||
}
|
||||
|
||||
func NewState() State {
|
||||
s := State{}
|
||||
s.Streams = make(map[string]liquidsoap.Stream)
|
||||
return s
|
||||
}
|
||||
|
||||
func main() {
|
||||
liqfile := flag.String("liq", "foo.liq", "Path to liquidsoap script to run")
|
||||
bindpath := flag.String("bindpath", "/var/lib/direttoforo/ui.sock", "UNIX domain socket path for UIs")
|
||||
flag.Parse()
|
||||
|
||||
state := NewState()
|
||||
|
||||
netUIsock, err := net.Listen("unix", *bindpath)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error binding UI socket!")
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
os.Exit(1)
|
||||
return
|
||||
}
|
||||
netUI := uiserver.NewNetUI(&state)
|
||||
go func() {
|
||||
err := netUI.Run(netUIsock)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "NetUI error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
killLs := make(chan struct{}) // when it is closed, liquidsoap will die
|
||||
killed := make(chan os.Signal, 1)
|
||||
signal.Notify(killed, os.Interrupt) // ctrl-c
|
||||
|
@ -33,7 +53,15 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
go outUI(output)
|
||||
go func(log <-chan liquidsoap.Output) {
|
||||
for {
|
||||
msg := <-log
|
||||
if msg.Msg != "" && msg.Level < 3 {
|
||||
fmt.Println("msg", msg)
|
||||
}
|
||||
}
|
||||
}(output)
|
||||
|
||||
go func() {
|
||||
tick := time.Tick(3 * time.Second)
|
||||
for {
|
||||
|
@ -44,13 +72,29 @@ func main() {
|
|||
continue
|
||||
}
|
||||
t.Conn.SetDeadline(time.Now().Add(3 * time.Second))
|
||||
out, err := t.Outputs()
|
||||
outs, err := t.Outputs()
|
||||
if err != nil {
|
||||
fmt.Println("telnet cmd errored", err)
|
||||
continue
|
||||
}
|
||||
changed := false
|
||||
for name, enabled := range outs {
|
||||
if stream, exists := state.Streams[name]; exists {
|
||||
if stream.State != enabled {
|
||||
stream.State = enabled
|
||||
changed = true
|
||||
}
|
||||
state.Streams[name] = stream
|
||||
} else {
|
||||
state.Streams[name] = liquidsoap.Stream{State: enabled}
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
t.Close()
|
||||
fmt.Println("list=", out)
|
||||
fmt.Println(changed, "state=", state)
|
||||
if changed {
|
||||
netUI.Update()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -59,13 +103,16 @@ func main() {
|
|||
case how := <-exit: // liquidsoap exits
|
||||
if !how.Success() {
|
||||
fmt.Fprintln(os.Stderr, "liquidsoap terminated,", how.Err)
|
||||
netUI.Close()
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(0)
|
||||
case <-killed: // we receive a SIGINT: ask liquidsoap to die is enough
|
||||
netUI.Close()
|
||||
close(killLs)
|
||||
fmt.Println("Closed by user interaction, waiting for liquidsoap to exit")
|
||||
// TODO: schedule a more aggressive SIGKILL if liquidsoap doesn't exit soon
|
||||
// TODO: schedule a more aggressive SIGKILL if liquidsoap doesn't
|
||||
// exit soon
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,11 @@ type Client struct {
|
|||
Conn net.Conn
|
||||
}
|
||||
|
||||
// Stream represents a liquidsoap stream. It contains information about the status and the nature of it
|
||||
type Stream struct {
|
||||
State bool
|
||||
}
|
||||
|
||||
// NewTelnet returns a liquidsoap.Client created using telnet on the given parameters
|
||||
func NewTelnet(host string, port int) (Client, error) {
|
||||
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", host, port))
|
||||
|
|
37
uiserver/doc.go
Normal file
37
uiserver/doc.go
Normal file
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
Package uiserver creates a server to share an object (readonly) with clients.
|
||||
|
||||
Background
|
||||
|
||||
We want to deal with a user interface that is output-only: we only care about
|
||||
providing a representation of the internal state to users, not getting their
|
||||
input (that is, we'll do that part as a separate component).
|
||||
|
||||
We also want the UI to be completely detached from the core, ie running as a
|
||||
separate process: to do so, bind a socket and exchange informations over it.
|
||||
|
||||
This is especially reasonable when dealing with "embedded" boards where the
|
||||
outputs might be LED or OLED displays and the inputs might be pushbuttons so
|
||||
privilege separation is especially needed, but can prove useful in any other
|
||||
context.
|
||||
|
||||
It must be easy to write a UI with a different programming language: to do
|
||||
so, gob is excluded, JSON is preferred.
|
||||
We assume that the "state" is pretty small, so caring about diffs will only
|
||||
be a waste of time.
|
||||
|
||||
Protocol
|
||||
|
||||
Bind a socket. Every UI is a client, which connects to our socket.
|
||||
Upon connection, send some "hello" just to check versions etcetera. Then
|
||||
send a serialization of the complete state. Upon change, just send it all
|
||||
again.
|
||||
Conversely, the client will read the socket and set the UI according to the
|
||||
state that has just been read.
|
||||
|
||||
Usage
|
||||
|
||||
The uiserver package implements a generic server specifically designed to
|
||||
send updates upon connection and whenever asked to (with NetUI.Update())
|
||||
*/
|
||||
package uiserver
|
141
uiserver/server.go
Normal file
141
uiserver/server.go
Normal file
|
@ -0,0 +1,141 @@
|
|||
package uiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
)
|
||||
|
||||
// Worker handles a single connection, providing updates to the clients
|
||||
type Worker struct {
|
||||
Update chan interface{} // this is called whenever a worker should update
|
||||
Exit chan interface{} // this is called to force the worker to Close
|
||||
Conn net.Conn
|
||||
}
|
||||
|
||||
// NewWorker creates a new Worker from an ongoing connection
|
||||
func NewWorker(conn net.Conn) Worker {
|
||||
w := Worker{Conn: conn}
|
||||
w.Update = make(chan interface{}, 10)
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *Worker) send(obj interface{}) error {
|
||||
marshalled, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
marshalled = append(marshalled, byte('\n'))
|
||||
_, err = w.Conn.Write(marshalled)
|
||||
return err
|
||||
}
|
||||
|
||||
// Work loops on Worker channel to do what is needed
|
||||
func (w *Worker) Work() {
|
||||
for {
|
||||
select {
|
||||
case <-w.Exit:
|
||||
w.Conn.Close()
|
||||
return
|
||||
case obj := <-w.Update:
|
||||
err := w.send(obj)
|
||||
if err != nil {
|
||||
w.Conn.Close()
|
||||
close(w.Update)
|
||||
close(w.Exit)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NetUI handles a server for the protocol documented in this package
|
||||
// the API is designed so to make it possible to attach it to multiple
|
||||
// listeners, despite this is NOT possible at the moment
|
||||
type NetUI struct {
|
||||
Object interface{}
|
||||
workers []Worker
|
||||
sock *net.Listener
|
||||
exit chan interface{}
|
||||
}
|
||||
|
||||
// protocolHello is the hello message that the server will give to clients
|
||||
// a server doesn't expect any introduction from clients
|
||||
// the hello message is useful to clients to recognize protocol version cleanly
|
||||
// the clients are guaranteed that the hello message will always be present in
|
||||
// future versions of the protocol, always terminated by a newline and of
|
||||
// exactly 15bytes
|
||||
var protocolHello = "DIRETTOFORO V1\n"
|
||||
|
||||
// NewNetUI creates a NetUI object. Don't create it otherwise!
|
||||
// the parameter "obj" must be a pointer; if it's not a pointer, no error will
|
||||
// be raised but bad things could happen later
|
||||
func NewNetUI(obj interface{}) NetUI {
|
||||
exitchan := make(chan interface{})
|
||||
return NetUI{exit: exitchan, Object: obj}
|
||||
}
|
||||
|
||||
// Update sends the new state to every connected client
|
||||
func (n *NetUI) Update() {
|
||||
for _, w := range n.workers {
|
||||
select {
|
||||
case w.Update <- n.Object:
|
||||
default:
|
||||
fmt.Fprintln(os.Stderr, "error pushing message to worker on", w.Conn.RemoteAddr())
|
||||
// TODO: shall we remove w from the pool on first error?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts the TCPserver down synchronously
|
||||
func (n *NetUI) Close() {
|
||||
if n.sock != nil {
|
||||
(*n.sock).Close()
|
||||
}
|
||||
for _, w := range n.workers {
|
||||
close(w.Exit)
|
||||
close(w.Update)
|
||||
}
|
||||
}
|
||||
|
||||
// Run does the heavy work, and is blocking. It expects a Listener (typically
|
||||
// got with net.Listener)
|
||||
// It will exit when NetUI.Close() is called or when an error on the socket
|
||||
// happens
|
||||
func (n *NetUI) Run(sock net.Listener) error {
|
||||
if n.sock != nil {
|
||||
return fmt.Errorf("More than one Run for a single NetUI is currently unsupported")
|
||||
}
|
||||
n.sock = &sock
|
||||
defer sock.Close()
|
||||
for {
|
||||
conn, err := sock.Accept()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
// FIXME: performance problem here: we're waiting each client to be
|
||||
// validated before passing to sth else; we'd better run this in a
|
||||
// goroutine, but then the access to n.workers would be concurrent,
|
||||
// which is not allowed
|
||||
err = validateRequest(conn)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
connworker := NewWorker(conn)
|
||||
connworker.Update <- n.Object
|
||||
n.workers = append(n.workers, connworker)
|
||||
go connworker.Work()
|
||||
}
|
||||
}
|
||||
|
||||
func validateRequest(conn net.Conn) error {
|
||||
_, err := conn.Write([]byte(protocolHello))
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
Loading…
Reference in a new issue