From 8bb28d7a7c772b0bcf94aeb370b34d007810831c Mon Sep 17 00:00:00 2001 From: boyska Date: Tue, 25 Dec 2018 02:53:46 +0100 Subject: [PATCH 1/7] simple client-side sql filtering --- cmd/circolog-tail/main.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/cmd/circolog-tail/main.go b/cmd/circolog-tail/main.go index ce653a7..efed1e4 100644 --- a/cmd/circolog-tail/main.go +++ b/cmd/circolog-tail/main.go @@ -12,17 +12,30 @@ import ( "time" "git.lattuga.net/boyska/circolog/formatter" + "github.com/araddon/qlbridge/datasource" + "github.com/araddon/qlbridge/expr" + "github.com/araddon/qlbridge/value" + "github.com/araddon/qlbridge/vm" "github.com/gorilla/websocket" "gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mgo.v2/bson" ) +// TODO: type ExpressionValue + func main() { addr := flag.String("addr", "localhost:9080", "http service address") querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server") backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)") + filter := flag.String("where", "", "sql-like query to filter logs") flag.Parse() + filterExpr, err := expr.ParseExpression(*filter) + if err != nil { + fmt.Fprintln(os.Stderr, "invalid filter:", err) + os.Exit(2) + } + interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) var d *websocket.Dialer @@ -69,7 +82,19 @@ func main() { } var parsed format.LogParts if err := bson.Unmarshal(serialized, &parsed); err != nil { - log.Println("invalid YAML", err) + log.Println("invalid BSON", err) + continue + } + context := datasource.NewContextSimpleNative(parsed) + val, ok := vm.Eval(context, filterExpr) + if !ok || val == nil { // errors when evaluating + continue + } + if val.Type() != value.BoolType { + fmt.Fprintln(os.Stderr, "WARNING: The expression doesn't return a boolean") + continue + } + if val.Value().(bool) != true { continue } if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { From 89419185ed14f30f3dcb6e9330919599c8046bf3 Mon Sep 17 00:00:00 2001 From: boyska Date: Tue, 25 Dec 2018 03:04:34 +0100 Subject: [PATCH 2/7] sql: better client-side handling and validation --- cmd/circolog-tail/main.go | 53 +++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/cmd/circolog-tail/main.go b/cmd/circolog-tail/main.go index efed1e4..1e16e4b 100644 --- a/cmd/circolog-tail/main.go +++ b/cmd/circolog-tail/main.go @@ -21,21 +21,34 @@ import ( "gopkg.in/mgo.v2/bson" ) -// TODO: type ExpressionValue +type ExprValue struct { + Node expr.Node +} + +func (e *ExprValue) String() string { + if e.Node != nil { + return e.Node.String() + } else { + return "" + } +} +func (e *ExprValue) Set(value string) error { + ast, err := expr.ParseExpression(value) + if err != nil { + return err + } + e.Node = ast + return nil +} func main() { addr := flag.String("addr", "localhost:9080", "http service address") querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server") backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)") - filter := flag.String("where", "", "sql-like query to filter logs") + var filter ExprValue + flag.Var(&filter, "where", "sql-like query to filter logs") flag.Parse() - filterExpr, err := expr.ParseExpression(*filter) - if err != nil { - fmt.Fprintln(os.Stderr, "invalid filter:", err) - os.Exit(2) - } - interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) var d *websocket.Dialer @@ -85,17 +98,19 @@ func main() { log.Println("invalid BSON", err) continue } - context := datasource.NewContextSimpleNative(parsed) - val, ok := vm.Eval(context, filterExpr) - if !ok || val == nil { // errors when evaluating - continue - } - if val.Type() != value.BoolType { - fmt.Fprintln(os.Stderr, "WARNING: The expression doesn't return a boolean") - continue - } - if val.Value().(bool) != true { - continue + if filter.Node != nil { + context := datasource.NewContextSimpleNative(parsed) + val, ok := vm.Eval(context, filter.Node) + if !ok || val == nil { // errors when evaluating + continue + } + if val.Type() != value.BoolType { + fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean") + continue + } + if val.Value().(bool) != true { + continue + } } if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { log.Println("error printing", err) From 8735ad2c21c2d91b5a549875dddaa207036f0e75 Mon Sep 17 00:00:00 2001 From: boyska Date: Tue, 25 Dec 2018 03:17:14 +0100 Subject: [PATCH 3/7] refactor filtering code goal: use filters server-side, too --- cmd/circolog-tail/main.go | 42 ++++------------------------------ filtering/filter.go | 47 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 38 deletions(-) create mode 100644 filtering/filter.go diff --git a/cmd/circolog-tail/main.go b/cmd/circolog-tail/main.go index 1e16e4b..67c7d21 100644 --- a/cmd/circolog-tail/main.go +++ b/cmd/circolog-tail/main.go @@ -11,41 +11,18 @@ import ( "strconv" "time" + "git.lattuga.net/boyska/circolog/filtering" "git.lattuga.net/boyska/circolog/formatter" - "github.com/araddon/qlbridge/datasource" - "github.com/araddon/qlbridge/expr" - "github.com/araddon/qlbridge/value" - "github.com/araddon/qlbridge/vm" "github.com/gorilla/websocket" "gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mgo.v2/bson" ) -type ExprValue struct { - Node expr.Node -} - -func (e *ExprValue) String() string { - if e.Node != nil { - return e.Node.String() - } else { - return "" - } -} -func (e *ExprValue) Set(value string) error { - ast, err := expr.ParseExpression(value) - if err != nil { - return err - } - e.Node = ast - return nil -} - func main() { addr := flag.String("addr", "localhost:9080", "http service address") querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server") backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)") - var filter ExprValue + var filter filtering.ExprValue flag.Var(&filter, "where", "sql-like query to filter logs") flag.Parse() @@ -98,19 +75,8 @@ func main() { log.Println("invalid BSON", err) continue } - if filter.Node != nil { - context := datasource.NewContextSimpleNative(parsed) - val, ok := vm.Eval(context, filter.Node) - if !ok || val == nil { // errors when evaluating - continue - } - if val.Type() != value.BoolType { - fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean") - continue - } - if val.Value().(bool) != true { - continue - } + if filter.Node != nil && !filter.Validate(parsed) { + continue } if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { log.Println("error printing", err) diff --git a/filtering/filter.go b/filtering/filter.go new file mode 100644 index 0000000..471e449 --- /dev/null +++ b/filtering/filter.go @@ -0,0 +1,47 @@ +package filtering + +import ( + "fmt" + "os" + + "github.com/araddon/qlbridge/datasource" + "github.com/araddon/qlbridge/expr" + "github.com/araddon/qlbridge/value" + "github.com/araddon/qlbridge/vm" +) + +type ExprValue struct { + Node expr.Node +} + +func (e *ExprValue) String() string { + if e.Node != nil { + return e.Node.String() + } else { + return "" + } +} +func (e *ExprValue) Set(value string) error { + ast, err := expr.ParseExpression(value) + if err != nil { + return err + } + e.Node = ast + return nil +} + +func (e *ExprValue) Validate(line map[string]interface{}) bool { + context := datasource.NewContextSimpleNative(line) + val, ok := vm.Eval(context, e.Node) + if !ok || val == nil { // errors when evaluating + return false + } + if val.Type() != value.BoolType { + fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean") + return false + } + if val.Value().(bool) != true { + return false + } + return true +} From 14e97dd43e6fe011e67130ddacc28c0f11d4dc29 Mon Sep 17 00:00:00 2001 From: boyska Date: Tue, 25 Dec 2018 03:52:53 +0100 Subject: [PATCH 4/7] server-side filtering circologctl filter lets you load filters on circologd --- cmd/circologctl/main.go | 20 ++++++++++++++++++++ cmd/circologd/http_ctl.go | 21 +++++++++++++++++++++ filtering/filter.go | 12 +++++++++++- hub.go | 19 ++++++++++++++++++- 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/cmd/circologctl/main.go b/cmd/circologctl/main.go index 0eb7ed9..2433a5f 100644 --- a/cmd/circologctl/main.go +++ b/cmd/circologctl/main.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strings" "time" ) @@ -30,6 +31,7 @@ func init() { //"set": setCmd, //"get": getCmd, "pause": pauseCmd, + "filter": filterCmd, "reload": reloadCmd, "restart": restartCmd, "help": helpCmd, @@ -40,6 +42,24 @@ func init() { //func getCmd(ctlSock string, args []string) error {} +func filterCmd(args []string) error { + filter := strings.Join(args[1:], " ") + postBody := make(map[string][]string) + postBody["where"] = []string{filter} + if globalOpts.debug { + fmt.Println("[DEBUG] postBody:", postBody) + } + resp, err := ctl.PostForm("http://unix/filter", postBody) + if resp.StatusCode != 200 || globalOpts.verbose { + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + fmt.Println(string(bodyBytes)) + } + return err +} func pauseCmd(args []string) error { var dontChangeAgain time.Duration flagset := flag.NewFlagSet(args[0], flag.ExitOnError) diff --git a/cmd/circologd/http_ctl.go b/cmd/circologd/http_ctl.go index 26f0c8b..fcfe111 100644 --- a/cmd/circologd/http_ctl.go +++ b/cmd/circologd/http_ctl.go @@ -16,6 +16,7 @@ import ( func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router { m := mux.NewRouter() m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST") + m.HandleFunc("/filter", setFilter(hub, verbose, debug)).Methods("POST") m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET") m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST") m.HandleFunc("/help", printHelp(verbose)).Methods("GET") @@ -71,6 +72,26 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc { } } +func setFilter(hub circolog.Hub, verbose, debug bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + where := r.FormValue("where") + response := make(chan circolog.CommandResponse) + hub.Commands <- circolog.HubFullCommand{ + Command: circolog.CommandNewFilter, + Response: response, + Parameters: map[string]interface{}{"where": where}, + } + resp := <-response + if !resp.Value.(map[string]interface{})["success"].(bool) { + w.WriteHeader(400) + } + w.Header().Set("content-type", "application/json") + enc := json.NewEncoder(w) + enc.Encode(resp.Value) + } +} + func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if verbose { diff --git a/filtering/filter.go b/filtering/filter.go index 471e449..3025ce8 100644 --- a/filtering/filter.go +++ b/filtering/filter.go @@ -11,7 +11,8 @@ import ( ) type ExprValue struct { - Node expr.Node + Node expr.Node + Expression string } func (e *ExprValue) String() string { @@ -22,15 +23,24 @@ func (e *ExprValue) String() string { } } func (e *ExprValue) Set(value string) error { + if value == "" { + e.Node = nil + e.Expression = value + return nil + } ast, err := expr.ParseExpression(value) if err != nil { return err } e.Node = ast + e.Expression = value return nil } func (e *ExprValue) Validate(line map[string]interface{}) bool { + if e.Node == nil { + return true + } context := datasource.NewContextSimpleNative(line) val, ok := vm.Eval(context, e.Node) if !ok || val == nil { // errors when evaluating diff --git a/hub.go b/hub.go index 209f0ab..058831a 100644 --- a/hub.go +++ b/hub.go @@ -6,6 +6,7 @@ import ( "os" "time" + "git.lattuga.net/boyska/circolog/filtering" "gopkg.in/mcuadros/go-syslog.v2/format" ) @@ -34,6 +35,7 @@ const ( CommandClear = iota CommandPauseToggle = iota CommandStatus = iota + CommandNewFilter = iota ) // An HubFullCommand is a Command, complete with arguments @@ -102,6 +104,7 @@ func (h *Hub) register(cl Client) { // Run is hub main loop; keeps everything going func (h *Hub) Run() { active := true + var filter filtering.ExprValue for { select { case cl := <-h.Register: @@ -113,7 +116,7 @@ func (h *Hub) Run() { delete(h.clients, cl) } case msg := <-h.LogMessages: - if active == true { + if active == true && filter.Validate(msg) { h.circbuf.Value = msg h.circbuf = h.circbuf.Next() for client := range h.clients { @@ -141,7 +144,21 @@ func (h *Hub) Run() { cmd.Response <- CommandResponse{Value: map[string]interface{}{ "size": h.circbuf.Len(), "paused": !active, + "filter": filter.Expression, }} + case CommandNewFilter: + if err := filter.Set(cmd.Parameters["where"].(string)); err != nil { + cmd.Response <- CommandResponse{Value: map[string]interface{}{ + "success": false, + "error": err.Error(), + }} + } else { + cmd.Response <- CommandResponse{Value: map[string]interface{}{ + "success": true, + "error": "", + }} + } + } } } From 658a4bbb1eb4805c5319b6d011435f4832acbe48 Mon Sep 17 00:00:00 2001 From: boyska Date: Wed, 26 Dec 2018 01:29:39 +0100 Subject: [PATCH 5/7] filtering can be disabled with -tags nofilter it will make your binaries way smaller --- cmd/circolog-tail/main.go | 2 +- filtering/filter.go | 22 ++++++++++++---------- filtering/filter_fake.go | 17 +++++++++++++++++ hub.go | 2 +- 4 files changed, 31 insertions(+), 12 deletions(-) create mode 100644 filtering/filter_fake.go diff --git a/cmd/circolog-tail/main.go b/cmd/circolog-tail/main.go index 67c7d21..41bc920 100644 --- a/cmd/circolog-tail/main.go +++ b/cmd/circolog-tail/main.go @@ -75,7 +75,7 @@ func main() { log.Println("invalid BSON", err) continue } - if filter.Node != nil && !filter.Validate(parsed) { + if !filter.Validate(parsed) { continue } if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { diff --git a/filtering/filter.go b/filtering/filter.go index 3025ce8..0deebed 100644 --- a/filtering/filter.go +++ b/filtering/filter.go @@ -1,3 +1,5 @@ +// +build !nofilter + package filtering import ( @@ -11,38 +13,38 @@ import ( ) type ExprValue struct { - Node expr.Node - Expression string + node expr.Node + expression string } func (e *ExprValue) String() string { - if e.Node != nil { - return e.Node.String() + if e.node != nil { + return e.node.String() } else { return "" } } func (e *ExprValue) Set(value string) error { if value == "" { - e.Node = nil - e.Expression = value + e.node = nil + e.expression = value return nil } ast, err := expr.ParseExpression(value) if err != nil { return err } - e.Node = ast - e.Expression = value + e.node = ast + e.expression = value return nil } func (e *ExprValue) Validate(line map[string]interface{}) bool { - if e.Node == nil { + if e.node == nil { return true } context := datasource.NewContextSimpleNative(line) - val, ok := vm.Eval(context, e.Node) + val, ok := vm.Eval(context, e.node) if !ok || val == nil { // errors when evaluating return false } diff --git a/filtering/filter_fake.go b/filtering/filter_fake.go new file mode 100644 index 0000000..223b1b8 --- /dev/null +++ b/filtering/filter_fake.go @@ -0,0 +1,17 @@ +// +build nofilter + +package filtering + +type ExprValue struct { +} + +func (e *ExprValue) String() string { + return "" +} + +func (e *ExprValue) Set(value string) error { + return nil +} +func (e *ExprValue) Validate(line map[string]interface{}) bool { + return true +} diff --git a/hub.go b/hub.go index 058831a..15d9da7 100644 --- a/hub.go +++ b/hub.go @@ -144,7 +144,7 @@ func (h *Hub) Run() { cmd.Response <- CommandResponse{Value: map[string]interface{}{ "size": h.circbuf.Len(), "paused": !active, - "filter": filter.Expression, + "filter": filter.String(), }} case CommandNewFilter: if err := filter.Set(cmd.Parameters["where"].(string)); err != nil { From 5db7e2f01bfb0d6cc43ec98baaa32fe797df4c0e Mon Sep 17 00:00:00 2001 From: boyska Date: Wed, 26 Dec 2018 01:54:30 +0100 Subject: [PATCH 6/7] filtering code cleanup --- filtering/filter.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/filtering/filter.go b/filtering/filter.go index 0deebed..bc3f825 100644 --- a/filtering/filter.go +++ b/filtering/filter.go @@ -48,12 +48,9 @@ func (e *ExprValue) Validate(line map[string]interface{}) bool { if !ok || val == nil { // errors when evaluating return false } - if val.Type() != value.BoolType { - fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean") - return false + if bv, isBool := val.(value.BoolValue); isBool { + return bv.Val() } - if val.Value().(bool) != true { - return false - } - return true + fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean") + return false } From 6f638735910d606984dfefd8276282ab41eeaa72 Mon Sep 17 00:00:00 2001 From: boyska Date: Wed, 26 Dec 2018 02:21:15 +0100 Subject: [PATCH 7/7] filtering explained in README.md --- README.md | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8e79a89..75da52a 100644 --- a/README.md +++ b/README.md @@ -72,10 +72,10 @@ Pausing might be the easiest way to make circologd only run "when needed". When circologd resumes, no previous message is lost. - -To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again. - -To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket. +To pause/unpause: + * `circologctl pause` + * `pkill -USR1 circologd` + * `POST /pause/toggle` to your circologd control socket ### Clear @@ -83,3 +83,27 @@ When you clear the circologd's buffer, it will discard every message it has, but messages. You can do that with `POST /logs/clear` + +### Filter + +circologd can drop irrelevant messages using filters. A filter is a sql-like expression (for the exact syntax +you can see [the doc for the underlying library](https://github.com/araddon/qlbridge/blob/master/FilterQL.md), +qlbridge), but just imitating sql where clauses can be enough! + +`circologctl filter message NOT LIKE '%usb%'` will discard everything related to usb. + +The filter will be applied to incoming messages, so messages mentioning usb will not be saved in memory at all. + +You can put zero or one filters at a time. That is, you can not stack more filters... but FilterQL syntax +supports `AND` operators, so this is not an issue. + +To remove filtering (thus accepting every message) run `circologctl filter` + +NOTE: `circolog-tail` supports filters with exactly the same syntax, but they are two different kinds of +filtering: one is server-side, the other is client-side. When you filter server-side with `circologctl +filter`, circologd will refuse messages not matching the filter. If you only filter with `circolog-tail`, the +message you are filtering out will still consume space in memory (and will be available to other clients). + +Filtering brings big dependencies, which will add some 5-6 megabytes to circolog binaries. If you want to +avoid it, install with `go install -tags nofilter git.lattuga.net/boyska/circolog/...` and your binaries will +be a bit smaller.