1
0
Fork 0
forked from boyska/circolog

Compare commits

..

No commits in common. "05f9e0f1d199da8e44f310aef0158b6efe72bdd3" and "a2303c1e1d971782e93d3f26190f15f719e1a894" have entirely different histories.

10 changed files with 19 additions and 379 deletions

View file

@ -72,10 +72,10 @@ Pausing might be the easiest way to make circologd only run "when needed".
When circologd resumes, no previous message is lost. When circologd resumes, no previous message is lost.
To pause/unpause:
* `circologctl pause` To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
* `pkill -USR1 circologd`
* `POST /pause/toggle` to your circologd control socket To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
### Clear ### Clear
@ -83,27 +83,3 @@ When you clear the circologd's buffer, it will discard every message it has, but
messages. messages.
You can do that with `POST /logs/clear` You can do that with `POST /logs/clear`
### Filter
circologd can drop irrelevant messages using filters. A filter is a sql-like expression (for the exact syntax
you can see [the doc for the underlying library](https://github.com/araddon/qlbridge/blob/master/FilterQL.md),
qlbridge), but just imitating sql where clauses can be enough!
`circologctl filter message NOT LIKE '%usb%'` will discard everything related to usb.
The filter will be applied to incoming messages, so messages mentioning usb will not be saved in memory at all.
You can put zero or one filters at a time. That is, you can not stack more filters... but FilterQL syntax
supports `AND` operators, so this is not an issue.
To remove filtering (thus accepting every message) run `circologctl filter`
NOTE: `circolog-tail` supports filters with exactly the same syntax, but they are two different kinds of
filtering: one is server-side, the other is client-side. When you filter server-side with `circologctl
filter`, circologd will refuse messages not matching the filter. If you only filter with `circolog-tail`, the
message you are filtering out will still consume space in memory (and will be available to other clients).
Filtering brings big dependencies, which will add some 5-6 megabytes to circolog binaries. If you want to
avoid it, install with `go install -tags nofilter git.lattuga.net/boyska/circolog/...` and your binaries will
be a bit smaller.

View file

@ -11,63 +11,18 @@ import (
"strconv" "strconv"
"time" "time"
"git.lattuga.net/boyska/circolog/filtering"
"git.lattuga.net/boyska/circolog/formatter" "git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
isatty "github.com/mattn/go-isatty"
"github.com/mgutz/ansi"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
) )
type BoolAuto uint
const (
BoolAuto_NO BoolAuto = iota
BoolAuto_YES BoolAuto = iota
BoolAuto_AUTO BoolAuto = iota
)
func (b *BoolAuto) String() string {
switch *b {
case BoolAuto_NO:
return "no"
case BoolAuto_YES:
return "always"
case BoolAuto_AUTO:
return "auto"
}
return ""
}
func (b *BoolAuto) Set(s string) error {
switch s {
case "auto":
*b = BoolAuto_AUTO
case "always":
*b = BoolAuto_YES
case "no":
*b = BoolAuto_NO
default:
return fmt.Errorf("Invalid value %s", s)
}
return nil
}
func main() { func main() {
addr := flag.String("addr", "localhost:9080", "http service address") addr := flag.String("addr", "localhost:9080", "http service address")
querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server") querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)") backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)")
var filter filtering.ExprValue
flag.Var(&filter, "where", "sql-like query to filter logs")
// TODO: change to color-mode=auto/no/always
hasColor := BoolAuto_AUTO
flag.Var(&hasColor, "color", "dis/enable colors")
flag.Parse() flag.Parse()
if hasColor == BoolAuto_NO || (!isatty.IsTerminal(os.Stdout.Fd()) && hasColor != BoolAuto_YES) {
ansi.DisableColors(true)
}
interrupt := make(chan os.Signal, 1) interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt) signal.Notify(interrupt, os.Interrupt)
var d *websocket.Dialer var d *websocket.Dialer
@ -114,10 +69,7 @@ func main() {
} }
var parsed format.LogParts var parsed format.LogParts
if err := bson.Unmarshal(serialized, &parsed); err != nil { if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid BSON", err) log.Println("invalid YAML", err)
continue
}
if !filter.Validate(parsed) {
continue continue
} }
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {

View file

@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
@ -10,10 +9,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
"git.lattuga.net/boyska/circolog"
) )
var globalOpts struct { var globalOpts struct {
@ -33,9 +29,7 @@ func init() {
// TODO: implement set and get of config at runtime // TODO: implement set and get of config at runtime
//"set": setCmd, //"set": setCmd,
//"get": getCmd, //"get": getCmd,
"status": statusCmd,
"pause": pauseCmd, "pause": pauseCmd,
"filter": filterCmd,
"reload": reloadCmd, "reload": reloadCmd,
"restart": restartCmd, "restart": restartCmd,
"help": helpCmd, "help": helpCmd,
@ -46,41 +40,6 @@ func init() {
//func getCmd(ctlSock string, args []string) error {} //func getCmd(ctlSock string, args []string) error {}
func statusCmd(args []string) error {
flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
outFormat := flagset.String("format", "plain", "Which format to use as output for this command (json, pretty, plain)")
flagset.Parse(args[1:])
resp, err := ctl.Get("http://unix/status")
if err != nil {
return err
}
defer resp.Body.Close()
respBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
respJSON := make(map[string]circolog.StatusResponse)
err = json.Unmarshal(respBytes, &respJSON)
if err != nil {
return err
}
switch *outFormat {
case "json":
fmt.Printf("%s", string(respBytes))
case "pretty":
prettyJSON, err := json.MarshalIndent(respJSON, "", " ")
if err != nil {
return err
}
fmt.Printf("%s\n", prettyJSON)
case "plain":
fmt.Printf("Buffer Size: %d\n", respJSON["status"].Size)
fmt.Printf("Server Status: %s\n", respJSON["status"].Status())
fmt.Printf("Filter String: %s\n", respJSON["status"].Filter)
}
return nil
}
func pauseCmd(args []string) error { func pauseCmd(args []string) error {
var dontChangeAgain time.Duration var dontChangeAgain time.Duration
flagset := flag.NewFlagSet(args[0], flag.ExitOnError) flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
@ -105,25 +64,6 @@ func pauseCmd(args []string) error {
return err return err
} }
func filterCmd(args []string) error {
filter := strings.Join(args[1:], " ")
postBody := make(map[string][]string)
postBody["where"] = []string{filter}
if globalOpts.debug {
fmt.Println("[DEBUG] postBody:", postBody)
}
resp, err := ctl.PostForm("http://unix/filter", postBody)
if resp.StatusCode != 200 || globalOpts.verbose {
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(string(bodyBytes))
}
return err
}
func reloadCmd(args []string) error { func reloadCmd(args []string) error {
return nil return nil
} }

View file

@ -16,28 +16,12 @@ import (
func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router { func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router {
m := mux.NewRouter() m := mux.NewRouter()
m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST") m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST")
m.HandleFunc("/filter", setFilter(hub, verbose, debug)).Methods("POST")
m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET")
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST") m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
m.HandleFunc("/help", printHelp(verbose)).Methods("GET") m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
m.HandleFunc("/echo", echo(verbose)).Methods("GET") m.HandleFunc("/echo", echo(verbose)).Methods("GET")
return m return m
} }
func getStatus(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
response := make(chan circolog.CommandResponse)
hub.Commands <- circolog.HubFullCommand{
Command: circolog.CommandStatus,
Response: response,
}
resp := <-response
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"status": resp.Value})
}
}
func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc { func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if verbose { if verbose {
@ -50,9 +34,7 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
if waitTimePar != "" { if waitTimePar != "" {
waitTime, err = time.ParseDuration(waitTimePar) waitTime, err = time.ParseDuration(waitTimePar)
if err != nil { if err != nil {
w.WriteHeader(400) fmt.Println("waitTime not understood:", waitTimePar)
fmt.Fprintln(w, "waitTime not understood:", waitTimePar)
return
} }
} }
if debug { if debug {
@ -72,26 +54,6 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
} }
} }
func setFilter(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
where := r.FormValue("where")
response := make(chan circolog.CommandResponse)
hub.Commands <- circolog.HubFullCommand{
Command: circolog.CommandNewFilter,
Response: response,
Parameters: map[string]interface{}{"where": where},
}
resp := <-response
if !resp.Value.(map[string]interface{})["success"].(bool) {
w.WriteHeader(400)
}
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(resp.Value)
}
}
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc { func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if verbose { if verbose {
@ -117,7 +79,6 @@ func printHelp(verbose bool) http.HandlerFunc {
var pathsWithDocs = map[string]string{ var pathsWithDocs = map[string]string{
"/pause/toggle": "Toggle the server from pause state (not listening)", "/pause/toggle": "Toggle the server from pause state (not listening)",
"/logs/clear": "Wipe the buffer from all the messages", "/logs/clear": "Wipe the buffer from all the messages",
"/status": "Get info on the status of the server",
"/help": "This help", "/help": "This help",
"/echo": "Answers to heartbeat", "/echo": "Answers to heartbeat",
} }

View file

@ -9,10 +9,8 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
"git.lattuga.net/boyska/circolog" "git.lattuga.net/boyska/circolog"
"github.com/coreos/go-systemd/daemon"
syslog "gopkg.in/mcuadros/go-syslog.v2" syslog "gopkg.in/mcuadros/go-syslog.v2"
) )
@ -105,17 +103,10 @@ func main() {
}() }()
} }
daemon.SdNotify(false, daemon.SdNotifyReady) // TODO: now we are ready
var wdTick <-chan time.Time
if watchdogTime, err := daemon.SdWatchdogEnabled(false); err == nil && watchdogTime != 0 {
fmt.Println("systemd watchdog enabled")
wdTick = time.Tick(watchdogTime / 2) // much less than systemd default of 30s; TODO: make it configurable
}
for { for {
select { select {
case <-wdTick:
daemon.SdNotify(false, daemon.SdNotifyWatchdog)
case sig := <-interrupt: case sig := <-interrupt:
if sig == syscall.SIGUSR1 { if sig == syscall.SIGUSR1 {
response := make(chan circolog.CommandResponse) response := make(chan circolog.CommandResponse)
@ -139,7 +130,6 @@ func main() {
} }
if sig == syscall.SIGTERM || sig == syscall.SIGINT { if sig == syscall.SIGTERM || sig == syscall.SIGINT {
log.Println("Quitting because of signal", sig) log.Println("Quitting because of signal", sig)
daemon.SdNotify(false, daemon.SdNotifyStopping)
server.Kill() server.Kill()
if err := httpQueryServer.Shutdown(nil); err != nil { if err := httpQueryServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing http server:", err) fmt.Fprintln(os.Stderr, "Error closing http server:", err)

View file

@ -1,21 +0,0 @@
Query language
===================
circolog uses a sql-inspired query language. If you know SQL, then you can use "where clauses" in circolog. If
you don't know SQL, don't worry: the language is easy enough for you to learn the most basic queries without
worrying too much.
You can only filter the rows, you can't sort them or group them in any way.
Reference
-----------
Available fields:
- `message`: the string with the main information
- `app_name`: also known as "program" sometimes
- `facility`: an integer describing auth, daemon, user, etc.
- `hostname`: the hostname where the entry originated
- `timestamp`: date in format `2019-01-07T15:28:58+01:00`
- `severity`: an integer describing severity

View file

@ -1,69 +0,0 @@
// +build !nofilter
package filtering
import (
"fmt"
"os"
"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/value"
"github.com/araddon/qlbridge/vm"
)
type ExprValue struct {
node expr.Node
expression string
}
func (e *ExprValue) String() string {
if e.node != nil {
return e.node.String()
} else {
return "<Empty Expression>"
}
}
func (e *ExprValue) Set(value string) error {
if value == "" {
e.node = nil
e.expression = value
return nil
}
ast, err := expr.ParseExpression(value)
if err != nil {
return err
}
e.node = ast
e.expression = value
return nil
}
// Validate answers the question wether to include a log line or not.
func (e *ExprValue) Validate(lineInput map[string]interface{}) bool {
if e.node == nil {
return true
}
line := translateMap(lineInput)
context := datasource.NewContextSimpleNative(line)
val, ok := vm.Eval(context, e.node)
if !ok || val == nil { // errors when evaluating
return false
}
if bv, isBool := val.(value.BoolValue); isBool {
return bv.Val()
}
fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean")
return false
}
func translateMap(lineInput map[string]interface{}) map[string]interface{} {
lineOutput := make(map[string]interface{})
lineOutput["prog"] = lineInput["app_name"]
lineOutput["msg"] = lineInput["message"]
lineOutput["facility"] = lineInput["facility"]
lineOutput["host"] = lineInput["hostname"]
lineOutput["time"] = lineInput["timestamp"]
lineOutput["sev"] = lineInput["severity"]
return lineOutput
}

View file

@ -1,17 +0,0 @@
// +build nofilter
package filtering
type ExprValue struct {
}
func (e *ExprValue) String() string {
return "<filtering disabled>"
}
func (e *ExprValue) Set(value string) error {
return nil
}
func (e *ExprValue) Validate(line map[string]interface{}) bool {
return true
}

View file

@ -3,12 +3,10 @@ package formatter
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"hash/fnv"
"io" "io"
"text/template" "text/template"
"time" "time"
"github.com/mgutz/ansi"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
) )
@ -27,39 +25,11 @@ func init() {
"rfc822": func(dt time.Time) string { "rfc822": func(dt time.Time) string {
return dt.Format(time.RFC822) return dt.Format(time.RFC822)
}, },
"sevName": func(s int) string {
names := []string{"emerg ", "alert ", "crit ", "err ", "warn ", "notice", "info ", "dbg "}
switch {
case s < 2: // emerg..alert
return ansi.Color(names[s], "red+b")
case s < 4: // emerg..err
return ansi.Color(names[s], "red")
case s < 6: // warn..notice
return ansi.Color(names[s], "white+b")
case s >= len(names):
return "???"
default:
return names[s]
}
},
"color": func(color, text string) string {
return ansi.Color(text, color) // slow; should use colorfunc
},
"red": ansi.ColorFunc("red+b"),
"autoColor": func(s string) string {
// from https://weechat.org/blog/post/2011/08/28/Beautify-your-WeeChat
palette := []string{"31", "35", "38", "40", "49", "63", "70", "80", "92", "99", "112", "126", "130", "138", "142", "148", "167", "169", "174", "176", "178", "184", "186", "210", "212", "215", "247"}
hash := fnv.New32()
hash.Write([]byte(s))
picked := palette[int(hash.Sum32())%len(palette)]
return ansi.Color(s, picked)
},
} }
syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse( syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse(
"{{color \"yellow\" (rfc822 (index . \"timestamp\")) }} {{index . \"hostname\"}} " + "{{rfc822 (index . \"timestamp\")}} {{index . \"hostname\"}} " +
"{{index . \"app_name\" | autoColor}}" + "{{index . \"app_name\"}}" +
"{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " + "{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " +
"{{ sevName (index . \"severity\") }} " +
"{{index . \"message\"}}", "{{index . \"message\"}}",
)) ))
} }

62
hub.go
View file

@ -6,7 +6,6 @@ import (
"os" "os"
"time" "time"
"git.lattuga.net/boyska/circolog/filtering"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
@ -34,8 +33,6 @@ type HubCommand int
const ( const (
CommandClear = iota CommandClear = iota
CommandPauseToggle = iota CommandPauseToggle = iota
CommandStatus = iota
CommandNewFilter = iota
) )
// An HubFullCommand is a Command, complete with arguments // An HubFullCommand is a Command, complete with arguments
@ -44,26 +41,10 @@ type HubFullCommand struct {
Parameters map[string]interface{} Parameters map[string]interface{}
Response chan CommandResponse Response chan CommandResponse
} }
type CommandResponse struct { type CommandResponse struct {
Value interface{} Value interface{}
} }
// StatusResponse is an implementation of a CommandResponse
type StatusResponse struct {
Size int `json:"size"`
IsRunning bool `json:"running"`
Filter string `json:"filter"`
}
// Status return "paused/unpaused" based on isRunning value
func (r StatusResponse) Status() string {
if r.IsRunning {
return "unpaused"
}
return "paused"
}
type Hub struct { type Hub struct {
Register chan Client Register chan Client
Unregister chan Client Unregister chan Client
@ -120,7 +101,6 @@ func (h *Hub) register(cl Client) {
// Run is hub main loop; keeps everything going // Run is hub main loop; keeps everything going
func (h *Hub) Run() { func (h *Hub) Run() {
active := true active := true
var filter filtering.ExprValue
for { for {
select { select {
case cl := <-h.Register: case cl := <-h.Register:
@ -132,7 +112,7 @@ func (h *Hub) Run() {
delete(h.clients, cl) delete(h.clients, cl)
} }
case msg := <-h.LogMessages: case msg := <-h.LogMessages:
if active == true && filter.Validate(msg) { if active == true {
h.circbuf.Value = msg h.circbuf.Value = msg
h.circbuf = h.circbuf.Next() h.circbuf = h.circbuf.Next()
for client := range h.clients { for client := range h.clients {
@ -145,49 +125,27 @@ func (h *Hub) Run() {
} }
} }
case cmd := <-h.Commands: case cmd := <-h.Commands:
switch cmd.Command { if cmd.Command == CommandClear {
case CommandClear:
h.clear() h.clear()
cmd.Response <- CommandResponse{Value: true} cmd.Response <- CommandResponse{Value: true}
case CommandPauseToggle: }
if cmd.Command == CommandPauseToggle {
togglePause(cmd.Parameters["waitTime"].(time.Duration), &active) togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
if active {
fmt.Print("un")
}
fmt.Println("paused")
cmd.Response <- CommandResponse{Value: active} cmd.Response <- CommandResponse{Value: active}
case CommandStatus:
var resp = StatusResponse{
Size: h.circbuf.Len(),
IsRunning: active,
Filter: filter.String(),
}
cmd.Response <- CommandResponse{Value: resp}
case CommandNewFilter:
if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
cmd.Response <- CommandResponse{Value: map[string]interface{}{
"success": false,
"error": err.Error(),
}}
} else {
cmd.Response <- CommandResponse{Value: map[string]interface{}{
"success": true,
"error": "",
}}
}
} }
} }
} }
} }
func togglePause(waitTime time.Duration, status *bool) { func togglePause(waitTime time.Duration, status *bool) {
if waitTime != 0 { var noTime time.Duration
go func() { if waitTime != noTime {
delayedToggle := func() {
time.Sleep(waitTime) time.Sleep(waitTime)
fmt.Fprintln(os.Stderr, "toggling again") fmt.Fprintln(os.Stderr, "toggling again")
togglePause(0, status) togglePause(noTime, status)
}() }
go delayedToggle()
} }
*status = !*status *status = !*status
} }