123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- package main
- import (
- "bufio"
- "encoding/json"
- "flag"
- "fmt"
- "net"
- "os"
- "os/signal"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/davecheney/gpio"
- "github.com/pkg/errors"
- "git.lattuga.net/boyska/direttoforo"
- )
- // appicc consumes a channel representing a stream of direttoforo.State updates
- // for each message, GPIOs are set accordingly
- // if the pin cannot be opened, this function will fail early. However, caller has no way to receive this
- // information nor informations about the error
- // this function will live as long as the chan "s" will
- func appicc(gpios gpioMapFlag, s <-chan direttoforo.State) error {
- unassigned := make([]string, 0)
- pins := make(map[string]gpio.Pin)
- for name, pinnumber := range gpios {
- pin, err := gpio.OpenPin(pinnumber, gpio.ModeOutput)
- if err != nil {
- return errors.Wrap(err, "error opening pin")
- }
- pins[name] = pin
- }
- for state := range s {
- fmt.Println("led", state)
- for sname, sstate := range state.Streams {
- pin, ok := pins[sname]
- if ok {
- if sstate.Up {
- pin.Set()
- } else {
- pin.Clear()
- }
- } else {
- // warn about it?
- alreadywarned := false
- for _, v := range unassigned {
- if v == sname {
- alreadywarned = true
- }
- }
- if !alreadywarned {
- unassigned = append(unassigned, sname)
- fmt.Fprintf(os.Stderr, "INFO: stream '%s' unassigned\n", sname)
- }
- }
- }
- }
- for _, pin := range pins {
- pin.Clear()
- pin.Close()
- }
- fmt.Println("led poweroff")
- return nil
- }
- func connect(bindpath string, s chan<- direttoforo.State, quit <-chan interface{}) {
- defer close(s)
- OuterLoop:
- for {
- select {
- case <-quit:
- return
- default:
- time.Sleep(time.Second)
- }
- conn, err := net.Dial("unix", bindpath)
- if err != nil {
- fmt.Fprintln(os.Stderr, "error connecting", err)
- continue OuterLoop
- }
- defer conn.Close()
- hello := []byte("DIRETTOFORO V1\n") // hello size is specified by protocol to be constant
- b := bufio.NewReader(conn)
- nread, err := b.Read(hello)
- if err != nil {
- fmt.Fprintln(os.Stderr, "error reading protocol version", err)
- continue OuterLoop
- }
- if nread != 15 || string(hello) != "DIRETTOFORO V1\n" {
- fmt.Fprintln(os.Stderr, "error: protocol version mismatch")
- continue OuterLoop
- }
- // read data from connection and put in a chan
- msgs := make(chan direttoforo.State)
- go func() {
- decoder := json.NewDecoder(conn)
- var newState direttoforo.State
- for {
- err = decoder.Decode(&newState)
- if err != nil {
- fmt.Fprintln(os.Stderr, "error decoding", err)
- time.Sleep(time.Second)
- close(msgs)
- break
- }
- msgs <- newState
- }
- }()
- MsgLoop: // copy msgs to s, but handles quit signal, too
- for {
- select {
- case <-quit:
- return
- case msg := <-msgs:
- if msg.Streams == nil { // msgs closed
- break MsgLoop
- }
- s <- msg
- }
- }
- }
- }
- // TODO: maybe create a map[string]int instead?
- type gpioMapFlag map[string]int
- func (i *gpioMapFlag) String() string {
- return fmt.Sprint(*i)
- }
- func (i *gpioMapFlag) Set(value string) error {
- for _, keyvalue := range strings.Split(value, ",") {
- kv := strings.SplitN(keyvalue, ":", 2)
- if len(kv) != 2 {
- return fmt.Errorf("Invalid keyvalue: %s", keyvalue)
- }
- value, err := strconv.Atoi(kv[1])
- if err != nil {
- return err
- }
- for k, v := range *i {
- if v == value {
- return fmt.Errorf("Can't assign both '%s' and '%s' to %d", k, kv[0], value)
- }
- }
- (*i)[kv[0]] = value
- }
- return nil
- }
- func main() {
- gpios := make(gpioMapFlag)
- bindpath := flag.String("bindpath", "/var/lib/direttoforo/ui.sock", "UNIX domain socket path for UIs")
- flag.Var(&gpios, "gpios", "Comma-separated list of stream-GPIOnumber to be used. Example: rec:11,stream:7")
- flag.Parse()
- fmt.Println(gpios)
- states := make(chan direttoforo.State)
- connectcloser := make(chan interface{})
- allDone := make(chan interface{})
- ledDone := make(chan error)
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- wg.Wait()
- close(allDone)
- }()
- go func() {
- err := appicc(gpios, states)
- if err != nil {
- fmt.Fprintln(os.Stderr, err)
- }
- ledDone <- err // appicc has no termination notification. This provides a way to have one
- wg.Done()
- }()
- go func() {
- connect(*bindpath, states, connectcloser)
- wg.Done()
- }()
- killed := make(chan os.Signal, 1)
- signal.Notify(killed, os.Interrupt) // ctrl-c
- for {
- select {
- case sig := <-killed:
- fmt.Println("killed by", sig)
- close(connectcloser)
- case err := <-ledDone:
- // led controller failed early (error on OpenPin)
- if err != nil {
- close(connectcloser)
- }
- // else: it was closed because we are shutting
- // down
- case <-allDone:
- os.Exit(0)
- }
- }
- }
|