forked from boyska/circolog
Compare commits
20 commits
f66e07e873
...
9ef425d827
Author | SHA1 | Date | |
---|---|---|---|
9ef425d827 | |||
19045d9b25 | |||
0747959f8a | |||
dfe1e60146 | |||
48d9d2df8c | |||
d61ab0638f | |||
b11c2edfc0 | |||
2bf83b6c33 | |||
b2127fd349 | |||
eb66cb4307 | |||
1b08df0ce0 | |||
515e910683 | |||
7c17c971a0 | |||
3f216f12f8 | |||
a990422953 | |||
7704c5ac70 | |||
86bdeed4a2 | |||
c70e28ff27 | |||
917e457af0 | |||
647701822c |
8 changed files with 493 additions and 49 deletions
56
README.md
56
README.md
|
@ -1,9 +1,9 @@
|
||||||
A syslog daemon implementing circular buffer, in-memory storage.
|
A syslog daemon implementing circular buffer, in-memory storage.
|
||||||
|
|
||||||
This is useful when you want to keep some (heavy detailed) log available, but you don't want to log too many
|
This is useful when you want to keep some (heavily detailed) log available, but you don't want to log too many
|
||||||
things to disk.
|
things to disk. Remember: logging is useful, but can be dangerous to your users' privacy!
|
||||||
|
|
||||||
On your "main" syslog, send some message to this one!
|
On your "main" syslog, forward (part of the) messages to this one!
|
||||||
|
|
||||||
## Integration examples
|
## Integration examples
|
||||||
|
|
||||||
|
@ -27,15 +27,59 @@ and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/
|
||||||
|
|
||||||
## Client
|
## Client
|
||||||
|
|
||||||
`curl` might be enough of a client for most uses.
|
`circolog` has its own client: `circolog-tail`. It is intended to resemble `tail -f` for the most basic
|
||||||
|
options; however, it will include filtering options that are common when you want to read logs, because that's
|
||||||
|
very easy when you have structured logs available.
|
||||||
|
|
||||||
|
However, one design point of circolog is to be usable without having a specific client: so the logs are
|
||||||
|
offered on both HTTP and websocket. This means that you can use `curl` if you want:
|
||||||
|
|
||||||
curl --unix-socket /run/circolog-query.sock localhost/
|
curl --unix-socket /run/circolog-query.sock localhost/
|
||||||
|
|
||||||
will give you everything that circologd has in memory
|
will give you everything that circologd has in memory.
|
||||||
|
|
||||||
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
|
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
|
||||||
any websocket client supporting UNIX domain socket, so you have two options:
|
any websocket client supporting UNIX domain socket, so you have two options:
|
||||||
|
|
||||||
1. wait until I write a proper `circolog-tail` client implementing it all
|
1. Use `circolog-tail`
|
||||||
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
|
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
|
||||||
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
|
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
|
||||||
|
|
||||||
|
### HTTP URLs and parameters
|
||||||
|
|
||||||
|
When using HTTP, logs are served on `/`. Valid parameters are:
|
||||||
|
|
||||||
|
* `l`. This is the amount of lines to send. This is essentially the same as the `-n` parameter on tail
|
||||||
|
Using `l=-1` (the default) means "give me every log message that you have
|
||||||
|
* `fmt`. This selects the output format. When `fmt=json` is used, each message is returned as JSON structured
|
||||||
|
data. The format of those JSON messages is still unstable. `fmt=syslog`, the default, outputs messages using "syslog style" (RFC XXXXXX)
|
||||||
|
|
||||||
|
To use websocket, request path `/ws`. The same parameters of `/` are recognized.
|
||||||
|
|
||||||
|
## Control daemon
|
||||||
|
|
||||||
|
Circologd can be controlled, on some aspects, at run-time. It has 2 mechanisms for that: the easiest, and more
|
||||||
|
limited, is sending a signal with kill; the second, and more powerful, is a control socket, where you can give
|
||||||
|
commands to it. This control socket is just HTTP, so again `curl` is your friend. In the future a
|
||||||
|
`circolog-ctl` client will be developed.
|
||||||
|
|
||||||
|
### Pause
|
||||||
|
|
||||||
|
When circologd is paused, every new message it receives is immediately discarded. No exception. The backlog
|
||||||
|
is, however, preserved. This means that you can trigger the event that you want to investigate, pause
|
||||||
|
circolog, then analyze the logs.
|
||||||
|
Pausing might be the easiest way to make circologd only run "when needed".
|
||||||
|
|
||||||
|
When circologd resumes, no previous message is lost.
|
||||||
|
|
||||||
|
|
||||||
|
To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
|
||||||
|
|
||||||
|
To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
|
||||||
|
|
||||||
|
### Clear
|
||||||
|
|
||||||
|
When you clear the circologd's buffer, it will discard every message it has, but will keep collecting new
|
||||||
|
messages.
|
||||||
|
|
||||||
|
You can do that with `POST /logs/clear`
|
||||||
|
|
|
@ -11,7 +11,10 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.lattuga.net/boyska/circolog/formatter"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||||
|
"gopkg.in/mgo.v2/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -28,6 +31,7 @@ func main() {
|
||||||
Path: "/ws",
|
Path: "/ws",
|
||||||
}
|
}
|
||||||
q := u.Query()
|
q := u.Query()
|
||||||
|
q.Set("fmt", "bson")
|
||||||
if *backlogLimit >= 0 {
|
if *backlogLimit >= 0 {
|
||||||
q.Set("l", strconv.Itoa(*backlogLimit))
|
q.Set("l", strconv.Itoa(*backlogLimit))
|
||||||
}
|
}
|
||||||
|
@ -58,12 +62,20 @@ func main() {
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
for {
|
for {
|
||||||
_, message, err := c.ReadMessage()
|
_, serialized, err := c.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("close:", err)
|
log.Println("close:", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Println(string(message))
|
var parsed format.LogParts
|
||||||
|
if err := bson.Unmarshal(serialized, &parsed); err != nil {
|
||||||
|
log.Println("invalid YAML", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {
|
||||||
|
log.Println("error printing", err)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
161
cmd/circologctl/main.go
Normal file
161
cmd/circologctl/main.go
Normal file
|
@ -0,0 +1,161 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
var globalOpts struct {
|
||||||
|
ctlSock string
|
||||||
|
verbose bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var ctl http.Client
|
||||||
|
|
||||||
|
type commandFunc func([]string) error
|
||||||
|
|
||||||
|
var cmdMap map[string]commandFunc
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdMap = map[string]commandFunc{
|
||||||
|
// TODO: implement set and get of config at runtime
|
||||||
|
//"set": setCmd,
|
||||||
|
//"get": getCmd,
|
||||||
|
"pause": pauseCmd,
|
||||||
|
"reload": reloadCmd,
|
||||||
|
"restart": restartCmd,
|
||||||
|
"help": helpCmd,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//func setCmd(ctlSock string, args []string) error {}
|
||||||
|
|
||||||
|
//func getCmd(ctlSock string, args []string) error {}
|
||||||
|
|
||||||
|
func parsePauseOpts(postponeTime string) (string, error) {
|
||||||
|
var waitTime int64
|
||||||
|
var err error
|
||||||
|
L := len(postponeTime)
|
||||||
|
switch unit := postponeTime[L-1]; string(unit) {
|
||||||
|
case "s":
|
||||||
|
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
case "m":
|
||||||
|
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
waitTime = waitTime * 60
|
||||||
|
case "h":
|
||||||
|
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
waitTime = waitTime * 60 * 60
|
||||||
|
case "d":
|
||||||
|
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
waitTime = waitTime * 60 * 60 * 24
|
||||||
|
case "w":
|
||||||
|
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
waitTime = waitTime * 60 * 60 * 24 * 7
|
||||||
|
}
|
||||||
|
return string(waitTime), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func pauseCmd(args []string) error {
|
||||||
|
var postBody string
|
||||||
|
var err error
|
||||||
|
flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
|
||||||
|
postponeTime := flagset.String("postpone", "", "How long to wait before untoggling the state")
|
||||||
|
flagset.Parse(args[1:])
|
||||||
|
if *postponeTime != "" {
|
||||||
|
postBody, err = parsePauseOpts(*postponeTime)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp, err := ctl.Post("http://unix/pause/toggle", "application/octet-stream", strings.NewReader(postBody))
|
||||||
|
if globalOpts.verbose {
|
||||||
|
defer resp.Body.Close()
|
||||||
|
bodyBytes, err2 := ioutil.ReadAll(resp.Body)
|
||||||
|
if err2 != nil {
|
||||||
|
return err2
|
||||||
|
}
|
||||||
|
fmt.Println(string(bodyBytes))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func reloadCmd(args []string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func restartCmd(args []string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func helpCmd(args []string) error {
|
||||||
|
usage(os.Stdout)
|
||||||
|
os.Exit(0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func usage(w io.Writer) {
|
||||||
|
fmt.Fprintf(w, "USAGE: %s [globalOpts] [SUBCOMMAND] [opts]\n", os.Args[0])
|
||||||
|
fmt.Fprintf(w, "\nSUBCOMMANDS:\n\n")
|
||||||
|
for command := range cmdMap {
|
||||||
|
fmt.Fprintf(w, "\t%s\n", command)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseAndRun(args []string) {
|
||||||
|
cmdName := args[0]
|
||||||
|
cmdToRun, ok := cmdMap[cmdName]
|
||||||
|
if !ok {
|
||||||
|
fmt.Fprintf(os.Stderr, "Unknown subcommand: %s\n", cmdName)
|
||||||
|
usage(os.Stderr)
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
// from here: https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74
|
||||||
|
ctl = http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||||
|
return net.Dial("unix", globalOpts.ctlSock)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := cmdToRun(args)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error:\n%s\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.StringVar(&globalOpts.ctlSock, "ctl-socket", "/tmp/circologd-ctl.sock",
|
||||||
|
"Path to a unix domain socket for the control server; leave empty to disable")
|
||||||
|
flag.BoolVar(&globalOpts.verbose, "verbose", false, "Print more output")
|
||||||
|
flag.Parse()
|
||||||
|
args := flag.Args()
|
||||||
|
if len(args) == 0 {
|
||||||
|
usage(os.Stderr)
|
||||||
|
os.Exit(-1)
|
||||||
|
}
|
||||||
|
parseAndRun(args)
|
||||||
|
}
|
82
cmd/circologd/http_ctl.go
Normal file
82
cmd/circologd/http_ctl.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
fractal "git.lattuga.net/blallo/gotools/formatting"
|
||||||
|
"git.lattuga.net/boyska/circolog"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupHTTPCtl(hub circolog.Hub, verbose bool) *mux.Router {
|
||||||
|
m := mux.NewRouter()
|
||||||
|
m.HandleFunc("/pause/toggle", togglePause(hub, verbose)).Methods("POST")
|
||||||
|
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
|
||||||
|
m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
|
||||||
|
m.HandleFunc("/echo", echo(verbose)).Methods("GET")
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func togglePause(hub circolog.Hub, verbose bool) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if verbose {
|
||||||
|
log.Printf("[%s] %s - toggled pause", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
|
||||||
|
resp := <-hub.Responses
|
||||||
|
active := resp.Value.(bool)
|
||||||
|
w.Header().Set("content-type", "application/json")
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
enc.Encode(map[string]interface{}{"paused": !active})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if verbose {
|
||||||
|
log.Printf("[%s] %s - cleared queue", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
|
||||||
|
resp := <-hub.Responses
|
||||||
|
success := resp.Value.(bool)
|
||||||
|
w.Header().Set("content-type", "application/json")
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
enc.Encode(map[string]interface{}{"success": success})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func printHelp(verbose bool) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if verbose {
|
||||||
|
log.Printf("[%s] %s - asked for help", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
w.Header().Set("content-type", "application/json")
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
var pathsWithDocs = map[string]string{
|
||||||
|
"/pause/toggle": "Toggle the server from pause state (not listening)",
|
||||||
|
"/logs/clear": "Wipe the buffer from all the messages",
|
||||||
|
"/help": "This help",
|
||||||
|
"/echo": "Answers to heartbeat",
|
||||||
|
}
|
||||||
|
fractal.EncodeJSON(pathsWithDocs, enc)
|
||||||
|
if verbose {
|
||||||
|
errEnc := json.NewEncoder(os.Stderr)
|
||||||
|
fractal.EncodeJSON(pathsWithDocs, errEnc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func echo(verbose bool) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Println("Echo")
|
||||||
|
if verbose {
|
||||||
|
log.Printf("[%s] %s - asked for echo", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
w.Header().Set("content-type", "text/plain")
|
||||||
|
fmt.Fprintln(w, "I am on!")
|
||||||
|
}
|
||||||
|
}
|
|
@ -9,13 +9,16 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.lattuga.net/boyska/circolog"
|
"git.lattuga.net/boyska/circolog"
|
||||||
|
"git.lattuga.net/boyska/circolog/formatter"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupHTTP(hub circolog.Hub) {
|
func setupHTTP(hub circolog.Hub) *http.ServeMux {
|
||||||
http.HandleFunc("/", getHTTPHandler(hub))
|
mux := http.NewServeMux()
|
||||||
http.HandleFunc("/ws", getWSHandler(hub))
|
mux.HandleFunc("/", getHTTPHandler(hub))
|
||||||
|
mux.HandleFunc("/ws", getWSHandler(hub))
|
||||||
|
return mux
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseParameterL(r *http.Request) (int, error) {
|
func parseParameterL(r *http.Request) (int, error) {
|
||||||
|
@ -54,7 +57,7 @@ func parseParameters(r *http.Request) (circolog.ClientOptions, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd)
|
type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd)
|
||||||
Format renderFormat
|
Format formatter.Format
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseRenderParameters(r *http.Request) (renderOptions, error) {
|
func parseRenderParameters(r *http.Request) (renderOptions, error) {
|
||||||
|
@ -68,11 +71,10 @@ func parseRenderParameters(r *http.Request) (renderOptions, error) {
|
||||||
if len(val) != 1 {
|
if len(val) != 1 {
|
||||||
return opts, errors.New("Format repeated multiple times")
|
return opts, errors.New("Format repeated multiple times")
|
||||||
}
|
}
|
||||||
format, err := parseRenderFormat(val[0])
|
err := opts.Format.Set(val[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return opts, err
|
return opts, err
|
||||||
}
|
}
|
||||||
opts.Format = format
|
|
||||||
}
|
}
|
||||||
return opts, nil
|
return opts, nil
|
||||||
}
|
}
|
||||||
|
@ -104,9 +106,10 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
|
||||||
hub.Register <- client
|
hub.Register <- client
|
||||||
|
|
||||||
for x := range client.Messages {
|
for x := range client.Messages {
|
||||||
writeFormatted(w, render_opts.Format, x)
|
if err := render_opts.Format.WriteFormatted(w, x); err == nil {
|
||||||
if render_opts.Format != formatJSON { // bleah
|
if render_opts.Format != formatter.FormatJSON { // bleah
|
||||||
w.Write([]byte("\n"))
|
w.Write([]byte("\n"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -118,10 +121,18 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
|
||||||
WriteBufferSize: 1024,
|
WriteBufferSize: 1024,
|
||||||
}
|
}
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
render_opts, err := parseRenderParameters(r)
|
||||||
|
if err != nil {
|
||||||
|
log.Println("Error parsing:", err)
|
||||||
|
w.WriteHeader(400)
|
||||||
|
fmt.Fprintln(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
opts, err := parseParameters(r)
|
opts, err := parseParameters(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Error on request parameter \"l\":", err)
|
log.Println("Error on request parameter \"l\":", err)
|
||||||
w.WriteHeader(400)
|
w.WriteHeader(400)
|
||||||
|
fmt.Fprintln(w, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
opts.Nofollow = false
|
opts.Nofollow = false
|
||||||
|
@ -156,7 +167,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeFormatted(w, formatSyslog, message)
|
render_opts.Format.WriteFormatted(w, message)
|
||||||
|
|
||||||
if err := w.Close(); err != nil {
|
if err := w.Close(); err != nil {
|
||||||
return
|
return
|
|
@ -3,10 +3,12 @@ package main
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"git.lattuga.net/boyska/circolog"
|
"git.lattuga.net/boyska/circolog"
|
||||||
syslog "gopkg.in/mcuadros/go-syslog.v2"
|
syslog "gopkg.in/mcuadros/go-syslog.v2"
|
||||||
|
@ -14,7 +16,7 @@ import (
|
||||||
|
|
||||||
func cleanSocket(socket string) {
|
func cleanSocket(socket string) {
|
||||||
if err := os.Remove(socket); err != nil {
|
if err := os.Remove(socket); err != nil {
|
||||||
fmt.Fprintln(os.Stderr, socket, ":", err)
|
fmt.Fprintln(os.Stderr, "Error cleaning", socket, ":", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,13 +28,16 @@ func main() {
|
||||||
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
|
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
|
||||||
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
|
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
|
||||||
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
|
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
|
||||||
|
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
|
||||||
|
verbose := flag.Bool("verbose", false, "Print more output executing the daemon")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
interrupt := make(chan os.Signal, 1)
|
interrupt := make(chan os.Signal, 1)
|
||||||
signal.Notify(interrupt, os.Interrupt)
|
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM)
|
||||||
|
|
||||||
hub := circolog.NewHub(*bufsize)
|
hub := circolog.NewHub(*bufsize)
|
||||||
handler := syslog.NewChannelHandler(hub.LogMessages)
|
handler := syslog.NewChannelHandler(hub.LogMessages)
|
||||||
|
go hub.Run()
|
||||||
|
|
||||||
server := syslog.NewServer()
|
server := syslog.NewServer()
|
||||||
server.SetFormat(syslog.RFC5424)
|
server.SetFormat(syslog.RFC5424)
|
||||||
|
@ -55,31 +60,82 @@ func main() {
|
||||||
fmt.Fprintln(os.Stderr, "argh", err)
|
fmt.Fprintln(os.Stderr, "argh", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
go hub.Run()
|
|
||||||
|
|
||||||
setupHTTP(hub)
|
httpQueryServer := http.Server{Handler: setupHTTP(hub)}
|
||||||
if *querySocket != "" {
|
if *querySocket != "" {
|
||||||
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
|
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
|
||||||
unixListener, err := net.Listen("unix", *querySocket)
|
unixListener, err := net.Listen("unix", *querySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
|
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
go http.Serve(unixListener, nil)
|
defer cleanSocket(*querySocket)
|
||||||
|
go func() {
|
||||||
|
if err := httpQueryServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
|
||||||
|
fmt.Fprintln(os.Stderr, "error binding", *querySocket, ":", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
} else {
|
} else {
|
||||||
|
httpQueryServer.Addr = *queryAddr
|
||||||
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
|
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
|
||||||
go http.ListenAndServe(*queryAddr, nil)
|
go func() {
|
||||||
|
err := httpQueryServer.ListenAndServe()
|
||||||
|
if err != nil && err != http.ErrServerClosed {
|
||||||
|
fmt.Fprintln(os.Stderr, "error binding", *queryAddr, ":", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub, *verbose)}
|
||||||
|
if *ctlSocket != "" {
|
||||||
|
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
|
||||||
|
unixListener, err := net.Listen("unix", *ctlSocket)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer cleanSocket(*ctlSocket)
|
||||||
|
go func() {
|
||||||
|
if err := httpCtlServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
|
||||||
|
fmt.Fprintln(os.Stderr, "error binding:", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: now we are ready
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-interrupt:
|
case sig := <-interrupt:
|
||||||
server.Kill()
|
if sig == syscall.SIGUSR1 {
|
||||||
//server.Wait()
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
|
||||||
if *syslogSocketPath != "" {
|
resp := <-hub.Responses
|
||||||
|
if resp.Value.(bool) {
|
||||||
|
log.Println("resumed")
|
||||||
|
} else {
|
||||||
|
log.Println("paused")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sig == syscall.SIGUSR2 {
|
||||||
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
|
||||||
|
resp := <-hub.Responses
|
||||||
|
if resp.Value.(bool) {
|
||||||
|
log.Println("buffer cleaned")
|
||||||
|
} else {
|
||||||
|
log.Println("buffer NOT cleaned")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
|
||||||
|
log.Println("Quitting because of signal", sig)
|
||||||
|
server.Kill()
|
||||||
|
if err := httpQueryServer.Shutdown(nil); err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, "Error closing http server:", err)
|
||||||
|
}
|
||||||
|
if err := httpCtlServer.Shutdown(nil); err != nil {
|
||||||
|
fmt.Fprintln(os.Stderr, "Error closing control server:", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package formatter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
@ -8,6 +8,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||||
|
"gopkg.in/mgo.v2/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Formatter is an interface, so that multiple implementations can exist
|
// Formatter is an interface, so that multiple implementations can exist
|
||||||
|
@ -33,32 +34,67 @@ func init() {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
type renderFormat int
|
type Format int
|
||||||
|
|
||||||
func parseRenderFormat(s string) (renderFormat, error) {
|
func (rf *Format) Set(v string) error {
|
||||||
|
newvalue, err := parseFormat(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*rf = newvalue
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rf Format) String() string {
|
||||||
|
switch rf {
|
||||||
|
case FormatJSON:
|
||||||
|
return "json"
|
||||||
|
case FormatSyslog:
|
||||||
|
return "syslog"
|
||||||
|
case FormatBSON:
|
||||||
|
return "bson"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error {
|
||||||
|
return WriteFormatted(w, rf, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseFormat(s string) (Format, error) {
|
||||||
switch s {
|
switch s {
|
||||||
case "json":
|
case "json":
|
||||||
return formatJSON, nil
|
return FormatJSON, nil
|
||||||
case "syslog":
|
case "syslog":
|
||||||
return formatSyslog, nil
|
return FormatSyslog, nil
|
||||||
|
case "bson":
|
||||||
|
return FormatBSON, nil
|
||||||
default:
|
default:
|
||||||
return 0, fmt.Errorf("Undefined format `%s`", s)
|
return 0, fmt.Errorf("Undefined format `%s`", s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
formatSyslog = iota // 0
|
FormatSyslog = iota // 0
|
||||||
formatJSON = iota
|
FormatJSON = iota
|
||||||
|
FormatBSON = iota
|
||||||
)
|
)
|
||||||
|
|
||||||
func writeFormatted(w io.Writer, f renderFormat, msg format.LogParts) error {
|
func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error {
|
||||||
switch f {
|
switch f {
|
||||||
case formatSyslog:
|
case FormatSyslog:
|
||||||
return syslogTmpl.Execute(w, msg)
|
return syslogTmpl.Execute(w, msg)
|
||||||
case formatJSON:
|
case FormatJSON:
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
enc.SetIndent("", " ")
|
enc.SetIndent("", " ")
|
||||||
return enc.Encode(msg)
|
return enc.Encode(msg)
|
||||||
|
case FormatBSON:
|
||||||
|
enc, err := bson.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = w.Write(enc)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
58
hub.go
58
hub.go
|
@ -24,10 +24,29 @@ type ClientOptions struct {
|
||||||
// The channel "register" and "unregister" can be seen as "command"
|
// The channel "register" and "unregister" can be seen as "command"
|
||||||
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
|
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
|
||||||
// has "options", such as Nofollow, to explain the Hub what should be given
|
// has "options", such as Nofollow, to explain the Hub what should be given
|
||||||
|
|
||||||
|
// An HubCommand is an "enum" of different commands
|
||||||
|
type HubCommand int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CommandClear = iota
|
||||||
|
CommandPauseToggle = iota
|
||||||
|
)
|
||||||
|
|
||||||
|
// An HubFullCommand is a Command, complete with arguments
|
||||||
|
type HubFullCommand struct {
|
||||||
|
Command HubCommand
|
||||||
|
}
|
||||||
|
type CommandResponse struct {
|
||||||
|
Value interface{}
|
||||||
|
}
|
||||||
|
|
||||||
type Hub struct {
|
type Hub struct {
|
||||||
Register chan Client
|
Register chan Client
|
||||||
Unregister chan Client
|
Unregister chan Client
|
||||||
LogMessages chan format.LogParts
|
LogMessages chan format.LogParts
|
||||||
|
Commands chan HubFullCommand
|
||||||
|
Responses chan CommandResponse
|
||||||
|
|
||||||
clients map[Client]bool
|
clients map[Client]bool
|
||||||
circbuf *ring.Ring
|
circbuf *ring.Ring
|
||||||
|
@ -39,6 +58,8 @@ func NewHub(ringBufSize int) Hub {
|
||||||
Register: make(chan Client),
|
Register: make(chan Client),
|
||||||
Unregister: make(chan Client),
|
Unregister: make(chan Client),
|
||||||
LogMessages: make(chan format.LogParts),
|
LogMessages: make(chan format.LogParts),
|
||||||
|
Commands: make(chan HubFullCommand),
|
||||||
|
Responses: make(chan CommandResponse),
|
||||||
circbuf: ring.New(ringBufSize),
|
circbuf: ring.New(ringBufSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,6 +98,7 @@ func (h *Hub) register(cl Client) {
|
||||||
|
|
||||||
// Run is hub main loop; keeps everything going
|
// Run is hub main loop; keeps everything going
|
||||||
func (h *Hub) Run() {
|
func (h *Hub) Run() {
|
||||||
|
active := true
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case cl := <-h.Register:
|
case cl := <-h.Register:
|
||||||
|
@ -88,16 +110,36 @@ func (h *Hub) Run() {
|
||||||
delete(h.clients, cl)
|
delete(h.clients, cl)
|
||||||
}
|
}
|
||||||
case msg := <-h.LogMessages:
|
case msg := <-h.LogMessages:
|
||||||
h.circbuf.Value = msg
|
if active == true {
|
||||||
h.circbuf = h.circbuf.Next()
|
h.circbuf.Value = msg
|
||||||
for client := range h.clients {
|
h.circbuf = h.circbuf.Next()
|
||||||
select { // send without blocking
|
for client := range h.clients {
|
||||||
case client.Messages <- msg:
|
select { // send without blocking
|
||||||
break
|
case client.Messages <- msg:
|
||||||
default:
|
break
|
||||||
break
|
default:
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case cmd := <-h.Commands:
|
||||||
|
if cmd.Command == CommandClear {
|
||||||
|
h.clear()
|
||||||
|
h.Responses <- CommandResponse{Value: true}
|
||||||
|
}
|
||||||
|
if cmd.Command == CommandPauseToggle {
|
||||||
|
active = !active
|
||||||
|
h.Responses <- CommandResponse{Value: active}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clear removes all elements from the buffer
|
||||||
|
func (h *Hub) clear() {
|
||||||
|
buf := h.circbuf
|
||||||
|
for i := 0; i < buf.Len(); i++ {
|
||||||
|
buf.Value = nil
|
||||||
|
buf = buf.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue