diff --git a/cmd/circologctl/main.go b/cmd/circologctl/main.go new file mode 100644 index 0000000..0eb7ed9 --- /dev/null +++ b/cmd/circologctl/main.go @@ -0,0 +1,124 @@ +package main + +import ( + "context" + "flag" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "os" + "time" +) + +var globalOpts struct { + ctlSock string + verbose bool + debug bool +} + +var ctl http.Client + +type commandFunc func([]string) error + +var cmdMap map[string]commandFunc + +func init() { + cmdMap = map[string]commandFunc{ + // TODO: implement set and get of config at runtime + //"set": setCmd, + //"get": getCmd, + "pause": pauseCmd, + "reload": reloadCmd, + "restart": restartCmd, + "help": helpCmd, + } +} + +//func setCmd(ctlSock string, args []string) error {} + +//func getCmd(ctlSock string, args []string) error {} + +func pauseCmd(args []string) error { + var dontChangeAgain time.Duration + flagset := flag.NewFlagSet(args[0], flag.ExitOnError) + waitTime := flagset.Duration("wait-time", dontChangeAgain, "How long to wait before untoggling the state, defaults to never") + flagset.Parse(args[1:]) + postBody := make(map[string][]string) + if *waitTime != dontChangeAgain { + postBody["waitTime"] = []string{fmt.Sprintf("%s", *waitTime)} + } + if globalOpts.debug { + fmt.Println("[DEBUG] postBody:", postBody) + } + resp, err := ctl.PostForm("http://unix/pause/toggle", postBody) + if globalOpts.verbose { + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + fmt.Println(string(bodyBytes)) + } + return err +} + +func reloadCmd(args []string) error { + return nil +} + +func restartCmd(args []string) error { + return nil +} + +func helpCmd(args []string) error { + usage(os.Stdout) + os.Exit(0) + return nil +} + +func usage(w io.Writer) { + fmt.Fprintf(w, "USAGE: %s [globalOpts] [SUBCOMMAND] [opts]\n", os.Args[0]) + fmt.Fprintf(w, "\nSUBCOMMANDS:\n\n") + for command := range cmdMap { + fmt.Fprintf(w, "\t%s\n", command) + } +} + +func parseAndRun(args []string) { + cmdName := args[0] + cmdToRun, ok := cmdMap[cmdName] + if !ok { + fmt.Fprintf(os.Stderr, "Unknown subcommand: %s\n", cmdName) + usage(os.Stderr) + os.Exit(2) + } + // from here: https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74 + ctl = http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", globalOpts.ctlSock) + }, + }, + } + err := cmdToRun(args) + if err != nil { + fmt.Fprintf(os.Stderr, "Error:\n%s\n", err) + os.Exit(1) + } +} + +func main() { + flag.StringVar(&globalOpts.ctlSock, "ctl-socket", "/tmp/circologd-ctl.sock", + "Path to a unix domain socket for the control server; leave empty to disable") + flag.BoolVar(&globalOpts.verbose, "verbose", false, "Print more output") + flag.BoolVar(&globalOpts.debug, "debug", false, "Print debugging info") + flag.Parse() + args := flag.Args() + if len(args) == 0 { + usage(os.Stderr) + os.Exit(-1) + } + parseAndRun(args) +} diff --git a/cmd/circologd/http_ctl.go b/cmd/circologd/http_ctl.go index cde8538..d47eec8 100644 --- a/cmd/circologd/http_ctl.go +++ b/cmd/circologd/http_ctl.go @@ -2,23 +2,51 @@ package main import ( "encoding/json" + "fmt" + "log" "net/http" + "os" + "time" + fractal "git.lattuga.net/blallo/gotools/formatting" "git.lattuga.net/boyska/circolog" "github.com/gorilla/mux" ) -func setupHTTPCtl(hub circolog.Hub) *mux.Router { - mux := mux.NewRouter() - mux.HandleFunc("/pause/toggle", togglePause(hub)).Methods("POST") - mux.HandleFunc("/logs/clear", clearQueue(hub)).Methods("POST") - return mux +func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router { + m := mux.NewRouter() + m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST") + m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST") + m.HandleFunc("/help", printHelp(verbose)).Methods("GET") + m.HandleFunc("/echo", echo(verbose)).Methods("GET") + return m } -func togglePause(hub circolog.Hub) http.HandlerFunc { +func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle} - resp := <-hub.Responses + if verbose { + log.Printf("[%s] %s - toggled pause", r.Method, r.RemoteAddr) + } + r.ParseForm() + waitTimePar := r.FormValue("waitTime") + var waitTime time.Duration + var err error + if waitTimePar != "" { + waitTime, err = time.ParseDuration(waitTimePar) + if err != nil { + fmt.Println("waitTime not understood:", waitTimePar) + } + } + if debug { + fmt.Println("[DEBUG] waitTime:", waitTime) + } + response := make(chan circolog.CommandResponse) + hub.Commands <- circolog.HubFullCommand{ + Command: circolog.CommandPauseToggle, + Response: response, + Parameters: map[string]interface{}{"waitTime": waitTime}, + } + resp := <-response active := resp.Value.(bool) w.Header().Set("content-type", "application/json") enc := json.NewEncoder(w) @@ -26,13 +54,49 @@ func togglePause(hub circolog.Hub) http.HandlerFunc { } } -func clearQueue(hub circolog.Hub) http.HandlerFunc { +func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear} - resp := <-hub.Responses + if verbose { + log.Printf("[%s] %s - cleared queue", r.Method, r.RemoteAddr) + } + response := make(chan circolog.CommandResponse) + hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear, Response: response} + resp := <-response success := resp.Value.(bool) w.Header().Set("content-type", "application/json") enc := json.NewEncoder(w) enc.Encode(map[string]interface{}{"success": success}) } } + +func printHelp(verbose bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if verbose { + log.Printf("[%s] %s - asked for help", r.Method, r.RemoteAddr) + } + w.Header().Set("content-type", "application/json") + enc := json.NewEncoder(w) + var pathsWithDocs = map[string]string{ + "/pause/toggle": "Toggle the server from pause state (not listening)", + "/logs/clear": "Wipe the buffer from all the messages", + "/help": "This help", + "/echo": "Answers to heartbeat", + } + fractal.EncodeJSON(pathsWithDocs, enc) + if verbose { + errEnc := json.NewEncoder(os.Stderr) + fractal.EncodeJSON(pathsWithDocs, errEnc) + } + } +} + +func echo(verbose bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + log.Println("Echo") + if verbose { + log.Printf("[%s] %s - asked for echo", r.Method, r.RemoteAddr) + } + w.Header().Set("content-type", "text/plain") + fmt.Fprintln(w, "I am on!") + } +} diff --git a/cmd/circologd/main.go b/cmd/circologd/main.go index b40641c..cf0a202 100644 --- a/cmd/circologd/main.go +++ b/cmd/circologd/main.go @@ -29,10 +29,12 @@ func main() { queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service") querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!") ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable") + verbose := flag.Bool("verbose", false, "Print more output executing the daemon") + debug := flag.Bool("debug", false, "Print debugging info executing the daemon") flag.Parse() interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGTERM) + signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM) hub := circolog.NewHub(*bufsize) handler := syslog.NewChannelHandler(hub.LogMessages) @@ -85,7 +87,7 @@ func main() { }() } - httpCtlServer := http.Server{Handler: setupHTTPCtl(hub)} + httpCtlServer := http.Server{Handler: setupHTTPCtl(hub, *verbose, *debug)} if *ctlSocket != "" { fmt.Printf("Binding address `%s` [http]\n", *ctlSocket) unixListener, err := net.Listen("unix", *ctlSocket) @@ -107,14 +109,25 @@ func main() { select { case sig := <-interrupt: if sig == syscall.SIGUSR1 { - hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle} - resp := <-hub.Responses + response := make(chan circolog.CommandResponse) + hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle, Response: response} + resp := <-response if resp.Value.(bool) { log.Println("resumed") } else { log.Println("paused") } } + if sig == syscall.SIGUSR2 { + response := make(chan circolog.CommandResponse) + hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear, Response: response} + resp := <-response + if resp.Value.(bool) { + log.Println("buffer cleaned") + } else { + log.Println("buffer NOT cleaned") + } + } if sig == syscall.SIGTERM || sig == syscall.SIGINT { log.Println("Quitting because of signal", sig) server.Kill() diff --git a/hub.go b/hub.go index 639612f..7d160c2 100644 --- a/hub.go +++ b/hub.go @@ -2,6 +2,8 @@ package circolog import ( "container/ring" + "fmt" + "os" "time" "gopkg.in/mcuadros/go-syslog.v2/format" @@ -35,7 +37,9 @@ const ( // An HubFullCommand is a Command, complete with arguments type HubFullCommand struct { - Command HubCommand + Command HubCommand + Parameters map[string]interface{} + Response chan CommandResponse } type CommandResponse struct { Value interface{} @@ -46,7 +50,6 @@ type Hub struct { Unregister chan Client LogMessages chan format.LogParts Commands chan HubFullCommand - Responses chan CommandResponse clients map[Client]bool circbuf *ring.Ring @@ -59,7 +62,6 @@ func NewHub(ringBufSize int) Hub { Unregister: make(chan Client), LogMessages: make(chan format.LogParts), Commands: make(chan HubFullCommand), - Responses: make(chan CommandResponse), circbuf: ring.New(ringBufSize), } } @@ -125,17 +127,30 @@ func (h *Hub) Run() { case cmd := <-h.Commands: if cmd.Command == CommandClear { h.clear() - h.Responses <- CommandResponse{Value: true} + cmd.Response <- CommandResponse{Value: true} } if cmd.Command == CommandPauseToggle { - active = !active - h.Responses <- CommandResponse{Value: active} + togglePause(cmd.Parameters["waitTime"].(time.Duration), &active) + cmd.Response <- CommandResponse{Value: active} } } } } -// Clear removes every all elements from the buffer +func togglePause(waitTime time.Duration, status *bool) { + var noTime time.Duration + if waitTime != noTime { + delayedToggle := func() { + time.Sleep(waitTime) + fmt.Fprintln(os.Stderr, "toggling again") + togglePause(noTime, status) + } + go delayedToggle() + } + *status = !*status +} + +// Clear removes all elements from the buffer func (h *Hub) clear() { buf := h.circbuf for i := 0; i < buf.Len(); i++ {