1
0
Fork 0
forked from boyska/circolog

Compare commits

...

20 commits

Author SHA1 Message Date
9ef425d827
[ctl] pause subcommand partially implemented 2018-12-21 18:09:32 +01:00
19045d9b25 Merge branch 'sig-coolness' 2018-12-20 09:48:08 +01:00
0747959f8a
First draft of control command 2018-12-20 09:45:05 +01:00
dfe1e60146
Typo in comment 2018-12-19 17:30:04 +01:00
48d9d2df8c Merge remote-tracking branch 'origin/sig-coolness' 2018-12-11 10:52:40 +01:00
d61ab0638f
sigusr2 triggering buffer cleanup also via ctrlsock 2018-12-05 09:18:52 +01:00
b11c2edfc0 [tail] format locally
JSON is actually unfit to send structured log messages, because
date/time is not well supported. So we are using BSON, which supports
those. BSON is, among the dozen serializers available, really a random
choice and might be changed in the future.
2018-11-11 20:54:10 +01:00
2bf83b6c33 refactor syslog formatting
the goal is making circolog-tail apply formatting "locally", receiving
structured messages instead
2018-11-11 20:42:26 +01:00
b2127fd349 update readme to recent enhancements 2018-11-11 20:06:10 +01:00
eb66cb4307 clear logs on contrl socket 2018-11-11 19:58:49 +01:00
1b08df0ce0 add control socket (HTTP server)
also there is some refactoring on circologd: connection handling,
closing, etc. Not as much as needed, though: shutdown is still unclean,
and websocket clean shutdown is not tested
2018-11-11 19:51:21 +01:00
515e910683 refactor httpd 2018-11-11 19:19:21 +01:00
7c17c971a0 command responses 2018-11-11 19:10:53 +01:00
3f216f12f8 USR1 on circologd pause/resume 2018-11-10 18:22:23 +01:00
a990422953 HUP on circologd clears the buffer 2018-11-10 18:22:19 +01:00
7704c5ac70 tiny reformatting 2018-11-10 17:50:51 +01:00
86bdeed4a2 more signals handled 2018-11-10 17:43:48 +01:00
c70e28ff27 clean HTTP sockets on shutdown 2018-11-10 17:36:47 +01:00
917e457af0 some more check on http sockets 2018-11-10 17:28:02 +01:00
647701822c fmt=json on websocket 2018-11-10 17:20:17 +01:00
8 changed files with 493 additions and 49 deletions

View file

@ -1,9 +1,9 @@
A syslog daemon implementing circular buffer, in-memory storage. A syslog daemon implementing circular buffer, in-memory storage.
This is useful when you want to keep some (heavy detailed) log available, but you don't want to log too many This is useful when you want to keep some (heavily detailed) log available, but you don't want to log too many
things to disk. things to disk. Remember: logging is useful, but can be dangerous to your users' privacy!
On your "main" syslog, send some message to this one! On your "main" syslog, forward (part of the) messages to this one!
## Integration examples ## Integration examples
@ -27,15 +27,59 @@ and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/
## Client ## Client
`curl` might be enough of a client for most uses. `circolog` has its own client: `circolog-tail`. It is intended to resemble `tail -f` for the most basic
options; however, it will include filtering options that are common when you want to read logs, because that's
very easy when you have structured logs available.
However, one design point of circolog is to be usable without having a specific client: so the logs are
offered on both HTTP and websocket. This means that you can use `curl` if you want:
curl --unix-socket /run/circolog-query.sock localhost/ curl --unix-socket /run/circolog-query.sock localhost/
will give you everything that circologd has in memory will give you everything that circologd has in memory.
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
any websocket client supporting UNIX domain socket, so you have two options: any websocket client supporting UNIX domain socket, so you have two options:
1. wait until I write a proper `circolog-tail` client implementing it all 1. Use `circolog-tail`
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that 2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages. port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
### HTTP URLs and parameters
When using HTTP, logs are served on `/`. Valid parameters are:
* `l`. This is the amount of lines to send. This is essentially the same as the `-n` parameter on tail
Using `l=-1` (the default) means "give me every log message that you have
* `fmt`. This selects the output format. When `fmt=json` is used, each message is returned as JSON structured
data. The format of those JSON messages is still unstable. `fmt=syslog`, the default, outputs messages using "syslog style" (RFC XXXXXX)
To use websocket, request path `/ws`. The same parameters of `/` are recognized.
## Control daemon
Circologd can be controlled, on some aspects, at run-time. It has 2 mechanisms for that: the easiest, and more
limited, is sending a signal with kill; the second, and more powerful, is a control socket, where you can give
commands to it. This control socket is just HTTP, so again `curl` is your friend. In the future a
`circolog-ctl` client will be developed.
### Pause
When circologd is paused, every new message it receives is immediately discarded. No exception. The backlog
is, however, preserved. This means that you can trigger the event that you want to investigate, pause
circolog, then analyze the logs.
Pausing might be the easiest way to make circologd only run "when needed".
When circologd resumes, no previous message is lost.
To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
### Clear
When you clear the circologd's buffer, it will discard every message it has, but will keep collecting new
messages.
You can do that with `POST /logs/clear`

View file

@ -11,7 +11,10 @@ import (
"strconv" "strconv"
"time" "time"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
) )
func main() { func main() {
@ -28,6 +31,7 @@ func main() {
Path: "/ws", Path: "/ws",
} }
q := u.Query() q := u.Query()
q.Set("fmt", "bson")
if *backlogLimit >= 0 { if *backlogLimit >= 0 {
q.Set("l", strconv.Itoa(*backlogLimit)) q.Set("l", strconv.Itoa(*backlogLimit))
} }
@ -58,12 +62,20 @@ func main() {
go func() { go func() {
defer close(done) defer close(done)
for { for {
_, message, err := c.ReadMessage() _, serialized, err := c.ReadMessage()
if err != nil { if err != nil {
log.Println("close:", err) log.Println("close:", err)
return return
} }
fmt.Println(string(message)) var parsed format.LogParts
if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid YAML", err)
continue
}
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {
log.Println("error printing", err)
}
fmt.Println()
} }
}() }()

161
cmd/circologctl/main.go Normal file
View file

@ -0,0 +1,161 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"
)
var globalOpts struct {
ctlSock string
verbose bool
}
var ctl http.Client
type commandFunc func([]string) error
var cmdMap map[string]commandFunc
func init() {
cmdMap = map[string]commandFunc{
// TODO: implement set and get of config at runtime
//"set": setCmd,
//"get": getCmd,
"pause": pauseCmd,
"reload": reloadCmd,
"restart": restartCmd,
"help": helpCmd,
}
}
//func setCmd(ctlSock string, args []string) error {}
//func getCmd(ctlSock string, args []string) error {}
func parsePauseOpts(postponeTime string) (string, error) {
var waitTime int64
var err error
L := len(postponeTime)
switch unit := postponeTime[L-1]; string(unit) {
case "s":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
case "m":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60
case "h":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60
case "d":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60 * 24
case "w":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60 * 24 * 7
}
return string(waitTime), nil
}
func pauseCmd(args []string) error {
var postBody string
var err error
flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
postponeTime := flagset.String("postpone", "", "How long to wait before untoggling the state")
flagset.Parse(args[1:])
if *postponeTime != "" {
postBody, err = parsePauseOpts(*postponeTime)
if err != nil {
return err
}
}
resp, err := ctl.Post("http://unix/pause/toggle", "application/octet-stream", strings.NewReader(postBody))
if globalOpts.verbose {
defer resp.Body.Close()
bodyBytes, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
return err2
}
fmt.Println(string(bodyBytes))
}
return err
}
func reloadCmd(args []string) error {
return nil
}
func restartCmd(args []string) error {
return nil
}
func helpCmd(args []string) error {
usage(os.Stdout)
os.Exit(0)
return nil
}
func usage(w io.Writer) {
fmt.Fprintf(w, "USAGE: %s [globalOpts] [SUBCOMMAND] [opts]\n", os.Args[0])
fmt.Fprintf(w, "\nSUBCOMMANDS:\n\n")
for command := range cmdMap {
fmt.Fprintf(w, "\t%s\n", command)
}
}
func parseAndRun(args []string) {
cmdName := args[0]
cmdToRun, ok := cmdMap[cmdName]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown subcommand: %s\n", cmdName)
usage(os.Stderr)
os.Exit(2)
}
// from here: https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74
ctl = http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", globalOpts.ctlSock)
},
},
}
err := cmdToRun(args)
if err != nil {
fmt.Fprintf(os.Stderr, "Error:\n%s\n", err)
os.Exit(1)
}
}
func main() {
flag.StringVar(&globalOpts.ctlSock, "ctl-socket", "/tmp/circologd-ctl.sock",
"Path to a unix domain socket for the control server; leave empty to disable")
flag.BoolVar(&globalOpts.verbose, "verbose", false, "Print more output")
flag.Parse()
args := flag.Args()
if len(args) == 0 {
usage(os.Stderr)
os.Exit(-1)
}
parseAndRun(args)
}

82
cmd/circologd/http_ctl.go Normal file
View file

@ -0,0 +1,82 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
fractal "git.lattuga.net/blallo/gotools/formatting"
"git.lattuga.net/boyska/circolog"
"github.com/gorilla/mux"
)
func setupHTTPCtl(hub circolog.Hub, verbose bool) *mux.Router {
m := mux.NewRouter()
m.HandleFunc("/pause/toggle", togglePause(hub, verbose)).Methods("POST")
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
m.HandleFunc("/echo", echo(verbose)).Methods("GET")
return m
}
func togglePause(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - toggled pause", r.Method, r.RemoteAddr)
}
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
active := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"paused": !active})
}
}
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - cleared queue", r.Method, r.RemoteAddr)
}
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
success := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"success": success})
}
}
func printHelp(verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - asked for help", r.Method, r.RemoteAddr)
}
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
var pathsWithDocs = map[string]string{
"/pause/toggle": "Toggle the server from pause state (not listening)",
"/logs/clear": "Wipe the buffer from all the messages",
"/help": "This help",
"/echo": "Answers to heartbeat",
}
fractal.EncodeJSON(pathsWithDocs, enc)
if verbose {
errEnc := json.NewEncoder(os.Stderr)
fractal.EncodeJSON(pathsWithDocs, errEnc)
}
}
}
func echo(verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Echo")
if verbose {
log.Printf("[%s] %s - asked for echo", r.Method, r.RemoteAddr)
}
w.Header().Set("content-type", "text/plain")
fmt.Fprintln(w, "I am on!")
}
}

View file

@ -9,13 +9,16 @@ import (
"time" "time"
"git.lattuga.net/boyska/circolog" "git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
func setupHTTP(hub circolog.Hub) { func setupHTTP(hub circolog.Hub) *http.ServeMux {
http.HandleFunc("/", getHTTPHandler(hub)) mux := http.NewServeMux()
http.HandleFunc("/ws", getWSHandler(hub)) mux.HandleFunc("/", getHTTPHandler(hub))
mux.HandleFunc("/ws", getWSHandler(hub))
return mux
} }
func parseParameterL(r *http.Request) (int, error) { func parseParameterL(r *http.Request) (int, error) {
@ -54,7 +57,7 @@ func parseParameters(r *http.Request) (circolog.ClientOptions, error) {
} }
type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd) type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd)
Format renderFormat Format formatter.Format
} }
func parseRenderParameters(r *http.Request) (renderOptions, error) { func parseRenderParameters(r *http.Request) (renderOptions, error) {
@ -68,11 +71,10 @@ func parseRenderParameters(r *http.Request) (renderOptions, error) {
if len(val) != 1 { if len(val) != 1 {
return opts, errors.New("Format repeated multiple times") return opts, errors.New("Format repeated multiple times")
} }
format, err := parseRenderFormat(val[0]) err := opts.Format.Set(val[0])
if err != nil { if err != nil {
return opts, err return opts, err
} }
opts.Format = format
} }
return opts, nil return opts, nil
} }
@ -104,12 +106,13 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
hub.Register <- client hub.Register <- client
for x := range client.Messages { for x := range client.Messages {
writeFormatted(w, render_opts.Format, x) if err := render_opts.Format.WriteFormatted(w, x); err == nil {
if render_opts.Format != formatJSON { // bleah if render_opts.Format != formatter.FormatJSON { // bleah
w.Write([]byte("\n")) w.Write([]byte("\n"))
} }
} }
} }
}
} }
func getWSHandler(hub circolog.Hub) http.HandlerFunc { func getWSHandler(hub circolog.Hub) http.HandlerFunc {
@ -118,10 +121,18 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
WriteBufferSize: 1024, WriteBufferSize: 1024,
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
render_opts, err := parseRenderParameters(r)
if err != nil {
log.Println("Error parsing:", err)
w.WriteHeader(400)
fmt.Fprintln(w, err)
return
}
opts, err := parseParameters(r) opts, err := parseParameters(r)
if err != nil { if err != nil {
log.Println("Error on request parameter \"l\":", err) log.Println("Error on request parameter \"l\":", err)
w.WriteHeader(400) w.WriteHeader(400)
fmt.Fprintln(w, err)
return return
} }
opts.Nofollow = false opts.Nofollow = false
@ -156,7 +167,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
if err != nil { if err != nil {
return return
} }
writeFormatted(w, formatSyslog, message) render_opts.Format.WriteFormatted(w, message)
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
return return

View file

@ -3,10 +3,12 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"log"
"net" "net"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"syscall"
"git.lattuga.net/boyska/circolog" "git.lattuga.net/boyska/circolog"
syslog "gopkg.in/mcuadros/go-syslog.v2" syslog "gopkg.in/mcuadros/go-syslog.v2"
@ -14,7 +16,7 @@ import (
func cleanSocket(socket string) { func cleanSocket(socket string) {
if err := os.Remove(socket); err != nil { if err := os.Remove(socket); err != nil {
fmt.Fprintln(os.Stderr, socket, ":", err) fmt.Fprintln(os.Stderr, "Error cleaning", socket, ":", err)
} }
} }
@ -26,13 +28,16 @@ func main() {
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages") syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service") queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!") querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
verbose := flag.Bool("verbose", false, "Print more output executing the daemon")
flag.Parse() flag.Parse()
interrupt := make(chan os.Signal, 1) interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt) signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM)
hub := circolog.NewHub(*bufsize) hub := circolog.NewHub(*bufsize)
handler := syslog.NewChannelHandler(hub.LogMessages) handler := syslog.NewChannelHandler(hub.LogMessages)
go hub.Run()
server := syslog.NewServer() server := syslog.NewServer()
server.SetFormat(syslog.RFC5424) server.SetFormat(syslog.RFC5424)
@ -55,31 +60,82 @@ func main() {
fmt.Fprintln(os.Stderr, "argh", err) fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1) os.Exit(1)
} }
go hub.Run()
setupHTTP(hub) httpQueryServer := http.Server{Handler: setupHTTP(hub)}
if *querySocket != "" { if *querySocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *querySocket) fmt.Printf("Binding address `%s` [http]\n", *querySocket)
unixListener, err := net.Listen("unix", *querySocket) unixListener, err := net.Listen("unix", *querySocket)
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err) fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
} }
go http.Serve(unixListener, nil) defer cleanSocket(*querySocket)
go func() {
if err := httpQueryServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *querySocket, ":", err)
}
}()
} else { } else {
httpQueryServer.Addr = *queryAddr
fmt.Printf("Binding address `%s` [http]\n", *queryAddr) fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
go http.ListenAndServe(*queryAddr, nil) go func() {
err := httpQueryServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *queryAddr, ":", err)
} }
}()
}
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub, *verbose)}
if *ctlSocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
unixListener, err := net.Listen("unix", *ctlSocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
}
defer cleanSocket(*ctlSocket)
go func() {
if err := httpCtlServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding:", err)
}
}()
}
// TODO: now we are ready
for { for {
select { select {
case <-interrupt: case sig := <-interrupt:
if sig == syscall.SIGUSR1 {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
if resp.Value.(bool) {
log.Println("resumed")
} else {
log.Println("paused")
}
}
if sig == syscall.SIGUSR2 {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
if resp.Value.(bool) {
log.Println("buffer cleaned")
} else {
log.Println("buffer NOT cleaned")
}
}
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
log.Println("Quitting because of signal", sig)
server.Kill() server.Kill()
//server.Wait() if err := httpQueryServer.Shutdown(nil); err != nil {
if *syslogSocketPath != "" { fmt.Fprintln(os.Stderr, "Error closing http server:", err)
}
if err := httpCtlServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing control server:", err)
} }
return return
default: }
} }
} }
} }

View file

@ -1,4 +1,4 @@
package main package formatter
import ( import (
"encoding/json" "encoding/json"
@ -8,6 +8,7 @@ import (
"time" "time"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
) )
// Formatter is an interface, so that multiple implementations can exist // Formatter is an interface, so that multiple implementations can exist
@ -33,32 +34,67 @@ func init() {
)) ))
} }
type renderFormat int type Format int
func parseRenderFormat(s string) (renderFormat, error) { func (rf *Format) Set(v string) error {
newvalue, err := parseFormat(v)
if err != nil {
return err
}
*rf = newvalue
return nil
}
func (rf Format) String() string {
switch rf {
case FormatJSON:
return "json"
case FormatSyslog:
return "syslog"
case FormatBSON:
return "bson"
}
return ""
}
func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error {
return WriteFormatted(w, rf, msg)
}
func parseFormat(s string) (Format, error) {
switch s { switch s {
case "json": case "json":
return formatJSON, nil return FormatJSON, nil
case "syslog": case "syslog":
return formatSyslog, nil return FormatSyslog, nil
case "bson":
return FormatBSON, nil
default: default:
return 0, fmt.Errorf("Undefined format `%s`", s) return 0, fmt.Errorf("Undefined format `%s`", s)
} }
} }
const ( const (
formatSyslog = iota // 0 FormatSyslog = iota // 0
formatJSON = iota FormatJSON = iota
FormatBSON = iota
) )
func writeFormatted(w io.Writer, f renderFormat, msg format.LogParts) error { func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error {
switch f { switch f {
case formatSyslog: case FormatSyslog:
return syslogTmpl.Execute(w, msg) return syslogTmpl.Execute(w, msg)
case formatJSON: case FormatJSON:
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
enc.SetIndent("", " ") enc.SetIndent("", " ")
return enc.Encode(msg) return enc.Encode(msg)
case FormatBSON:
enc, err := bson.Marshal(msg)
if err != nil {
return err
}
_, err = w.Write(enc)
return err
} }
return nil return nil
} }

42
hub.go
View file

@ -24,10 +24,29 @@ type ClientOptions struct {
// The channel "register" and "unregister" can be seen as "command" // The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given // has "options", such as Nofollow, to explain the Hub what should be given
// An HubCommand is an "enum" of different commands
type HubCommand int
const (
CommandClear = iota
CommandPauseToggle = iota
)
// An HubFullCommand is a Command, complete with arguments
type HubFullCommand struct {
Command HubCommand
}
type CommandResponse struct {
Value interface{}
}
type Hub struct { type Hub struct {
Register chan Client Register chan Client
Unregister chan Client Unregister chan Client
LogMessages chan format.LogParts LogMessages chan format.LogParts
Commands chan HubFullCommand
Responses chan CommandResponse
clients map[Client]bool clients map[Client]bool
circbuf *ring.Ring circbuf *ring.Ring
@ -39,6 +58,8 @@ func NewHub(ringBufSize int) Hub {
Register: make(chan Client), Register: make(chan Client),
Unregister: make(chan Client), Unregister: make(chan Client),
LogMessages: make(chan format.LogParts), LogMessages: make(chan format.LogParts),
Commands: make(chan HubFullCommand),
Responses: make(chan CommandResponse),
circbuf: ring.New(ringBufSize), circbuf: ring.New(ringBufSize),
} }
} }
@ -77,6 +98,7 @@ func (h *Hub) register(cl Client) {
// Run is hub main loop; keeps everything going // Run is hub main loop; keeps everything going
func (h *Hub) Run() { func (h *Hub) Run() {
active := true
for { for {
select { select {
case cl := <-h.Register: case cl := <-h.Register:
@ -88,6 +110,7 @@ func (h *Hub) Run() {
delete(h.clients, cl) delete(h.clients, cl)
} }
case msg := <-h.LogMessages: case msg := <-h.LogMessages:
if active == true {
h.circbuf.Value = msg h.circbuf.Value = msg
h.circbuf = h.circbuf.Next() h.circbuf = h.circbuf.Next()
for client := range h.clients { for client := range h.clients {
@ -99,5 +122,24 @@ func (h *Hub) Run() {
} }
} }
} }
case cmd := <-h.Commands:
if cmd.Command == CommandClear {
h.clear()
h.Responses <- CommandResponse{Value: true}
}
if cmd.Command == CommandPauseToggle {
active = !active
h.Responses <- CommandResponse{Value: active}
}
}
}
}
// Clear removes all elements from the buffer
func (h *Hub) clear() {
buf := h.circbuf
for i := 0; i < buf.Len(); i++ {
buf.Value = nil
buf = buf.Next()
} }
} }