1
0
Fork 0
forked from boyska/circolog

Compare commits

..

No commits in common. "9ef425d827404b1673d71afad08d1a9e429869ce" and "f66e07e8738f5371db8eb2935412accb4c3839a6" have entirely different histories.

8 changed files with 49 additions and 493 deletions

View file

@ -1,9 +1,9 @@
A syslog daemon implementing circular buffer, in-memory storage.
This is useful when you want to keep some (heavily detailed) log available, but you don't want to log too many
things to disk. Remember: logging is useful, but can be dangerous to your users' privacy!
This is useful when you want to keep some (heavy detailed) log available, but you don't want to log too many
things to disk.
On your "main" syslog, forward (part of the) messages to this one!
On your "main" syslog, send some message to this one!
## Integration examples
@ -27,59 +27,15 @@ and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/
## Client
`circolog` has its own client: `circolog-tail`. It is intended to resemble `tail -f` for the most basic
options; however, it will include filtering options that are common when you want to read logs, because that's
very easy when you have structured logs available.
However, one design point of circolog is to be usable without having a specific client: so the logs are
offered on both HTTP and websocket. This means that you can use `curl` if you want:
`curl` might be enough of a client for most uses.
curl --unix-socket /run/circolog-query.sock localhost/
will give you everything that circologd has in memory.
will give you everything that circologd has in memory
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
any websocket client supporting UNIX domain socket, so you have two options:
1. Use `circolog-tail`
1. wait until I write a proper `circolog-tail` client implementing it all
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
### HTTP URLs and parameters
When using HTTP, logs are served on `/`. Valid parameters are:
* `l`. This is the amount of lines to send. This is essentially the same as the `-n` parameter on tail
Using `l=-1` (the default) means "give me every log message that you have
* `fmt`. This selects the output format. When `fmt=json` is used, each message is returned as JSON structured
data. The format of those JSON messages is still unstable. `fmt=syslog`, the default, outputs messages using "syslog style" (RFC XXXXXX)
To use websocket, request path `/ws`. The same parameters of `/` are recognized.
## Control daemon
Circologd can be controlled, on some aspects, at run-time. It has 2 mechanisms for that: the easiest, and more
limited, is sending a signal with kill; the second, and more powerful, is a control socket, where you can give
commands to it. This control socket is just HTTP, so again `curl` is your friend. In the future a
`circolog-ctl` client will be developed.
### Pause
When circologd is paused, every new message it receives is immediately discarded. No exception. The backlog
is, however, preserved. This means that you can trigger the event that you want to investigate, pause
circolog, then analyze the logs.
Pausing might be the easiest way to make circologd only run "when needed".
When circologd resumes, no previous message is lost.
To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
### Clear
When you clear the circologd's buffer, it will discard every message it has, but will keep collecting new
messages.
You can do that with `POST /logs/clear`

View file

@ -11,10 +11,7 @@ import (
"strconv"
"time"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
)
func main() {
@ -31,7 +28,6 @@ func main() {
Path: "/ws",
}
q := u.Query()
q.Set("fmt", "bson")
if *backlogLimit >= 0 {
q.Set("l", strconv.Itoa(*backlogLimit))
}
@ -62,20 +58,12 @@ func main() {
go func() {
defer close(done)
for {
_, serialized, err := c.ReadMessage()
_, message, err := c.ReadMessage()
if err != nil {
log.Println("close:", err)
return
}
var parsed format.LogParts
if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid YAML", err)
continue
}
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {
log.Println("error printing", err)
}
fmt.Println()
fmt.Println(string(message))
}
}()

View file

@ -1,161 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"
)
var globalOpts struct {
ctlSock string
verbose bool
}
var ctl http.Client
type commandFunc func([]string) error
var cmdMap map[string]commandFunc
func init() {
cmdMap = map[string]commandFunc{
// TODO: implement set and get of config at runtime
//"set": setCmd,
//"get": getCmd,
"pause": pauseCmd,
"reload": reloadCmd,
"restart": restartCmd,
"help": helpCmd,
}
}
//func setCmd(ctlSock string, args []string) error {}
//func getCmd(ctlSock string, args []string) error {}
func parsePauseOpts(postponeTime string) (string, error) {
var waitTime int64
var err error
L := len(postponeTime)
switch unit := postponeTime[L-1]; string(unit) {
case "s":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
case "m":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60
case "h":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60
case "d":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60 * 24
case "w":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60 * 24 * 7
}
return string(waitTime), nil
}
func pauseCmd(args []string) error {
var postBody string
var err error
flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
postponeTime := flagset.String("postpone", "", "How long to wait before untoggling the state")
flagset.Parse(args[1:])
if *postponeTime != "" {
postBody, err = parsePauseOpts(*postponeTime)
if err != nil {
return err
}
}
resp, err := ctl.Post("http://unix/pause/toggle", "application/octet-stream", strings.NewReader(postBody))
if globalOpts.verbose {
defer resp.Body.Close()
bodyBytes, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
return err2
}
fmt.Println(string(bodyBytes))
}
return err
}
func reloadCmd(args []string) error {
return nil
}
func restartCmd(args []string) error {
return nil
}
func helpCmd(args []string) error {
usage(os.Stdout)
os.Exit(0)
return nil
}
func usage(w io.Writer) {
fmt.Fprintf(w, "USAGE: %s [globalOpts] [SUBCOMMAND] [opts]\n", os.Args[0])
fmt.Fprintf(w, "\nSUBCOMMANDS:\n\n")
for command := range cmdMap {
fmt.Fprintf(w, "\t%s\n", command)
}
}
func parseAndRun(args []string) {
cmdName := args[0]
cmdToRun, ok := cmdMap[cmdName]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown subcommand: %s\n", cmdName)
usage(os.Stderr)
os.Exit(2)
}
// from here: https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74
ctl = http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", globalOpts.ctlSock)
},
},
}
err := cmdToRun(args)
if err != nil {
fmt.Fprintf(os.Stderr, "Error:\n%s\n", err)
os.Exit(1)
}
}
func main() {
flag.StringVar(&globalOpts.ctlSock, "ctl-socket", "/tmp/circologd-ctl.sock",
"Path to a unix domain socket for the control server; leave empty to disable")
flag.BoolVar(&globalOpts.verbose, "verbose", false, "Print more output")
flag.Parse()
args := flag.Args()
if len(args) == 0 {
usage(os.Stderr)
os.Exit(-1)
}
parseAndRun(args)
}

View file

@ -1,4 +1,4 @@
package formatter
package main
import (
"encoding/json"
@ -8,7 +8,6 @@ import (
"time"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
)
// Formatter is an interface, so that multiple implementations can exist
@ -34,67 +33,32 @@ func init() {
))
}
type Format int
type renderFormat int
func (rf *Format) Set(v string) error {
newvalue, err := parseFormat(v)
if err != nil {
return err
}
*rf = newvalue
return nil
}
func (rf Format) String() string {
switch rf {
case FormatJSON:
return "json"
case FormatSyslog:
return "syslog"
case FormatBSON:
return "bson"
}
return ""
}
func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error {
return WriteFormatted(w, rf, msg)
}
func parseFormat(s string) (Format, error) {
func parseRenderFormat(s string) (renderFormat, error) {
switch s {
case "json":
return FormatJSON, nil
return formatJSON, nil
case "syslog":
return FormatSyslog, nil
case "bson":
return FormatBSON, nil
return formatSyslog, nil
default:
return 0, fmt.Errorf("Undefined format `%s`", s)
}
}
const (
FormatSyslog = iota // 0
FormatJSON = iota
FormatBSON = iota
formatSyslog = iota // 0
formatJSON = iota
)
func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error {
func writeFormatted(w io.Writer, f renderFormat, msg format.LogParts) error {
switch f {
case FormatSyslog:
case formatSyslog:
return syslogTmpl.Execute(w, msg)
case FormatJSON:
case formatJSON:
enc := json.NewEncoder(w)
enc.SetIndent("", " ")
return enc.Encode(msg)
case FormatBSON:
enc, err := bson.Marshal(msg)
if err != nil {
return err
}
_, err = w.Write(enc)
return err
}
return nil
}

View file

@ -9,16 +9,13 @@ import (
"time"
"git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
func setupHTTP(hub circolog.Hub) *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/", getHTTPHandler(hub))
mux.HandleFunc("/ws", getWSHandler(hub))
return mux
func setupHTTP(hub circolog.Hub) {
http.HandleFunc("/", getHTTPHandler(hub))
http.HandleFunc("/ws", getWSHandler(hub))
}
func parseParameterL(r *http.Request) (int, error) {
@ -57,7 +54,7 @@ func parseParameters(r *http.Request) (circolog.ClientOptions, error) {
}
type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd)
Format formatter.Format
Format renderFormat
}
func parseRenderParameters(r *http.Request) (renderOptions, error) {
@ -71,10 +68,11 @@ func parseRenderParameters(r *http.Request) (renderOptions, error) {
if len(val) != 1 {
return opts, errors.New("Format repeated multiple times")
}
err := opts.Format.Set(val[0])
format, err := parseRenderFormat(val[0])
if err != nil {
return opts, err
}
opts.Format = format
}
return opts, nil
}
@ -106,10 +104,9 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
hub.Register <- client
for x := range client.Messages {
if err := render_opts.Format.WriteFormatted(w, x); err == nil {
if render_opts.Format != formatter.FormatJSON { // bleah
w.Write([]byte("\n"))
}
writeFormatted(w, render_opts.Format, x)
if render_opts.Format != formatJSON { // bleah
w.Write([]byte("\n"))
}
}
}
@ -121,18 +118,10 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
WriteBufferSize: 1024,
}
return func(w http.ResponseWriter, r *http.Request) {
render_opts, err := parseRenderParameters(r)
if err != nil {
log.Println("Error parsing:", err)
w.WriteHeader(400)
fmt.Fprintln(w, err)
return
}
opts, err := parseParameters(r)
if err != nil {
log.Println("Error on request parameter \"l\":", err)
w.WriteHeader(400)
fmt.Fprintln(w, err)
return
}
opts.Nofollow = false
@ -167,7 +156,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
if err != nil {
return
}
render_opts.Format.WriteFormatted(w, message)
writeFormatted(w, formatSyslog, message)
if err := w.Close(); err != nil {
return

View file

@ -1,82 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
fractal "git.lattuga.net/blallo/gotools/formatting"
"git.lattuga.net/boyska/circolog"
"github.com/gorilla/mux"
)
func setupHTTPCtl(hub circolog.Hub, verbose bool) *mux.Router {
m := mux.NewRouter()
m.HandleFunc("/pause/toggle", togglePause(hub, verbose)).Methods("POST")
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
m.HandleFunc("/echo", echo(verbose)).Methods("GET")
return m
}
func togglePause(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - toggled pause", r.Method, r.RemoteAddr)
}
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
active := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"paused": !active})
}
}
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - cleared queue", r.Method, r.RemoteAddr)
}
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
success := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"success": success})
}
}
func printHelp(verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - asked for help", r.Method, r.RemoteAddr)
}
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
var pathsWithDocs = map[string]string{
"/pause/toggle": "Toggle the server from pause state (not listening)",
"/logs/clear": "Wipe the buffer from all the messages",
"/help": "This help",
"/echo": "Answers to heartbeat",
}
fractal.EncodeJSON(pathsWithDocs, enc)
if verbose {
errEnc := json.NewEncoder(os.Stderr)
fractal.EncodeJSON(pathsWithDocs, errEnc)
}
}
}
func echo(verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Echo")
if verbose {
log.Printf("[%s] %s - asked for echo", r.Method, r.RemoteAddr)
}
w.Header().Set("content-type", "text/plain")
fmt.Fprintln(w, "I am on!")
}
}

View file

@ -3,12 +3,10 @@ package main
import (
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"git.lattuga.net/boyska/circolog"
syslog "gopkg.in/mcuadros/go-syslog.v2"
@ -16,7 +14,7 @@ import (
func cleanSocket(socket string) {
if err := os.Remove(socket); err != nil {
fmt.Fprintln(os.Stderr, "Error cleaning", socket, ":", err)
fmt.Fprintln(os.Stderr, socket, ":", err)
}
}
@ -28,16 +26,13 @@ func main() {
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
verbose := flag.Bool("verbose", false, "Print more output executing the daemon")
flag.Parse()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM)
signal.Notify(interrupt, os.Interrupt)
hub := circolog.NewHub(*bufsize)
handler := syslog.NewChannelHandler(hub.LogMessages)
go hub.Run()
server := syslog.NewServer()
server.SetFormat(syslog.RFC5424)
@ -60,82 +55,31 @@ func main() {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
go hub.Run()
httpQueryServer := http.Server{Handler: setupHTTP(hub)}
setupHTTP(hub)
if *querySocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
unixListener, err := net.Listen("unix", *querySocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
}
defer cleanSocket(*querySocket)
go func() {
if err := httpQueryServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *querySocket, ":", err)
}
}()
go http.Serve(unixListener, nil)
} else {
httpQueryServer.Addr = *queryAddr
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
go func() {
err := httpQueryServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *queryAddr, ":", err)
}
}()
go http.ListenAndServe(*queryAddr, nil)
}
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub, *verbose)}
if *ctlSocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
unixListener, err := net.Listen("unix", *ctlSocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
}
defer cleanSocket(*ctlSocket)
go func() {
if err := httpCtlServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding:", err)
}
}()
}
// TODO: now we are ready
for {
select {
case sig := <-interrupt:
if sig == syscall.SIGUSR1 {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
if resp.Value.(bool) {
log.Println("resumed")
} else {
log.Println("paused")
}
}
if sig == syscall.SIGUSR2 {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
if resp.Value.(bool) {
log.Println("buffer cleaned")
} else {
log.Println("buffer NOT cleaned")
}
}
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
log.Println("Quitting because of signal", sig)
server.Kill()
if err := httpQueryServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing http server:", err)
}
if err := httpCtlServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing control server:", err)
}
return
case <-interrupt:
server.Kill()
//server.Wait()
if *syslogSocketPath != "" {
}
return
default:
}
}
}

58
hub.go
View file

@ -24,29 +24,10 @@ type ClientOptions struct {
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
// An HubCommand is an "enum" of different commands
type HubCommand int
const (
CommandClear = iota
CommandPauseToggle = iota
)
// An HubFullCommand is a Command, complete with arguments
type HubFullCommand struct {
Command HubCommand
}
type CommandResponse struct {
Value interface{}
}
type Hub struct {
Register chan Client
Unregister chan Client
LogMessages chan format.LogParts
Commands chan HubFullCommand
Responses chan CommandResponse
clients map[Client]bool
circbuf *ring.Ring
@ -58,8 +39,6 @@ func NewHub(ringBufSize int) Hub {
Register: make(chan Client),
Unregister: make(chan Client),
LogMessages: make(chan format.LogParts),
Commands: make(chan HubFullCommand),
Responses: make(chan CommandResponse),
circbuf: ring.New(ringBufSize),
}
}
@ -98,7 +77,6 @@ func (h *Hub) register(cl Client) {
// Run is hub main loop; keeps everything going
func (h *Hub) Run() {
active := true
for {
select {
case cl := <-h.Register:
@ -110,36 +88,16 @@ func (h *Hub) Run() {
delete(h.clients, cl)
}
case msg := <-h.LogMessages:
if active == true {
h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients {
select { // send without blocking
case client.Messages <- msg:
break
default:
break
}
h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients {
select { // send without blocking
case client.Messages <- msg:
break
default:
break
}
}
case cmd := <-h.Commands:
if cmd.Command == CommandClear {
h.clear()
h.Responses <- CommandResponse{Value: true}
}
if cmd.Command == CommandPauseToggle {
active = !active
h.Responses <- CommandResponse{Value: active}
}
}
}
}
// Clear removes all elements from the buffer
func (h *Hub) clear() {
buf := h.circbuf
for i := 0; i < buf.Len(); i++ {
buf.Value = nil
buf = buf.Next()
}
}