1
0
Fork 0
forked from boyska/circolog

circolog.Message: for readers

refs #14
This commit is contained in:
boyska 2019-03-24 19:54:41 +01:00
parent 85ccd65543
commit aeceda5caa
4 changed files with 38 additions and 17 deletions

View file

@ -11,12 +11,12 @@ import (
"strconv" "strconv"
"time" "time"
"git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/filtering" "git.lattuga.net/boyska/circolog/filtering"
"git.lattuga.net/boyska/circolog/formatter" "git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
isatty "github.com/mattn/go-isatty" isatty "github.com/mattn/go-isatty"
"github.com/mgutz/ansi" "github.com/mgutz/ansi"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
) )
@ -112,7 +112,7 @@ func main() {
log.Println("close:", err) log.Println("close:", err)
return return
} }
var parsed format.LogParts var parsed circolog.Message
if err := bson.Unmarshal(serialized, &parsed); err != nil { if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid BSON", err) log.Println("invalid BSON", err)
continue continue

View file

@ -11,7 +11,6 @@ import (
"git.lattuga.net/boyska/circolog" "git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/formatter" "git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
) )
func setupHTTP(hub circolog.Hub) *http.ServeMux { func setupHTTP(hub circolog.Hub) *http.ServeMux {
@ -100,7 +99,7 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
opts.Nofollow = true opts.Nofollow = true
client := circolog.Client{ client := circolog.Client{
Messages: make(chan format.LogParts, 20), Messages: make(chan circolog.Message, 20),
Options: opts, Options: opts,
} }
hub.Register <- client hub.Register <- client
@ -141,7 +140,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
return return
} }
client := circolog.Client{ client := circolog.Client{
Messages: make(chan format.LogParts, 20), Messages: make(chan circolog.Message, 20),
Options: opts, Options: opts,
} }
hub.Register <- client hub.Register <- client

View file

@ -8,13 +8,13 @@ import (
"text/template" "text/template"
"time" "time"
"git.lattuga.net/boyska/circolog"
"github.com/mgutz/ansi" "github.com/mgutz/ansi"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
) )
// Formatter is an interface, so that multiple implementations can exist // Formatter is an interface, so that multiple implementations can exist
type Formatter func(format.LogParts) string type Formatter func(circolog.Message) string
var tmplFuncs template.FuncMap var tmplFuncs template.FuncMap
var syslogTmpl *template.Template var syslogTmpl *template.Template
@ -56,11 +56,11 @@ func init() {
}, },
} }
syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse( syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse(
"{{color \"yellow\" (rfc822 (index . \"timestamp\")) }} {{index . \"hostname\"}} " + "{{color \"yellow\" (rfc822 (index . \"time\")) }} {{index . \"hostname\"}} " +
"{{index . \"app_name\" | autoColor}}" + "{{index . \"prog\" | autoColor}}" +
"{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " + "{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " +
"{{ sevName (index . \"severity\") }} " + "{{ sevName (index . \"sev\") }} " +
"{{index . \"message\"}}", "{{index . \"msg\"}}",
)) ))
} }
@ -87,7 +87,7 @@ func (rf Format) String() string {
return "" return ""
} }
func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error { func (rf Format) WriteFormatted(w io.Writer, msg circolog.Message) error {
return WriteFormatted(w, rf, msg) return WriteFormatted(w, rf, msg)
} }
@ -110,7 +110,7 @@ const (
FormatBSON = iota FormatBSON = iota
) )
func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error { func WriteFormatted(w io.Writer, f Format, msg circolog.Message) error {
switch f { switch f {
case FormatSyslog: case FormatSyslog:
return syslogTmpl.Execute(w, msg) return syslogTmpl.Execute(w, msg)

30
hub.go
View file

@ -10,13 +10,19 @@ import (
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
// Message is currently an alias for format.Logparts, but this is only temporary; sooner or later, a real struct will be used
// The advantage of having an explicit Message is to clear out what data we are sending to circolog "readers"
// This is not necessarily (and not in practice) the same structure that we receive from logging programs
type Message format.LogParts
// Client represent a client connected via websocket. Its most important field is the messages channel, where // Client represent a client connected via websocket. Its most important field is the messages channel, where
// new messages are sent. // new messages are sent.
type Client struct { type Client struct {
Messages chan format.LogParts // only hub should write/close this Messages chan Message // only hub should write/close this
Options ClientOptions Options ClientOptions
} }
// ClientOptions is a struct containing connection options for every reader
type ClientOptions struct { type ClientOptions struct {
BacklogLength int // how many past messages the client wants to receive upon connection BacklogLength int // how many past messages the client wants to receive upon connection
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
@ -100,7 +106,7 @@ func (h *Hub) register(cl Client) {
item := buf.Value item := buf.Value
if item != nil { if item != nil {
select { // send with short timeout select { // send with short timeout
case cl.Messages <- item.(format.LogParts): case cl.Messages <- item.(Message):
break break
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
close(cl.Messages) close(cl.Messages)
@ -117,6 +123,21 @@ func (h *Hub) register(cl Client) {
} }
} }
// logEntryToMessage converts messages received from writers to the format we promise to readers
func logEntryToMessage(orig format.LogParts) Message {
m := Message{}
// it currently only supports RFC5424. TODO: support RFC3164
m["prog"] = orig["app_name"]
m["client"] = orig["client"]
m["hostname"] = orig["hostname"]
m["proc_id"] = orig["proc_id"]
m["msg"] = orig["message"]
m["facility"] = orig["facility"]
m["time"] = orig["timestamp"]
m["sev"] = orig["severity"]
return m
}
// Run is hub main loop; keeps everything going // Run is hub main loop; keeps everything going
func (h *Hub) Run() { func (h *Hub) Run() {
active := true active := true
@ -133,11 +154,12 @@ func (h *Hub) Run() {
} }
case msg := <-h.LogMessages: case msg := <-h.LogMessages:
if active == true && filter.Validate(msg) { if active == true && filter.Validate(msg) {
h.circbuf.Value = msg newmsg := logEntryToMessage(msg)
h.circbuf.Value = newmsg
h.circbuf = h.circbuf.Next() h.circbuf = h.circbuf.Next()
for client := range h.clients { for client := range h.clients {
select { // send without blocking select { // send without blocking
case client.Messages <- msg: case client.Messages <- newmsg:
break break
default: default:
break break