diff --git a/README.md b/README.md index 3f3d6dd..8e79a89 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ A syslog daemon implementing circular buffer, in-memory storage. -This is useful when you want to keep some (heavy detailed) log available, but you don't want to log too many -things to disk. +This is useful when you want to keep some (heavily detailed) log available, but you don't want to log too many +things to disk. Remember: logging is useful, but can be dangerous to your users' privacy! -On your "main" syslog, send some message to this one! +On your "main" syslog, forward (part of the) messages to this one! ## Integration examples @@ -27,15 +27,59 @@ and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/ ## Client -`curl` might be enough of a client for most uses. +`circolog` has its own client: `circolog-tail`. It is intended to resemble `tail -f` for the most basic +options; however, it will include filtering options that are common when you want to read logs, because that's +very easy when you have structured logs available. + +However, one design point of circolog is to be usable without having a specific client: so the logs are +offered on both HTTP and websocket. This means that you can use `curl` if you want: curl --unix-socket /run/circolog-query.sock localhost/ -will give you everything that circologd has in memory +will give you everything that circologd has in memory. If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of any websocket client supporting UNIX domain socket, so you have two options: - 1. wait until I write a proper `circolog-tail` client implementing it all + 1. Use `circolog-tail` 2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages. + +### HTTP URLs and parameters + +When using HTTP, logs are served on `/`. Valid parameters are: + + * `l`. This is the amount of lines to send. This is essentially the same as the `-n` parameter on tail + Using `l=-1` (the default) means "give me every log message that you have + * `fmt`. This selects the output format. When `fmt=json` is used, each message is returned as JSON structured + data. The format of those JSON messages is still unstable. `fmt=syslog`, the default, outputs messages using "syslog style" (RFC XXXXXX) + +To use websocket, request path `/ws`. The same parameters of `/` are recognized. + +## Control daemon + +Circologd can be controlled, on some aspects, at run-time. It has 2 mechanisms for that: the easiest, and more +limited, is sending a signal with kill; the second, and more powerful, is a control socket, where you can give +commands to it. This control socket is just HTTP, so again `curl` is your friend. In the future a +`circolog-ctl` client will be developed. + +### Pause + +When circologd is paused, every new message it receives is immediately discarded. No exception. The backlog +is, however, preserved. This means that you can trigger the event that you want to investigate, pause +circolog, then analyze the logs. +Pausing might be the easiest way to make circologd only run "when needed". + +When circologd resumes, no previous message is lost. + + +To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again. + +To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket. + +### Clear + +When you clear the circologd's buffer, it will discard every message it has, but will keep collecting new +messages. + +You can do that with `POST /logs/clear` diff --git a/cmd/circolog-tail/main.go b/cmd/circolog-tail/main.go index 5b25ba7..ce653a7 100644 --- a/cmd/circolog-tail/main.go +++ b/cmd/circolog-tail/main.go @@ -11,7 +11,10 @@ import ( "strconv" "time" + "git.lattuga.net/boyska/circolog/formatter" "github.com/gorilla/websocket" + "gopkg.in/mcuadros/go-syslog.v2/format" + "gopkg.in/mgo.v2/bson" ) func main() { @@ -28,6 +31,7 @@ func main() { Path: "/ws", } q := u.Query() + q.Set("fmt", "bson") if *backlogLimit >= 0 { q.Set("l", strconv.Itoa(*backlogLimit)) } @@ -58,12 +62,20 @@ func main() { go func() { defer close(done) for { - _, message, err := c.ReadMessage() + _, serialized, err := c.ReadMessage() if err != nil { log.Println("close:", err) return } - fmt.Println(string(message)) + var parsed format.LogParts + if err := bson.Unmarshal(serialized, &parsed); err != nil { + log.Println("invalid YAML", err) + continue + } + if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { + log.Println("error printing", err) + } + fmt.Println() } }() diff --git a/cmd/circologd/http_ctl.go b/cmd/circologd/http_ctl.go new file mode 100644 index 0000000..cde8538 --- /dev/null +++ b/cmd/circologd/http_ctl.go @@ -0,0 +1,38 @@ +package main + +import ( + "encoding/json" + "net/http" + + "git.lattuga.net/boyska/circolog" + "github.com/gorilla/mux" +) + +func setupHTTPCtl(hub circolog.Hub) *mux.Router { + mux := mux.NewRouter() + mux.HandleFunc("/pause/toggle", togglePause(hub)).Methods("POST") + mux.HandleFunc("/logs/clear", clearQueue(hub)).Methods("POST") + return mux +} + +func togglePause(hub circolog.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle} + resp := <-hub.Responses + active := resp.Value.(bool) + w.Header().Set("content-type", "application/json") + enc := json.NewEncoder(w) + enc.Encode(map[string]interface{}{"paused": !active}) + } +} + +func clearQueue(hub circolog.Hub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear} + resp := <-hub.Responses + success := resp.Value.(bool) + w.Header().Set("content-type", "application/json") + enc := json.NewEncoder(w) + enc.Encode(map[string]interface{}{"success": success}) + } +} diff --git a/cmd/circologd/http.go b/cmd/circologd/http_log.go similarity index 88% rename from cmd/circologd/http.go rename to cmd/circologd/http_log.go index 7962d8d..37077de 100644 --- a/cmd/circologd/http.go +++ b/cmd/circologd/http_log.go @@ -9,13 +9,16 @@ import ( "time" "git.lattuga.net/boyska/circolog" + "git.lattuga.net/boyska/circolog/formatter" "github.com/gorilla/websocket" "gopkg.in/mcuadros/go-syslog.v2/format" ) -func setupHTTP(hub circolog.Hub) { - http.HandleFunc("/", getHTTPHandler(hub)) - http.HandleFunc("/ws", getWSHandler(hub)) +func setupHTTP(hub circolog.Hub) *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("/", getHTTPHandler(hub)) + mux.HandleFunc("/ws", getWSHandler(hub)) + return mux } func parseParameterL(r *http.Request) (int, error) { @@ -54,7 +57,7 @@ func parseParameters(r *http.Request) (circolog.ClientOptions, error) { } type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd) - Format renderFormat + Format formatter.Format } func parseRenderParameters(r *http.Request) (renderOptions, error) { @@ -68,11 +71,10 @@ func parseRenderParameters(r *http.Request) (renderOptions, error) { if len(val) != 1 { return opts, errors.New("Format repeated multiple times") } - format, err := parseRenderFormat(val[0]) + err := opts.Format.Set(val[0]) if err != nil { return opts, err } - opts.Format = format } return opts, nil } @@ -104,9 +106,10 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc { hub.Register <- client for x := range client.Messages { - writeFormatted(w, render_opts.Format, x) - if render_opts.Format != formatJSON { // bleah - w.Write([]byte("\n")) + if err := render_opts.Format.WriteFormatted(w, x); err == nil { + if render_opts.Format != formatter.FormatJSON { // bleah + w.Write([]byte("\n")) + } } } } @@ -164,7 +167,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc { if err != nil { return } - writeFormatted(w, render_opts.Format, message) + render_opts.Format.WriteFormatted(w, message) if err := w.Close(); err != nil { return diff --git a/cmd/circologd/main.go b/cmd/circologd/main.go index 12faac1..b40641c 100644 --- a/cmd/circologd/main.go +++ b/cmd/circologd/main.go @@ -16,7 +16,7 @@ import ( func cleanSocket(socket string) { if err := os.Remove(socket); err != nil { - fmt.Fprintln(os.Stderr, socket, ":", err) + fmt.Fprintln(os.Stderr, "Error cleaning", socket, ":", err) } } @@ -28,13 +28,15 @@ func main() { syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages") queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service") querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!") + ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable") flag.Parse() interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) + signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGTERM) hub := circolog.NewHub(*bufsize) handler := syslog.NewChannelHandler(hub.LogMessages) + go hub.Run() server := syslog.NewServer() server.SetFormat(syslog.RFC5424) @@ -57,10 +59,8 @@ func main() { fmt.Fprintln(os.Stderr, "argh", err) os.Exit(1) } - go hub.Run() - setupHTTP(hub) - httpServer := http.Server{Handler: nil} + httpQueryServer := http.Server{Handler: setupHTTP(hub)} if *querySocket != "" { fmt.Printf("Binding address `%s` [http]\n", *querySocket) unixListener, err := net.Listen("unix", *querySocket) @@ -70,31 +70,62 @@ func main() { } defer cleanSocket(*querySocket) go func() { - if err := httpServer.Serve(unixListener); err != nil { - fmt.Fprintln(os.Stderr, "error binding:", err) + if err := httpQueryServer.Serve(unixListener); err != nil && err != http.ErrServerClosed { + fmt.Fprintln(os.Stderr, "error binding", *querySocket, ":", err) } }() } else { - httpServer.Addr = *queryAddr + httpQueryServer.Addr = *queryAddr fmt.Printf("Binding address `%s` [http]\n", *queryAddr) go func() { - err := httpServer.ListenAndServe() - if err != nil { + err := httpQueryServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + fmt.Fprintln(os.Stderr, "error binding", *queryAddr, ":", err) + } + }() + } + + httpCtlServer := http.Server{Handler: setupHTTPCtl(hub)} + if *ctlSocket != "" { + fmt.Printf("Binding address `%s` [http]\n", *ctlSocket) + unixListener, err := net.Listen("unix", *ctlSocket) + if err != nil { + fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err) + return + } + defer cleanSocket(*ctlSocket) + go func() { + if err := httpCtlServer.Serve(unixListener); err != nil && err != http.ErrServerClosed { fmt.Fprintln(os.Stderr, "error binding:", err) } }() } + // TODO: now we are ready + for { select { case sig := <-interrupt: - log.Println("Quitting because of signal", sig) - server.Kill() - if err := httpServer.Close(); err != nil { - fmt.Fprintln(os.Stderr, "Error closing http server:", err) + if sig == syscall.SIGUSR1 { + hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle} + resp := <-hub.Responses + if resp.Value.(bool) { + log.Println("resumed") + } else { + log.Println("paused") + } + } + if sig == syscall.SIGTERM || sig == syscall.SIGINT { + log.Println("Quitting because of signal", sig) + server.Kill() + if err := httpQueryServer.Shutdown(nil); err != nil { + fmt.Fprintln(os.Stderr, "Error closing http server:", err) + } + if err := httpCtlServer.Shutdown(nil); err != nil { + fmt.Fprintln(os.Stderr, "Error closing control server:", err) + } + return } - return - default: } } } diff --git a/cmd/circologd/format.go b/formatter/format.go similarity index 53% rename from cmd/circologd/format.go rename to formatter/format.go index 4818d2f..94a4176 100644 --- a/cmd/circologd/format.go +++ b/formatter/format.go @@ -1,4 +1,4 @@ -package main +package formatter import ( "encoding/json" @@ -8,6 +8,7 @@ import ( "time" "gopkg.in/mcuadros/go-syslog.v2/format" + "gopkg.in/mgo.v2/bson" ) // Formatter is an interface, so that multiple implementations can exist @@ -33,32 +34,67 @@ func init() { )) } -type renderFormat int +type Format int -func parseRenderFormat(s string) (renderFormat, error) { +func (rf *Format) Set(v string) error { + newvalue, err := parseFormat(v) + if err != nil { + return err + } + *rf = newvalue + return nil +} + +func (rf Format) String() string { + switch rf { + case FormatJSON: + return "json" + case FormatSyslog: + return "syslog" + case FormatBSON: + return "bson" + } + return "" +} + +func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error { + return WriteFormatted(w, rf, msg) +} + +func parseFormat(s string) (Format, error) { switch s { case "json": - return formatJSON, nil + return FormatJSON, nil case "syslog": - return formatSyslog, nil + return FormatSyslog, nil + case "bson": + return FormatBSON, nil default: return 0, fmt.Errorf("Undefined format `%s`", s) } } const ( - formatSyslog = iota // 0 - formatJSON = iota + FormatSyslog = iota // 0 + FormatJSON = iota + FormatBSON = iota ) -func writeFormatted(w io.Writer, f renderFormat, msg format.LogParts) error { +func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error { switch f { - case formatSyslog: + case FormatSyslog: return syslogTmpl.Execute(w, msg) - case formatJSON: + case FormatJSON: enc := json.NewEncoder(w) enc.SetIndent("", " ") return enc.Encode(msg) + case FormatBSON: + enc, err := bson.Marshal(msg) + if err != nil { + return err + } + _, err = w.Write(enc) + return err } return nil } diff --git a/hub.go b/hub.go index 10bf8c1..639612f 100644 --- a/hub.go +++ b/hub.go @@ -24,10 +24,29 @@ type ClientOptions struct { // The channel "register" and "unregister" can be seen as "command" // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client // has "options", such as Nofollow, to explain the Hub what should be given + +// An HubCommand is an "enum" of different commands +type HubCommand int + +const ( + CommandClear = iota + CommandPauseToggle = iota +) + +// An HubFullCommand is a Command, complete with arguments +type HubFullCommand struct { + Command HubCommand +} +type CommandResponse struct { + Value interface{} +} + type Hub struct { Register chan Client Unregister chan Client LogMessages chan format.LogParts + Commands chan HubFullCommand + Responses chan CommandResponse clients map[Client]bool circbuf *ring.Ring @@ -39,6 +58,8 @@ func NewHub(ringBufSize int) Hub { Register: make(chan Client), Unregister: make(chan Client), LogMessages: make(chan format.LogParts), + Commands: make(chan HubFullCommand), + Responses: make(chan CommandResponse), circbuf: ring.New(ringBufSize), } } @@ -77,6 +98,7 @@ func (h *Hub) register(cl Client) { // Run is hub main loop; keeps everything going func (h *Hub) Run() { + active := true for { select { case cl := <-h.Register: @@ -88,16 +110,36 @@ func (h *Hub) Run() { delete(h.clients, cl) } case msg := <-h.LogMessages: - h.circbuf.Value = msg - h.circbuf = h.circbuf.Next() - for client := range h.clients { - select { // send without blocking - case client.Messages <- msg: - break - default: - break + if active == true { + h.circbuf.Value = msg + h.circbuf = h.circbuf.Next() + for client := range h.clients { + select { // send without blocking + case client.Messages <- msg: + break + default: + break + } } } + case cmd := <-h.Commands: + if cmd.Command == CommandClear { + h.clear() + h.Responses <- CommandResponse{Value: true} + } + if cmd.Command == CommandPauseToggle { + active = !active + h.Responses <- CommandResponse{Value: active} + } } } } + +// Clear removes every all elements from the buffer +func (h *Hub) clear() { + buf := h.circbuf + for i := 0; i < buf.Len(); i++ { + buf.Value = nil + buf = buf.Next() + } +}