233 lines
6 KiB
Go
233 lines
6 KiB
Go
package circolog
|
|
|
|
import (
|
|
"container/ring"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"git.lattuga.net/boyska/circolog/filtering"
|
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
|
)
|
|
|
|
// Message is currently an alias for format.Logparts, but this is only temporary; sooner or later, a real struct will be used
|
|
// The advantage of having an explicit Message is to clear out what data we are sending to circolog "readers"
|
|
// This is not necessarily (and not in practice) the same structure that we receive from logging programs
|
|
type Message format.LogParts
|
|
|
|
// Client represent a client connected via websocket. Its most important field is the messages channel, where
|
|
// new messages are sent.
|
|
type Client struct {
|
|
Messages chan Message // only hub should write/close this
|
|
Options ClientOptions
|
|
}
|
|
|
|
// ClientOptions holds the connection options of a single reader.
type ClientOptions struct {
	// BacklogLength is how many past messages the client wants to receive
	// upon connection.
	BacklogLength int
	// Nofollow, when true, tells the hub not to keep this client
	// permanently: the hub sends every backlog message to "Messages" and
	// then closes the channel. Use it to fetch messages in a one-shot way.
	Nofollow bool
}
|
|
|
|
// The Hub is the central "registry"; it holds both the data storage and the client notifications.
//
// The "Register" and "Unregister" channels can be seen as "commands".
// Keep in mind that "registering" is also what you do to get messages in a one-shot fashion. In fact, Client
// has "Options", such as Nofollow, that tell the Hub what should be delivered.
|
|
|
|
// HubCommand works as an "enum" identifying the different commands that can
// be sent to the hub.
type HubCommand int
|
|
|
|
// Commands understood by the hub main loop. The values are consecutive
// untyped integers starting at zero.
const (
	CommandClear = iota
	CommandPauseToggle
	CommandStatus
	CommandNewFilter
)
|
|
|
|
// An HubFullCommand is a Command, complete with arguments
|
|
type HubFullCommand struct {
|
|
Command HubCommand
|
|
Parameters map[string]interface{}
|
|
Response chan CommandResponse
|
|
}
|
|
|
|
// CommandResponse wraps the result the hub sends back after handling a
// command. The concrete type stored in Value depends on the command.
type CommandResponse struct {
	Value interface{}
}
|
|
|
|
// StatusResponse is the payload carried by a CommandResponse when answering
// a status command.
type StatusResponse struct {
	Size      int    `json:"size"`
	IsRunning bool   `json:"running"`
	Filter    string `json:"filter"`
}
|
|
|
|
// Status return "paused/unpaused" based on isRunning value
|
|
func (r StatusResponse) Status() string {
|
|
if r.IsRunning {
|
|
return "unpaused"
|
|
}
|
|
return "paused"
|
|
}
|
|
|
|
type Hub struct {
|
|
Register chan Client
|
|
Unregister chan Client
|
|
LogMessages chan format.LogParts
|
|
Commands chan HubFullCommand
|
|
|
|
clients map[Client]bool
|
|
circbuf *ring.Ring
|
|
}
|
|
|
|
// NewHub creates an empty hub
|
|
func NewHub(ringBufSize int) Hub {
|
|
return Hub{clients: make(map[Client]bool),
|
|
Register: make(chan Client),
|
|
Unregister: make(chan Client),
|
|
LogMessages: make(chan format.LogParts),
|
|
Commands: make(chan HubFullCommand),
|
|
circbuf: ring.New(ringBufSize),
|
|
}
|
|
}
|
|
|
|
func (h *Hub) register(cl Client) {
|
|
if _, ok := h.clients[cl]; !ok {
|
|
if !cl.Options.Nofollow { // we won't need it in future
|
|
h.clients[cl] = true
|
|
}
|
|
|
|
howmany := cl.Options.BacklogLength
|
|
if howmany > h.circbuf.Len() || howmany == -1 {
|
|
howmany = h.circbuf.Len()
|
|
}
|
|
buf := h.circbuf.Move(-howmany)
|
|
for i := 0; i < howmany; i++ {
|
|
item := buf.Value
|
|
if item != nil {
|
|
select { // send with short timeout
|
|
case cl.Messages <- item.(Message):
|
|
break
|
|
case <-time.After(500 * time.Millisecond):
|
|
close(cl.Messages)
|
|
return
|
|
}
|
|
}
|
|
|
|
buf = buf.Next()
|
|
}
|
|
|
|
if cl.Options.Nofollow {
|
|
close(cl.Messages)
|
|
}
|
|
}
|
|
}
|
|
|
|
// logEntryToMessage converts messages received from writers to the format we promise to readers
|
|
func logEntryToMessage(orig format.LogParts) Message {
|
|
m := Message{}
|
|
if orig["version"] == 1 { // RFC5424
|
|
m["prog"] = orig["app_name"]
|
|
m["client"] = orig["client"]
|
|
m["host"] = orig["hostname"]
|
|
m["proc_id"] = orig["proc_id"]
|
|
m["msg"] = orig["message"]
|
|
m["facility"] = orig["facility"]
|
|
m["time"] = orig["timestamp"]
|
|
m["sev"] = orig["severity"]
|
|
} else { //RFC3164
|
|
m["prog"] = orig["tag"]
|
|
m["client"] = orig["client"]
|
|
m["host"] = orig["hostname"]
|
|
m["msg"] = orig["content"]
|
|
m["sev"] = orig["severity"]
|
|
m["time"] = orig["timestamp"]
|
|
m["proc_id"] = "-"
|
|
}
|
|
return m
|
|
}
|
|
|
|
// Run is hub main loop; keeps everything going
|
|
func (h *Hub) Run() {
|
|
active := true
|
|
var filter filtering.ExprValue
|
|
for {
|
|
select {
|
|
case cl := <-h.Register:
|
|
h.register(cl)
|
|
case cl := <-h.Unregister:
|
|
_, ok := h.clients[cl]
|
|
if ok {
|
|
close(cl.Messages)
|
|
delete(h.clients, cl)
|
|
}
|
|
case msg := <-h.LogMessages:
|
|
if active == true && filter.Validate(msg) {
|
|
newmsg := logEntryToMessage(msg)
|
|
h.circbuf.Value = newmsg
|
|
h.circbuf = h.circbuf.Next()
|
|
for client := range h.clients {
|
|
select { // send without blocking
|
|
case client.Messages <- newmsg:
|
|
break
|
|
default:
|
|
break
|
|
}
|
|
}
|
|
}
|
|
case cmd := <-h.Commands:
|
|
switch cmd.Command {
|
|
case CommandClear:
|
|
h.clear()
|
|
cmd.Response <- CommandResponse{Value: true}
|
|
case CommandPauseToggle:
|
|
togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
|
|
if active {
|
|
fmt.Print("un")
|
|
}
|
|
fmt.Println("paused")
|
|
cmd.Response <- CommandResponse{Value: active}
|
|
case CommandStatus:
|
|
var resp = StatusResponse{
|
|
Size: h.circbuf.Len(),
|
|
IsRunning: active,
|
|
Filter: filter.String(),
|
|
}
|
|
cmd.Response <- CommandResponse{Value: resp}
|
|
case CommandNewFilter:
|
|
if err := filter.Set(cmd.Parameters["where"].(string)); err != nil {
|
|
cmd.Response <- CommandResponse{Value: map[string]interface{}{
|
|
"success": false,
|
|
"error": err.Error(),
|
|
}}
|
|
} else {
|
|
cmd.Response <- CommandResponse{Value: map[string]interface{}{
|
|
"success": true,
|
|
"error": "",
|
|
}}
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// togglePause inverts *status right away. When waitTime is non-zero it also
// schedules a second inversion once waitTime has elapsed, announced on stderr
// with "toggling again".
//
// The delayed inversion runs on its own goroutine and writes *status without
// any synchronization.
func togglePause(waitTime time.Duration, status *bool) {
	if waitTime != 0 {
		time.AfterFunc(waitTime, func() {
			fmt.Fprintln(os.Stderr, "toggling again")
			*status = !*status
		})
	}
	*status = !*status
}
|
|
|
|
// Clear removes all elements from the buffer
|
|
func (h *Hub) clear() {
|
|
buf := h.circbuf
|
|
for i := 0; i < buf.Len(); i++ {
|
|
buf.Value = nil
|
|
buf = buf.Next()
|
|
}
|
|
}
|