1
0
Fork 0
forked from boyska/circolog

Merge remote-tracking branch 'origin/sig-coolness'

This commit is contained in:
boyska 2018-12-11 10:52:40 +01:00
commit 48d9d2df8c
7 changed files with 258 additions and 52 deletions

View file

@ -1,9 +1,9 @@
A syslog daemon implementing circular buffer, in-memory storage.
This is useful when you want to keep some (heavy detailed) log available, but you don't want to log too many
things to disk.
This is useful when you want to keep some (heavily detailed) log available, but you don't want to log too many
things to disk. Remember: logging is useful, but can be dangerous to your users' privacy!
On your "main" syslog, send some message to this one!
On your "main" syslog, forward (part of the) messages to this one!
## Integration examples
@ -27,15 +27,59 @@ and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/
## Client
`curl` might be enough of a client for most uses.
`circolog` has its own client: `circolog-tail`. It is intended to resemble `tail -f` for the most basic
options; however, it will include filtering options that are common when you want to read logs, because that's
very easy when you have structured logs available.
However, one design point of circolog is to be usable without having a specific client: so the logs are
offered on both HTTP and websocket. This means that you can use `curl` if you want:
curl --unix-socket /run/circolog-query.sock localhost/
will give you everything that circologd has in memory
will give you everything that circologd has in memory.
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
any websocket client supporting UNIX domain socket, so you have two options:
1. wait until I write a proper `circolog-tail` client implementing it all
1. Use `circolog-tail`
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
### HTTP URLs and parameters
When using HTTP, logs are served on `/`. Valid parameters are:
* `l`. This is the amount of lines to send. This is essentially the same as the `-n` parameter on tail
Using `l=-1` (the default) means "give me every log message that you have
* `fmt`. This selects the output format. When `fmt=json` is used, each message is returned as JSON structured
data. The format of those JSON messages is still unstable. `fmt=syslog`, the default, outputs messages using "syslog style" (RFC XXXXXX)
To use websocket, request path `/ws`. The same parameters of `/` are recognized.
## Control daemon
Circologd can be controlled, on some aspects, at run-time. It has 2 mechanisms for that: the easiest, and more
limited, is sending a signal with kill; the second, and more powerful, is a control socket, where you can give
commands to it. This control socket is just HTTP, so again `curl` is your friend. In the future a
`circolog-ctl` client will be developed.
### Pause
When circologd is paused, every new message it receives is immediately discarded. No exception. The backlog
is, however, preserved. This means that you can trigger the event that you want to investigate, pause
circolog, then analyze the logs.
Pausing might be the easiest way to make circologd only run "when needed".
When circologd resumes, no previous message is lost.
To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
### Clear
When you clear the circologd's buffer, it will discard every message it has, but will keep collecting new
messages.
You can do that with `POST /logs/clear`

View file

@ -11,7 +11,10 @@ import (
"strconv"
"time"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
)
func main() {
@ -28,6 +31,7 @@ func main() {
Path: "/ws",
}
q := u.Query()
q.Set("fmt", "bson")
if *backlogLimit >= 0 {
q.Set("l", strconv.Itoa(*backlogLimit))
}
@ -58,12 +62,20 @@ func main() {
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
_, serialized, err := c.ReadMessage()
if err != nil {
log.Println("close:", err)
return
}
fmt.Println(string(message))
var parsed format.LogParts
if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid YAML", err)
continue
}
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {
log.Println("error printing", err)
}
fmt.Println()
}
}()

38
cmd/circologd/http_ctl.go Normal file
View file

@ -0,0 +1,38 @@
package main
import (
"encoding/json"
"net/http"
"git.lattuga.net/boyska/circolog"
"github.com/gorilla/mux"
)
func setupHTTPCtl(hub circolog.Hub) *mux.Router {
mux := mux.NewRouter()
mux.HandleFunc("/pause/toggle", togglePause(hub)).Methods("POST")
mux.HandleFunc("/logs/clear", clearQueue(hub)).Methods("POST")
return mux
}
func togglePause(hub circolog.Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
active := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"paused": !active})
}
}
func clearQueue(hub circolog.Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
success := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"success": success})
}
}

View file

@ -9,13 +9,16 @@ import (
"time"
"git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
func setupHTTP(hub circolog.Hub) {
http.HandleFunc("/", getHTTPHandler(hub))
http.HandleFunc("/ws", getWSHandler(hub))
func setupHTTP(hub circolog.Hub) *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/", getHTTPHandler(hub))
mux.HandleFunc("/ws", getWSHandler(hub))
return mux
}
func parseParameterL(r *http.Request) (int, error) {
@ -54,7 +57,7 @@ func parseParameters(r *http.Request) (circolog.ClientOptions, error) {
}
type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd)
Format renderFormat
Format formatter.Format
}
func parseRenderParameters(r *http.Request) (renderOptions, error) {
@ -68,11 +71,10 @@ func parseRenderParameters(r *http.Request) (renderOptions, error) {
if len(val) != 1 {
return opts, errors.New("Format repeated multiple times")
}
format, err := parseRenderFormat(val[0])
err := opts.Format.Set(val[0])
if err != nil {
return opts, err
}
opts.Format = format
}
return opts, nil
}
@ -104,9 +106,10 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
hub.Register <- client
for x := range client.Messages {
writeFormatted(w, render_opts.Format, x)
if render_opts.Format != formatJSON { // bleah
w.Write([]byte("\n"))
if err := render_opts.Format.WriteFormatted(w, x); err == nil {
if render_opts.Format != formatter.FormatJSON { // bleah
w.Write([]byte("\n"))
}
}
}
}
@ -164,7 +167,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
if err != nil {
return
}
writeFormatted(w, render_opts.Format, message)
render_opts.Format.WriteFormatted(w, message)
if err := w.Close(); err != nil {
return

View file

@ -16,7 +16,7 @@ import (
func cleanSocket(socket string) {
if err := os.Remove(socket); err != nil {
fmt.Fprintln(os.Stderr, socket, ":", err)
fmt.Fprintln(os.Stderr, "Error cleaning", socket, ":", err)
}
}
@ -28,13 +28,15 @@ func main() {
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
flag.Parse()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGTERM)
hub := circolog.NewHub(*bufsize)
handler := syslog.NewChannelHandler(hub.LogMessages)
go hub.Run()
server := syslog.NewServer()
server.SetFormat(syslog.RFC5424)
@ -57,10 +59,8 @@ func main() {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
go hub.Run()
setupHTTP(hub)
httpServer := http.Server{Handler: nil}
httpQueryServer := http.Server{Handler: setupHTTP(hub)}
if *querySocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
unixListener, err := net.Listen("unix", *querySocket)
@ -70,31 +70,62 @@ func main() {
}
defer cleanSocket(*querySocket)
go func() {
if err := httpServer.Serve(unixListener); err != nil {
fmt.Fprintln(os.Stderr, "error binding:", err)
if err := httpQueryServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *querySocket, ":", err)
}
}()
} else {
httpServer.Addr = *queryAddr
httpQueryServer.Addr = *queryAddr
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
go func() {
err := httpServer.ListenAndServe()
if err != nil {
err := httpQueryServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *queryAddr, ":", err)
}
}()
}
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub)}
if *ctlSocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
unixListener, err := net.Listen("unix", *ctlSocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
}
defer cleanSocket(*ctlSocket)
go func() {
if err := httpCtlServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding:", err)
}
}()
}
// TODO: now we are ready
for {
select {
case sig := <-interrupt:
log.Println("Quitting because of signal", sig)
server.Kill()
if err := httpServer.Close(); err != nil {
fmt.Fprintln(os.Stderr, "Error closing http server:", err)
if sig == syscall.SIGUSR1 {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
if resp.Value.(bool) {
log.Println("resumed")
} else {
log.Println("paused")
}
}
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
log.Println("Quitting because of signal", sig)
server.Kill()
if err := httpQueryServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing http server:", err)
}
if err := httpCtlServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing control server:", err)
}
return
}
return
default:
}
}
}

View file

@ -1,4 +1,4 @@
package main
package formatter
import (
"encoding/json"
@ -8,6 +8,7 @@ import (
"time"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
)
// Formatter is an interface, so that multiple implementations can exist
@ -33,32 +34,67 @@ func init() {
))
}
type renderFormat int
type Format int
func parseRenderFormat(s string) (renderFormat, error) {
func (rf *Format) Set(v string) error {
newvalue, err := parseFormat(v)
if err != nil {
return err
}
*rf = newvalue
return nil
}
func (rf Format) String() string {
switch rf {
case FormatJSON:
return "json"
case FormatSyslog:
return "syslog"
case FormatBSON:
return "bson"
}
return ""
}
func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error {
return WriteFormatted(w, rf, msg)
}
func parseFormat(s string) (Format, error) {
switch s {
case "json":
return formatJSON, nil
return FormatJSON, nil
case "syslog":
return formatSyslog, nil
return FormatSyslog, nil
case "bson":
return FormatBSON, nil
default:
return 0, fmt.Errorf("Undefined format `%s`", s)
}
}
const (
formatSyslog = iota // 0
formatJSON = iota
FormatSyslog = iota // 0
FormatJSON = iota
FormatBSON = iota
)
func writeFormatted(w io.Writer, f renderFormat, msg format.LogParts) error {
func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error {
switch f {
case formatSyslog:
case FormatSyslog:
return syslogTmpl.Execute(w, msg)
case formatJSON:
case FormatJSON:
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(msg)
case FormatBSON:
enc, err := bson.Marshal(msg)
if err != nil {
return err
}
_, err = w.Write(enc)
return err
}
return nil
}

58
hub.go
View file

@ -24,10 +24,29 @@ type ClientOptions struct {
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
// An HubCommand is an "enum" of different commands
type HubCommand int
const (
CommandClear = iota
CommandPauseToggle = iota
)
// An HubFullCommand is a Command, complete with arguments
type HubFullCommand struct {
Command HubCommand
}
type CommandResponse struct {
Value interface{}
}
type Hub struct {
Register chan Client
Unregister chan Client
LogMessages chan format.LogParts
Commands chan HubFullCommand
Responses chan CommandResponse
clients map[Client]bool
circbuf *ring.Ring
@ -39,6 +58,8 @@ func NewHub(ringBufSize int) Hub {
Register: make(chan Client),
Unregister: make(chan Client),
LogMessages: make(chan format.LogParts),
Commands: make(chan HubFullCommand),
Responses: make(chan CommandResponse),
circbuf: ring.New(ringBufSize),
}
}
@ -77,6 +98,7 @@ func (h *Hub) register(cl Client) {
// Run is hub main loop; keeps everything going
func (h *Hub) Run() {
active := true
for {
select {
case cl := <-h.Register:
@ -88,16 +110,36 @@ func (h *Hub) Run() {
delete(h.clients, cl)
}
case msg := <-h.LogMessages:
h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients {
select { // send without blocking
case client.Messages <- msg:
break
default:
break
if active == true {
h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients {
select { // send without blocking
case client.Messages <- msg:
break
default:
break
}
}
}
case cmd := <-h.Commands:
if cmd.Command == CommandClear {
h.clear()
h.Responses <- CommandResponse{Value: true}
}
if cmd.Command == CommandPauseToggle {
active = !active
h.Responses <- CommandResponse{Value: active}
}
}
}
}
// Clear removes every all elements from the buffer
func (h *Hub) clear() {
buf := h.circbuf
for i := 0; i < buf.Len(); i++ {
buf.Value = nil
buf = buf.Next()
}
}