Browse Source

Merge branch 'master' into statusctl

Blallo 5 years ago
parent
commit
98a06659cb
8 changed files with 235 additions and 9 deletions
  1. 28 4
      README.md
  2. 49 1
      cmd/circolog-tail/main.go
  3. 21 0
      cmd/circologctl/main.go
  4. 21 0
      cmd/circologd/http_ctl.go
  5. 56 0
      filtering/filter.go
  6. 17 0
      filtering/filter_fake.go
  7. 22 1
      formatter/format.go
  8. 21 3
      hub.go

+ 28 - 4
README.md

@@ -72,10 +72,10 @@ Pausing might be the easiest way to make circologd only run "when needed".
 
 When circologd resumes, no previous message is lost.
 
-
-To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
-
-To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
+To pause/unpause:
+ * `circologctl pause`
+ * `pkill -USR1 circologd`
+ * `POST /pause/toggle` to your circologd control socket
 
 ### Clear
 
@@ -83,3 +83,27 @@ When you clear the circologd's buffer, it will discard every message it has, but
 messages.
 
 You can do that with `POST /logs/clear`
+
+### Filter
+
+circologd can drop irrelevant messages using filters. A filter is a sql-like expression (for the exact syntax
+you can see [the doc for the underlying library](https://github.com/araddon/qlbridge/blob/master/FilterQL.md),
+qlbridge), but just imitating sql where clauses can be enough!
+
+`circologctl filter message NOT LIKE '%usb%'` will discard everything related to usb.
+
+The filter will be applied to incoming messages, so messages mentioning usb will not be saved in memory at all.
+
+You can put zero or one filters at a time. That is, you can not stack more filters... but FilterQL syntax
+supports `AND` operators, so this is not an issue.
+
+To remove filtering (thus accepting every message) run `circologctl filter`
+
+NOTE: `circolog-tail` supports filters with exactly the same syntax, but they are two different kinds of
+filtering: one is server-side, the other is client-side. When you filter server-side with `circologctl
+filter`, circologd will refuse messages not matching the filter. If you only filter with `circolog-tail`, the
+message you are filtering out will still consume space in memory (and will be available to other clients).
+
+Filtering brings big dependencies, which will add some 5-6 megabytes to circolog binaries. If you want to
+avoid it, install with `go install -tags nofilter git.lattuga.net/boyska/circolog/...` and your binaries will
+be a bit smaller.

+ 49 - 1
cmd/circolog-tail/main.go

@@ -11,18 +11,63 @@ import (
 	"strconv"
 	"time"
 
+	"git.lattuga.net/boyska/circolog/filtering"
 	"git.lattuga.net/boyska/circolog/formatter"
 	"github.com/gorilla/websocket"
+	isatty "github.com/mattn/go-isatty"
+	"github.com/mgutz/ansi"
 	"gopkg.in/mcuadros/go-syslog.v2/format"
 	"gopkg.in/mgo.v2/bson"
 )
 
+type BoolAuto uint
+
+const (
+	BoolAuto_NO   BoolAuto = iota
+	BoolAuto_YES  BoolAuto = iota
+	BoolAuto_AUTO BoolAuto = iota
+)
+
+func (b *BoolAuto) String() string {
+	switch *b {
+	case BoolAuto_NO:
+		return "no"
+	case BoolAuto_YES:
+		return "always"
+	case BoolAuto_AUTO:
+		return "auto"
+	}
+	return ""
+}
+func (b *BoolAuto) Set(s string) error {
+	switch s {
+	case "auto":
+		*b = BoolAuto_AUTO
+	case "always":
+		*b = BoolAuto_YES
+	case "no":
+		*b = BoolAuto_NO
+	default:
+		return fmt.Errorf("Invalid value %s", s)
+	}
+	return nil
+}
+
 func main() {
 	addr := flag.String("addr", "localhost:9080", "http service address")
 	querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
 	backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)")
+	var filter filtering.ExprValue
+	flag.Var(&filter, "where", "sql-like query to filter logs")
+	// TODO: change to color-mode=auto/no/always
+	hasColor := BoolAuto_AUTO
+	flag.Var(&hasColor, "color", "dis/enable colors")
 	flag.Parse()
 
+	if hasColor == BoolAuto_NO || (!isatty.IsTerminal(os.Stdout.Fd()) && hasColor != BoolAuto_YES) {
+		ansi.DisableColors(true)
+	}
+
 	interrupt := make(chan os.Signal, 1)
 	signal.Notify(interrupt, os.Interrupt)
 	var d *websocket.Dialer
@@ -69,7 +114,10 @@ func main() {
 			}
 			var parsed format.LogParts
 			if err := bson.Unmarshal(serialized, &parsed); err != nil {
-				log.Println("invalid YAML", err)
+				log.Println("invalid BSON", err)
+				continue
+			}
+			if !filter.Validate(parsed) {
 				continue
 			}
 			if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {

+ 21 - 0
cmd/circologctl/main.go

@@ -10,6 +10,7 @@ import (
 	"net"
 	"net/http"
 	"os"
+	"strings"
 	"time"
 
 	"git.lattuga.net/boyska/circolog"
@@ -34,6 +35,7 @@ func init() {
 		//"get":     getCmd,
 		"status":  statusCmd,
 		"pause":   pauseCmd,
+		"filter":  filterCmd,
 		"reload":  reloadCmd,
 		"restart": restartCmd,
 		"help":    helpCmd,
@@ -102,6 +104,25 @@ func pauseCmd(args []string) error {
 	return err
 }
 
+func filterCmd(args []string) error {
+	filter := strings.Join(args[1:], " ")
+	postBody := make(map[string][]string)
+	postBody["where"] = []string{filter}
+	if globalOpts.debug {
+		fmt.Println("[DEBUG] postBody:", postBody)
+	}
+	resp, err := ctl.PostForm("http://unix/filter", postBody)
+	if resp.StatusCode != 200 || globalOpts.verbose {
+		defer resp.Body.Close()
+		bodyBytes, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		fmt.Println(string(bodyBytes))
+	}
+	return err
+}
+
 func reloadCmd(args []string) error {
 	return nil
 }

+ 21 - 0
cmd/circologd/http_ctl.go

@@ -16,6 +16,7 @@ import (
 func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router {
 	m := mux.NewRouter()
 	m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST")
+	m.HandleFunc("/filter", setFilter(hub, verbose, debug)).Methods("POST")
 	m.HandleFunc("/status", getStatus(hub, verbose, debug)).Methods("GET")
 	m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
 	m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
@@ -71,6 +72,26 @@ func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
 	}
 }
 
+func setFilter(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		r.ParseForm()
+		where := r.FormValue("where")
+		response := make(chan circolog.CommandResponse)
+		hub.Commands <- circolog.HubFullCommand{
+			Command:    circolog.CommandNewFilter,
+			Response:   response,
+			Parameters: map[string]interface{}{"where": where},
+		}
+		resp := <-response
+		if !resp.Value.(map[string]interface{})["success"].(bool) {
+			w.WriteHeader(400)
+		}
+		w.Header().Set("content-type", "application/json")
+		enc := json.NewEncoder(w)
+		enc.Encode(resp.Value)
+	}
+}
+
 func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		if verbose {

+ 56 - 0
filtering/filter.go

@@ -0,0 +1,56 @@
+// +build !nofilter
+
+package filtering
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/araddon/qlbridge/datasource"
+	"github.com/araddon/qlbridge/expr"
+	"github.com/araddon/qlbridge/value"
+	"github.com/araddon/qlbridge/vm"
+)
+
+type ExprValue struct {
+	node       expr.Node
+	expression string
+}
+
+func (e *ExprValue) String() string {
+	if e.node != nil {
+		return e.node.String()
+	} else {
+		return "<Empty Expression>"
+	}
+}
+func (e *ExprValue) Set(value string) error {
+	if value == "" {
+		e.node = nil
+		e.expression = value
+		return nil
+	}
+	ast, err := expr.ParseExpression(value)
+	if err != nil {
+		return err
+	}
+	e.node = ast
+	e.expression = value
+	return nil
+}
+
+func (e *ExprValue) Validate(line map[string]interface{}) bool {
+	if e.node == nil {
+		return true
+	}
+	context := datasource.NewContextSimpleNative(line)
+	val, ok := vm.Eval(context, e.node)
+	if !ok || val == nil { // errors when evaluating
+		return false
+	}
+	if bv, isBool := val.(value.BoolValue); isBool {
+		return bv.Val()
+	}
+	fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean")
+	return false
+}

+ 17 - 0
filtering/filter_fake.go

@@ -0,0 +1,17 @@
+// +build nofilter
+
+package filtering
+
+type ExprValue struct {
+}
+
+func (e *ExprValue) String() string {
+	return "<filtering disabled>"
+}
+
+func (e *ExprValue) Set(value string) error {
+	return nil
+}
+func (e *ExprValue) Validate(line map[string]interface{}) bool {
+	return true
+}

+ 22 - 1
formatter/format.go

@@ -7,6 +7,7 @@ import (
 	"text/template"
 	"time"
 
+	"github.com/mgutz/ansi"
 	"gopkg.in/mcuadros/go-syslog.v2/format"
 	"gopkg.in/mgo.v2/bson"
 )
@@ -25,11 +26,31 @@ func init() {
 		"rfc822": func(dt time.Time) string {
 			return dt.Format(time.RFC822)
 		},
+		"sevName": func(s int) string {
+			names := []string{"emerg ", "alert ", "crit  ", "err   ", "warn  ", "notice", "info  ", "dbg   "}
+			switch {
+			case s < 2: // emerg..alert
+				return ansi.Color(names[s], "red+b")
+			case s < 4: // emerg..err
+				return ansi.Color(names[s], "red")
+			case s < 6: // warn..notice
+				return ansi.Color(names[s], "white+b")
+			case s >= len(names):
+				return "???"
+			default:
+				return names[s]
+			}
+		},
+		"color": func(color, text string) string {
+			return ansi.Color(text, color) // slow; should use colorfunc
+		},
+		"red": ansi.ColorFunc("red+b"),
 	}
 	syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse(
-		"{{rfc822 (index . \"timestamp\")}} {{index . \"hostname\"}} " +
+		"{{color \"yellow\" (rfc822 (index . \"timestamp\")) }} {{index . \"hostname\"}} " +
 			"{{index . \"app_name\"}}" +
 			"{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " +
+			"{{ sevName (index . \"severity\") }} " +
 			"{{index . \"message\"}}",
 	))
 }

+ 21 - 3
hub.go

@@ -6,6 +6,7 @@ import (
 	"os"
 	"time"
 
+	"git.lattuga.net/boyska/circolog/filtering"
 	"gopkg.in/mcuadros/go-syslog.v2/format"
 )
 
@@ -34,6 +35,7 @@ const (
 	CommandClear       = iota
 	CommandPauseToggle = iota
 	CommandStatus      = iota
+	CommandNewFilter   = iota
 )
 
 // An HubFullCommand is a Command, complete with arguments
@@ -49,8 +51,9 @@ type CommandResponse struct {
 
 // StatusResponse is an implementation of a CommandResponse
 type StatusResponse struct {
-	Size      int  `json:"size"`
-	IsRunning bool `json:"running"`
+	Size      int    `json:"size"`
+	IsRunning bool   `json:"running"`
+	Filter    string `json:"filter"`
 }
 
 // Status return "paused/unpaused" based on isRunning value
@@ -117,6 +120,7 @@ func (h *Hub) register(cl Client) {
 // Run is hub main loop; keeps everything going
 func (h *Hub) Run() {
 	active := true
+	var filter filtering.ExprValue
 	for {
 		select {
 		case cl := <-h.Register:
@@ -128,7 +132,7 @@ func (h *Hub) Run() {
 				delete(h.clients, cl)
 			}
 		case msg := <-h.LogMessages:
-			if active == true {
+			if active == true && filter.Validate(msg) {
 				h.circbuf.Value = msg
 				h.circbuf = h.circbuf.Next()
 				for client := range h.clients {
@@ -156,8 +160,22 @@ func (h *Hub) Run() {
 				var resp = StatusResponse{
 					Size:      h.circbuf.Len(),
 					IsRunning: active,
+					Filter:    filter.String(),
 				}
 				cmd.Response <- CommandResponse{Value: resp}
+			case CommandNewFilter:
+				if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
+					cmd.Response <- CommandResponse{Value: map[string]interface{}{
+						"success": false,
+						"error":   err.Error(),
+					}}
+				} else {
+					cmd.Response <- CommandResponse{Value: map[string]interface{}{
+						"success": true,
+						"error":   "",
+					}}
+				}
+
 			}
 		}
 	}