filtering based on reader-Messages

This commit is contained in:
boyska 2019-03-25 02:46:03 +01:00
parent fefd2d7e5c
commit ec3934501a
6 changed files with 48 additions and 54 deletions

View file

@ -11,7 +11,7 @@ import (
"strconv"
"time"
"git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/data"
"git.lattuga.net/boyska/circolog/filtering"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket"
@ -112,7 +112,7 @@ func main() {
log.Println("close:", err)
return
}
var parsed circolog.Message
var parsed data.Message
if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid BSON", err)
continue

View file

@ -9,6 +9,7 @@ import (
"time"
"git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/data"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket"
)
@ -99,7 +100,7 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
opts.Nofollow = true
client := circolog.Client{
Messages: make(chan circolog.Message, 20),
Messages: make(chan data.Message, 20),
Options: opts,
}
hub.Register <- client
@ -140,7 +141,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
return
}
client := circolog.Client{
Messages: make(chan circolog.Message, 20),
Messages: make(chan data.Message, 20),
Options: opts,
}
hub.Register <- client

32
data/data.go Normal file
View file

@ -0,0 +1,32 @@
package data
import "gopkg.in/mcuadros/go-syslog.v2/format"
// Message is currently an alias for format.Logparts, but this is only temporary; sooner or later, a real struct will be used
// The advantage of having an explicit Message is to clear out what data we are sending to circolog "readers"
// This is not necessarily (and not in practice) the same structure that we receive from logging programs
type Message format.LogParts
// LogEntryToMessage converts messages received from writers to the format we promise to readers
func LogEntryToMessage(orig format.LogParts) Message {
m := Message{}
if orig["version"] == 1 { // RFC5424
m["prog"] = orig["app_name"]
m["client"] = orig["client"]
m["host"] = orig["hostname"]
m["proc_id"] = orig["proc_id"]
m["msg"] = orig["message"]
m["facility"] = orig["facility"]
m["time"] = orig["timestamp"]
m["sev"] = orig["severity"]
} else { //RFC3164
m["prog"] = orig["tag"]
m["client"] = orig["client"]
m["host"] = orig["hostname"]
m["msg"] = orig["content"]
m["sev"] = orig["severity"]
m["time"] = orig["timestamp"]
m["proc_id"] = "-"
}
return m
}

View file

@ -6,6 +6,7 @@ import (
"fmt"
"os"
"git.lattuga.net/boyska/circolog/data"
"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/value"
@ -40,11 +41,10 @@ func (e *ExprValue) Set(value string) error {
}
// Validate answers the question whether to include a log line or not.
func (e *ExprValue) Validate(logLine map[string]interface{}) bool {
func (e *ExprValue) Validate(line data.Message) bool {
if e.node == nil {
return true
}
line := translateMap(logLine)
context := datasource.NewContextSimpleNative(line)
val, ok := vm.Eval(context, e.node)
if !ok || val == nil { // errors when evaluating
@ -56,14 +56,3 @@ func (e *ExprValue) Validate(logLine map[string]interface{}) bool {
fmt.Fprintln(os.Stderr, "WARNING: The 'where' expression doesn't return a boolean")
return false
}
func translateMap(lineInput map[string]interface{}) map[string]interface{} {
lineOutput := make(map[string]interface{})
lineOutput["prog"] = lineInput["app_name"]
lineOutput["msg"] = lineInput["message"]
lineOutput["facility"] = lineInput["facility"]
lineOutput["host"] = lineInput["hostname"]
lineOutput["time"] = lineInput["timestamp"]
lineOutput["sev"] = lineInput["severity"]
return lineOutput
}

View file

@ -8,13 +8,13 @@ import (
"text/template"
"time"
"git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/data"
"github.com/mgutz/ansi"
"gopkg.in/mgo.v2/bson"
)
// Formatter is an interface, so that multiple implementations can exist
type Formatter func(circolog.Message) string
type Formatter func(data.Message) string
var tmplFuncs template.FuncMap
var syslogTmpl *template.Template
@ -87,7 +87,7 @@ func (rf Format) String() string {
return ""
}
func (rf Format) WriteFormatted(w io.Writer, msg circolog.Message) error {
func (rf Format) WriteFormatted(w io.Writer, msg data.Message) error {
return WriteFormatted(w, rf, msg)
}
@ -110,7 +110,7 @@ const (
FormatBSON = iota
)
func WriteFormatted(w io.Writer, f Format, msg circolog.Message) error {
func WriteFormatted(w io.Writer, f Format, msg data.Message) error {
switch f {
case FormatSyslog:
return syslogTmpl.Execute(w, msg)

38
hub.go
View file

@ -6,19 +6,15 @@ import (
"os"
"time"
"git.lattuga.net/boyska/circolog/data"
"git.lattuga.net/boyska/circolog/filtering"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Message is currently an alias for format.Logparts, but this is only temporary; sooner or later, a real struct will be used
// The advantage of having an explicit Message is to clear out what data we are sending to circolog "readers"
// This is not necessarily (and not in practice) the same structure that we receive from logging programs
type Message format.LogParts
// Client represent a client connected via websocket. Its most important field is the messages channel, where
// new messages are sent.
type Client struct {
Messages chan Message // only hub should write/close this
Messages chan data.Message // only hub should write/close this
Options ClientOptions
}
@ -106,7 +102,7 @@ func (h *Hub) register(cl Client) {
item := buf.Value
if item != nil {
select { // send with short timeout
case cl.Messages <- item.(Message):
case cl.Messages <- item.(data.Message):
break
case <-time.After(500 * time.Millisecond):
close(cl.Messages)
@ -123,30 +119,6 @@ func (h *Hub) register(cl Client) {
}
}
// logEntryToMessage converts messages received from writers to the format we promise to readers
func logEntryToMessage(orig format.LogParts) Message {
m := Message{}
if orig["version"] == 1 { // RFC5424
m["prog"] = orig["app_name"]
m["client"] = orig["client"]
m["host"] = orig["hostname"]
m["proc_id"] = orig["proc_id"]
m["msg"] = orig["message"]
m["facility"] = orig["facility"]
m["time"] = orig["timestamp"]
m["sev"] = orig["severity"]
} else { //RFC3164
m["prog"] = orig["tag"]
m["client"] = orig["client"]
m["host"] = orig["hostname"]
m["msg"] = orig["content"]
m["sev"] = orig["severity"]
m["time"] = orig["timestamp"]
m["proc_id"] = "-"
}
return m
}
// Run is hub main loop; keeps everything going
func (h *Hub) Run() {
active := true
@ -162,8 +134,8 @@ func (h *Hub) Run() {
delete(h.clients, cl)
}
case msg := <-h.LogMessages:
if active == true && filter.Validate(msg) {
newmsg := logEntryToMessage(msg)
newmsg := data.LogEntryToMessage(msg)
if active == true && filter.Validate(newmsg) {
h.circbuf.Value = newmsg
h.circbuf = h.circbuf.Next()
for client := range h.clients {