From ec3934501a785231f2f106c289080d3b17ae92d1 Mon Sep 17 00:00:00 2001 From: boyska Date: Mon, 25 Mar 2019 02:46:03 +0100 Subject: [PATCH] filtering based on reader-Messages --- cmd/circolog-tail/main.go | 4 ++-- cmd/circologd/http_log.go | 5 +++-- data/data.go | 32 ++++++++++++++++++++++++++++++++ filtering/filter.go | 15 ++------------- formatter/format.go | 8 ++++---- hub.go | 38 +++++--------------------------------- 6 files changed, 48 insertions(+), 54 deletions(-) create mode 100644 data/data.go diff --git a/cmd/circolog-tail/main.go b/cmd/circolog-tail/main.go index eb446ae..4d13950 100644 --- a/cmd/circolog-tail/main.go +++ b/cmd/circolog-tail/main.go @@ -11,7 +11,7 @@ import ( "strconv" "time" - "git.lattuga.net/boyska/circolog" + "git.lattuga.net/boyska/circolog/data" "git.lattuga.net/boyska/circolog/filtering" "git.lattuga.net/boyska/circolog/formatter" "github.com/gorilla/websocket" @@ -112,7 +112,7 @@ func main() { log.Println("close:", err) return } - var parsed circolog.Message + var parsed data.Message if err := bson.Unmarshal(serialized, &parsed); err != nil { log.Println("invalid BSON", err) continue diff --git a/cmd/circologd/http_log.go b/cmd/circologd/http_log.go index be09f19..712d15a 100644 --- a/cmd/circologd/http_log.go +++ b/cmd/circologd/http_log.go @@ -9,6 +9,7 @@ import ( "time" "git.lattuga.net/boyska/circolog" + "git.lattuga.net/boyska/circolog/data" "git.lattuga.net/boyska/circolog/formatter" "github.com/gorilla/websocket" ) @@ -99,7 +100,7 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc { opts.Nofollow = true client := circolog.Client{ - Messages: make(chan circolog.Message, 20), + Messages: make(chan data.Message, 20), Options: opts, } hub.Register <- client @@ -140,7 +141,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc { return } client := circolog.Client{ - Messages: make(chan circolog.Message, 20), + Messages: make(chan data.Message, 20), Options: opts, } hub.Register <- client diff --git a/data/data.go b/data/data.go new file mode 100644 index 0000000..314fdb1 --- /dev/null +++ b/data/data.go @@ -0,0 +1,32 @@ +package data + +import "gopkg.in/mcuadros/go-syslog.v2/format" + +// Message is currently an alias for format.Logparts, but this is only temporary; sooner or later, a real struct will be used +// The advantage of having an explicit Message is to clear out what data we are sending to circolog "readers" +// This is not necessarily (and not in practice) the same structure that we receive from logging programs +type Message format.LogParts + +// LogEntryToMessage converts messages received from writers to the format we promise to readers +func LogEntryToMessage(orig format.LogParts) Message { + m := Message{} + if orig["version"] == 1 { // RFC5424 + m["prog"] = orig["app_name"] + m["client"] = orig["client"] + m["host"] = orig["hostname"] + m["proc_id"] = orig["proc_id"] + m["msg"] = orig["message"] + m["facility"] = orig["facility"] + m["time"] = orig["timestamp"] + m["sev"] = orig["severity"] + } else { //RFC3164 + m["prog"] = orig["tag"] + m["client"] = orig["client"] + m["host"] = orig["hostname"] + m["msg"] = orig["content"] + m["sev"] = orig["severity"] + m["time"] = orig["timestamp"] + m["proc_id"] = "-" + } + return m +} diff --git a/filtering/filter.go b/filtering/filter.go index 10889b7..25b26bd 100644 --- a/filtering/filter.go +++ b/filtering/filter.go @@ -6,6 +6,7 @@ import ( "fmt" "os" + "git.lattuga.net/boyska/circolog/data" "github.com/araddon/qlbridge/datasource" "github.com/araddon/qlbridge/expr" "github.com/araddon/qlbridge/value" @@ -40,11 +41,10 @@ func (e *ExprValue) Set(value string) error { } // Validate answers the question whether to include a log line or not. -func (e *ExprValue) Validate(logLine map[string]interface{}) bool { +func (e *ExprValue) Validate(line data.Message) bool { if e.node == nil { return true } - line := translateMap(logLine) context := datasource.NewContextSimpleNative(line) val, ok := vm.Eval(context, e.node) if !ok || val == nil { // errors when evaluating @@ -56,14 +56,3 @@ func (e *ExprValue) Validate(logLine map[string]interface{}) bool { fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean") return false } - -func translateMap(lineInput map[string]interface{}) map[string]interface{} { - lineOutput := make(map[string]interface{}) - lineOutput["prog"] = lineInput["app_name"] - lineOutput["msg"] = lineInput["message"] - lineOutput["facility"] = lineInput["facility"] - lineOutput["host"] = lineInput["hostname"] - lineOutput["time"] = lineInput["timestamp"] - lineOutput["sev"] = lineInput["severity"] - return lineOutput -} diff --git a/formatter/format.go b/formatter/format.go index 3396378..a037c1c 100644 --- a/formatter/format.go +++ b/formatter/format.go @@ -8,13 +8,13 @@ import ( "text/template" "time" - "git.lattuga.net/boyska/circolog" + "git.lattuga.net/boyska/circolog/data" "github.com/mgutz/ansi" "gopkg.in/mgo.v2/bson" ) // Formatter is an interface, so that multiple implementations can exist -type Formatter func(circolog.Message) string +type Formatter func(data.Message) string var tmplFuncs template.FuncMap var syslogTmpl *template.Template @@ -87,7 +87,7 @@ func (rf Format) String() string { return "" } -func (rf Format) WriteFormatted(w io.Writer, msg circolog.Message) error { +func (rf Format) WriteFormatted(w io.Writer, msg data.Message) error { return WriteFormatted(w, rf, msg) } @@ -110,7 +110,7 @@ const ( FormatBSON = iota ) -func WriteFormatted(w io.Writer, f Format, msg circolog.Message) error { +func WriteFormatted(w io.Writer, f Format, msg data.Message) error { switch f { case FormatSyslog: return syslogTmpl.Execute(w, msg) diff --git a/hub.go b/hub.go index a59bd84..0894d3b 100644 --- a/hub.go +++ b/hub.go @@ -6,19 +6,15 @@ import ( "os" "time" + "git.lattuga.net/boyska/circolog/data" "git.lattuga.net/boyska/circolog/filtering" "gopkg.in/mcuadros/go-syslog.v2/format" ) -// Message is currently an alias for format.Logparts, but this is only temporary; sooner or later, a real struct will be used -// The advantage of having an explicit Message is to clear out what data we are sending to circolog "readers" -// This is not necessarily (and not in practice) the same structure that we receive from logging programs -type Message format.LogParts - // Client represent a client connected via websocket. Its most important field is the messages channel, where // new messages are sent. type Client struct { - Messages chan Message // only hub should write/close this + Messages chan data.Message // only hub should write/close this Options ClientOptions } @@ -106,7 +102,7 @@ func (h *Hub) register(cl Client) { item := buf.Value if item != nil { select { // send with short timeout - case cl.Messages <- item.(Message): + case cl.Messages <- item.(data.Message): break case <-time.After(500 * time.Millisecond): close(cl.Messages) @@ -123,30 +119,6 @@ func (h *Hub) register(cl Client) { } } -// logEntryToMessage converts messages received from writers to the format we promise to readers -func logEntryToMessage(orig format.LogParts) Message { - m := Message{} - if orig["version"] == 1 { // RFC5424 - m["prog"] = orig["app_name"] - m["client"] = orig["client"] - m["host"] = orig["hostname"] - m["proc_id"] = orig["proc_id"] - m["msg"] = orig["message"] - m["facility"] = orig["facility"] - m["time"] = orig["timestamp"] - m["sev"] = orig["severity"] - } else { //RFC3164 - m["prog"] = orig["tag"] - m["client"] = orig["client"] - m["host"] = orig["hostname"] - m["msg"] = orig["content"] - m["sev"] = orig["severity"] - m["time"] = orig["timestamp"] - m["proc_id"] = "-" - } - return m -} - // Run is hub main loop; keeps everything going func (h *Hub) Run() { active := true @@ -162,8 +134,8 @@ func (h *Hub) Run() { delete(h.clients, cl) } case msg := <-h.LogMessages: - if active == true && filter.Validate(msg) { - newmsg := logEntryToMessage(msg) + newmsg := data.LogEntryToMessage(msg) + if active == true && filter.Validate(newmsg) { h.circbuf.Value = newmsg h.circbuf = h.circbuf.Next() for client := range h.clients {