1
0
Fork 0
forked from boyska/circolog

Merge branch 'sqlquery'

This commit is contained in:
boyska 2018-12-26 02:21:29 +01:00
commit d1c3c32164
7 changed files with 167 additions and 6 deletions

View file

@ -72,10 +72,10 @@ Pausing might be the easiest way to make circologd only run "when needed".
When circologd resumes, no previous message is lost. When circologd resumes, no previous message is lost.
To pause/unpause:
To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again. * `circologctl pause`
* `pkill -USR1 circologd`
To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket. * `POST /pause/toggle` to your circologd control socket
### Clear ### Clear
@ -83,3 +83,27 @@ When you clear the circologd's buffer, it will discard every message it has, but
messages. messages.
You can do that with `POST /logs/clear` You can do that with `POST /logs/clear`
### Filter
circologd can drop irrelevant messages using filters. A filter is a sql-like expression (for the exact syntax
you can see [the doc for the underlying library](https://github.com/araddon/qlbridge/blob/master/FilterQL.md),
qlbridge), but just imitating sql where clauses can be enough!
`circologctl filter message NOT LIKE '%usb%'` will discard everything related to usb.
The filter will be applied to incoming messages, so messages mentioning usb will not be saved in memory at all.
You can put zero or one filters at a time. That is, you can not stack more filters... but FilterQL syntax
supports `AND` operators, so this is not an issue.
To remove filtering (thus accepting every message) run `circologctl filter`
NOTE: `circolog-tail` supports filters with exactly the same syntax, but they are two different kinds of
filtering: one is server-side, the other is client-side. When you filter server-side with `circologctl
filter`, circologd will refuse messages not matching the filter. If you only filter with `circolog-tail`, the
message you are filtering out will still consume space in memory (and will be available to other clients).
Filtering brings big dependencies, which will add some 5-6 megabytes to circolog binaries. If you want to
avoid it, install with `go install -tags nofilter git.lattuga.net/boyska/circolog/...` and your binaries will
be a bit smaller.

View file

@ -11,6 +11,7 @@ import (
"strconv" "strconv"
"time" "time"
"git.lattuga.net/boyska/circolog/filtering"
"git.lattuga.net/boyska/circolog/formatter" "git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
@ -21,6 +22,8 @@ func main() {
addr := flag.String("addr", "localhost:9080", "http service address") addr := flag.String("addr", "localhost:9080", "http service address")
querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server") querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)") backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)")
var filter filtering.ExprValue
flag.Var(&filter, "where", "sql-like query to filter logs")
flag.Parse() flag.Parse()
interrupt := make(chan os.Signal, 1) interrupt := make(chan os.Signal, 1)
@ -69,7 +72,10 @@ func main() {
} }
var parsed format.LogParts var parsed format.LogParts
if err := bson.Unmarshal(serialized, &parsed); err != nil { if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid YAML", err) log.Println("invalid BSON", err)
continue
}
if !filter.Validate(parsed) {
continue continue
} }
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil { if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {

View file

@ -9,6 +9,7 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"strings"
"time" "time"
) )
@ -30,6 +31,7 @@ func init() {
//"set": setCmd, //"set": setCmd,
//"get": getCmd, //"get": getCmd,
"pause": pauseCmd, "pause": pauseCmd,
"filter": filterCmd,
"reload": reloadCmd, "reload": reloadCmd,
"restart": restartCmd, "restart": restartCmd,
"help": helpCmd, "help": helpCmd,
@ -40,6 +42,24 @@ func init() {
//func getCmd(ctlSock string, args []string) error {} //func getCmd(ctlSock string, args []string) error {}
func filterCmd(args []string) error {
filter := strings.Join(args[1:], " ")
postBody := make(map[string][]string)
postBody["where"] = []string{filter}
if globalOpts.debug {
fmt.Println("[DEBUG] postBody:", postBody)
}
resp, err := ctl.PostForm("http://unix/filter", postBody)
if resp.StatusCode != 200 || globalOpts.verbose {
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(string(bodyBytes))
}
return err
}
func pauseCmd(args []string) error { func pauseCmd(args []string) error {
var dontChangeAgain time.Duration var dontChangeAgain time.Duration
flagset := flag.NewFlagSet(args[0], flag.ExitOnError) flagset := flag.NewFlagSet(args[0], flag.ExitOnError)

View file

@ -16,6 +16,7 @@ import (
func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router { func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router {
m := mux.NewRouter() m := mux.NewRouter()
m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST") m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST")
m.HandleFunc("/filter", setFilter(hub, verbose, debug)).Methods("POST")
m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET") m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET")
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST") m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
m.HandleFunc("/help", printHelp(verbose)).Methods("GET") m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
@ -71,6 +72,26 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
} }
} }
func setFilter(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
where := r.FormValue("where")
response := make(chan circolog.CommandResponse)
hub.Commands <- circolog.HubFullCommand{
Command: circolog.CommandNewFilter,
Response: response,
Parameters: map[string]interface{}{"where": where},
}
resp := <-response
if !resp.Value.(map[string]interface{})["success"].(bool) {
w.WriteHeader(400)
}
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(resp.Value)
}
}
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc { func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if verbose { if verbose {

56
filtering/filter.go Normal file
View file

@ -0,0 +1,56 @@
// +build !nofilter
package filtering
import (
"fmt"
"os"
"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/value"
"github.com/araddon/qlbridge/vm"
)
type ExprValue struct {
node expr.Node
expression string
}
func (e *ExprValue) String() string {
if e.node != nil {
return e.node.String()
} else {
return "<Empty Expression>"
}
}
func (e *ExprValue) Set(value string) error {
if value == "" {
e.node = nil
e.expression = value
return nil
}
ast, err := expr.ParseExpression(value)
if err != nil {
return err
}
e.node = ast
e.expression = value
return nil
}
func (e *ExprValue) Validate(line map[string]interface{}) bool {
if e.node == nil {
return true
}
context := datasource.NewContextSimpleNative(line)
val, ok := vm.Eval(context, e.node)
if !ok || val == nil { // errors when evaluating
return false
}
if bv, isBool := val.(value.BoolValue); isBool {
return bv.Val()
}
fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean")
return false
}

17
filtering/filter_fake.go Normal file
View file

@ -0,0 +1,17 @@
// +build nofilter
package filtering
type ExprValue struct {
}
func (e *ExprValue) String() string {
return "<filtering disabled>"
}
func (e *ExprValue) Set(value string) error {
return nil
}
func (e *ExprValue) Validate(line map[string]interface{}) bool {
return true
}

19
hub.go
View file

@ -6,6 +6,7 @@ import (
"os" "os"
"time" "time"
"git.lattuga.net/boyska/circolog/filtering"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
@ -34,6 +35,7 @@ const (
CommandClear = iota CommandClear = iota
CommandPauseToggle = iota CommandPauseToggle = iota
CommandStatus = iota CommandStatus = iota
CommandNewFilter = iota
) )
// An HubFullCommand is a Command, complete with arguments // An HubFullCommand is a Command, complete with arguments
@ -102,6 +104,7 @@ func (h *Hub) register(cl Client) {
// Run is hub main loop; keeps everything going // Run is hub main loop; keeps everything going
func (h *Hub) Run() { func (h *Hub) Run() {
active := true active := true
var filter filtering.ExprValue
for { for {
select { select {
case cl := <-h.Register: case cl := <-h.Register:
@ -113,7 +116,7 @@ func (h *Hub) Run() {
delete(h.clients, cl) delete(h.clients, cl)
} }
case msg := <-h.LogMessages: case msg := <-h.LogMessages:
if active == true { if active == true && filter.Validate(msg) {
h.circbuf.Value = msg h.circbuf.Value = msg
h.circbuf = h.circbuf.Next() h.circbuf = h.circbuf.Next()
for client := range h.clients { for client := range h.clients {
@ -141,7 +144,21 @@ func (h *Hub) Run() {
cmd.Response <- CommandResponse{Value: map[string]interface{}{ cmd.Response <- CommandResponse{Value: map[string]interface{}{
"size": h.circbuf.Len(), "size": h.circbuf.Len(),
"paused": !active, "paused": !active,
"filter": filter.String(),
}} }}
case CommandNewFilter:
if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
cmd.Response <- CommandResponse{Value: map[string]interface{}{
"success": false,
"error": err.Error(),
}}
} else {
cmd.Response <- CommandResponse{Value: map[string]interface{}{
"success": true,
"error": "",
}}
}
} }
} }
} }