Browse Source

server-side filtering

circologctl filter lets you load filters on circologd
boyska 5 years ago
parent
commit
14e97dd43e
4 changed files with 70 additions and 2 deletions
  1. 20 0
      cmd/circologctl/main.go
  2. 21 0
      cmd/circologd/http_ctl.go
  3. 11 1
      filtering/filter.go
  4. 18 1
      hub.go

+ 20 - 0
cmd/circologctl/main.go

@@ -9,6 +9,7 @@ import (
 	"net"
 	"net/http"
 	"os"
+	"strings"
 	"time"
 )
 
@@ -30,6 +31,7 @@ func init() {
 		//"set":     setCmd,
 		//"get":     getCmd,
 		"pause":   pauseCmd,
+		"filter":  filterCmd,
 		"reload":  reloadCmd,
 		"restart": restartCmd,
 		"help":    helpCmd,
@@ -40,6 +42,24 @@ func init() {
 
 //func getCmd(ctlSock string, args []string) error {}
 
+func filterCmd(args []string) error {
+	filter := strings.Join(args[1:], " ")
+	postBody := make(map[string][]string)
+	postBody["where"] = []string{filter}
+	if globalOpts.debug {
+		fmt.Println("[DEBUG] postBody:", postBody)
+	}
+	resp, err := ctl.PostForm("http://unix/filter", postBody)
+	if resp.StatusCode != 200 || globalOpts.verbose {
+		defer resp.Body.Close()
+		bodyBytes, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		fmt.Println(string(bodyBytes))
+	}
+	return err
+}
 func pauseCmd(args []string) error {
 	var dontChangeAgain time.Duration
 	flagset := flag.NewFlagSet(args[0], flag.ExitOnError)

+ 21 - 0
cmd/circologd/http_ctl.go

@@ -16,6 +16,7 @@ import (
 func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router {
 	m := mux.NewRouter()
 	m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST")
+	m.HandleFunc("/filter", setFilter(hub, verbose, debug)).Methods("POST")
 	m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET")
 	m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
 	m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
@@ -71,6 +72,26 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
 	}
 }
 
+func setFilter(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		r.ParseForm()
+		where := r.FormValue("where")
+		response := make(chan circolog.CommandResponse)
+		hub.Commands <- circolog.HubFullCommand{
+			Command:    circolog.CommandNewFilter,
+			Response:   response,
+			Parameters: map[string]interface{}{"where": where},
+		}
+		resp := <-response
+		if !resp.Value.(map[string]interface{})["success"].(bool) {
+			w.WriteHeader(400)
+		}
+		w.Header().Set("content-type", "application/json")
+		enc := json.NewEncoder(w)
+		enc.Encode(resp.Value)
+	}
+}
+
 func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		if verbose {

+ 11 - 1
filtering/filter.go

@@ -11,7 +11,8 @@ import (
 )
 
 type ExprValue struct {
-	Node expr.Node
+	Node       expr.Node
+	Expression string
 }
 
 func (e *ExprValue) String() string {
@@ -22,15 +23,24 @@ func (e *ExprValue) String() string {
 	}
 }
 func (e *ExprValue) Set(value string) error {
+	if value == "" {
+		e.Node = nil
+		e.Expression = value
+		return nil
+	}
 	ast, err := expr.ParseExpression(value)
 	if err != nil {
 		return err
 	}
 	e.Node = ast
+	e.Expression = value
 	return nil
 }
 
 func (e *ExprValue) Validate(line map[string]interface{}) bool {
+	if e.Node == nil {
+		return true
+	}
 	context := datasource.NewContextSimpleNative(line)
 	val, ok := vm.Eval(context, e.Node)
 	if !ok || val == nil { // errors when evaluating

+ 18 - 1
hub.go

@@ -6,6 +6,7 @@ import (
 	"os"
 	"time"
 
+	"git.lattuga.net/boyska/circolog/filtering"
 	"gopkg.in/mcuadros/go-syslog.v2/format"
 )
 
@@ -34,6 +35,7 @@ const (
 	CommandClear       = iota
 	CommandPauseToggle = iota
 	CommandStatus      = iota
+	CommandNewFilter   = iota
 )
 
 // An HubFullCommand is a Command, complete with arguments
@@ -102,6 +104,7 @@ func (h *Hub) register(cl Client) {
 // Run is hub main loop; keeps everything going
 func (h *Hub) Run() {
 	active := true
+	var filter filtering.ExprValue
 	for {
 		select {
 		case cl := <-h.Register:
@@ -113,7 +116,7 @@ func (h *Hub) Run() {
 				delete(h.clients, cl)
 			}
 		case msg := <-h.LogMessages:
-			if active == true {
+			if active == true && filter.Validate(msg) {
 				h.circbuf.Value = msg
 				h.circbuf = h.circbuf.Next()
 				for client := range h.clients {
@@ -141,7 +144,21 @@ func (h *Hub) Run() {
 				cmd.Response <- CommandResponse{Value: map[string]interface{}{
 					"size":   h.circbuf.Len(),
 					"paused": !active,
+					"filter": filter.Expression,
 				}}
+			case CommandNewFilter:
+				if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
+					cmd.Response <- CommandResponse{Value: map[string]interface{}{
+						"success": false,
+						"error":   err.Error(),
+					}}
+				} else {
+					cmd.Response <- CommandResponse{Value: map[string]interface{}{
+						"success": true,
+						"error":   "",
+					}}
+				}
+
 			}
 		}
 	}