diff --git a/cmd/circologctl/main.go b/cmd/circologctl/main.go index 0eb7ed9..2433a5f 100644 --- a/cmd/circologctl/main.go +++ b/cmd/circologctl/main.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strings" "time" ) @@ -30,6 +31,7 @@ func init() { //"set": setCmd, //"get": getCmd, "pause": pauseCmd, + "filter": filterCmd, "reload": reloadCmd, "restart": restartCmd, "help": helpCmd, @@ -40,6 +42,24 @@ func init() { //func getCmd(ctlSock string, args []string) error {} +func filterCmd(args []string) error { + filter := strings.Join(args[1:], " ") + postBody := make(map[string][]string) + postBody["where"] = []string{filter} + if globalOpts.debug { + fmt.Println("[DEBUG] postBody:", postBody) + } + resp, err := ctl.PostForm("http://unix/filter", postBody) + if resp.StatusCode != 200 || globalOpts.verbose { + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + fmt.Println(string(bodyBytes)) + } + return err +} func pauseCmd(args []string) error { var dontChangeAgain time.Duration flagset := flag.NewFlagSet(args[0], flag.ExitOnError) diff --git a/cmd/circologd/http_ctl.go b/cmd/circologd/http_ctl.go index 26f0c8b..fcfe111 100644 --- a/cmd/circologd/http_ctl.go +++ b/cmd/circologd/http_ctl.go @@ -16,6 +16,7 @@ import ( func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router { m := mux.NewRouter() m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST") + m.HandleFunc("/filter", setFilter(hub, verbose, debug)).Methods("POST") m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET") m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST") m.HandleFunc("/help", printHelp(verbose)).Methods("GET") @@ -71,6 +72,26 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc { } } +func setFilter(hub circolog.Hub, verbose, debug bool) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + where := r.FormValue("where") + response := make(chan circolog.CommandResponse) + hub.Commands <- circolog.HubFullCommand{ + Command: circolog.CommandNewFilter, + Response: response, + Parameters: map[string]interface{}{"where": where}, + } + resp := <-response + if !resp.Value.(map[string]interface{})["success"].(bool) { + w.WriteHeader(400) + } + w.Header().Set("content-type", "application/json") + enc := json.NewEncoder(w) + enc.Encode(resp.Value) + } +} + func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if verbose { diff --git a/filtering/filter.go b/filtering/filter.go index 471e449..3025ce8 100644 --- a/filtering/filter.go +++ b/filtering/filter.go @@ -11,7 +11,8 @@ import ( ) type ExprValue struct { - Node expr.Node + Node expr.Node + Expression string } func (e *ExprValue) String() string { @@ -22,15 +23,24 @@ func (e *ExprValue) String() string { } } func (e *ExprValue) Set(value string) error { + if value == "" { + e.Node = nil + e.Expression = value + return nil + } ast, err := expr.ParseExpression(value) if err != nil { return err } e.Node = ast + e.Expression = value return nil } func (e *ExprValue) Validate(line map[string]interface{}) bool { + if e.Node == nil { + return true + } context := datasource.NewContextSimpleNative(line) val, ok := vm.Eval(context, e.Node) if !ok || val == nil { // errors when evaluating diff --git a/hub.go b/hub.go index 209f0ab..058831a 100644 --- a/hub.go +++ b/hub.go @@ -6,6 +6,7 @@ import ( "os" "time" + "git.lattuga.net/boyska/circolog/filtering" "gopkg.in/mcuadros/go-syslog.v2/format" ) @@ -34,6 +35,7 @@ const ( CommandClear = iota CommandPauseToggle = iota CommandStatus = iota + CommandNewFilter = iota ) // An HubFullCommand is a Command, complete with arguments @@ -102,6 +104,7 @@ func (h *Hub) register(cl Client) { // Run is hub main loop; keeps everything going func (h *Hub) Run() { active := true + var filter filtering.ExprValue for { select { case cl := <-h.Register: @@ -113,7 +116,7 @@ func (h *Hub) Run() { delete(h.clients, cl) } case msg := <-h.LogMessages: - if active == true { + if active == true && filter.Validate(msg) { h.circbuf.Value = msg h.circbuf = h.circbuf.Next() for client := range h.clients { @@ -141,7 +144,21 @@ func (h *Hub) Run() { cmd.Response <- CommandResponse{Value: map[string]interface{}{ "size": h.circbuf.Len(), "paused": !active, + "filter": filter.Expression, }} + case CommandNewFilter: + if err := filter.Set(cmd.Parameters["where"].(string)); err != nil { + cmd.Response <- CommandResponse{Value: map[string]interface{}{ + "success": false, + "error": err.Error(), + }} + } else { + cmd.Response <- CommandResponse{Value: map[string]interface{}{ + "success": true, + "error": "", + }} + } + } } }