2018-08-23 01:21:53 +02:00
package circolog
2018-08-22 23:51:59 +02:00
import (
2018-08-23 00:56:27 +02:00
"container/ring"
2018-12-24 18:41:06 +01:00
"fmt"
"os"
2018-08-23 02:05:50 +02:00
"time"
2018-08-22 23:51:59 +02:00
2019-03-25 02:46:03 +01:00
"git.lattuga.net/boyska/circolog/data"
2018-12-25 03:52:53 +01:00
"git.lattuga.net/boyska/circolog/filtering"
2018-08-22 23:51:59 +02:00
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Client represent a client connected via websocket. Its most important field is the messages channel, where
2018-08-23 00:56:27 +02:00
// new messages are sent.
2018-08-22 23:51:59 +02:00
type Client struct {
2019-03-25 02:46:03 +01:00
Messages chan data . Message // only hub should write/close this
2018-11-08 19:25:40 +01:00
Options ClientOptions
}
2019-03-24 19:54:41 +01:00
// ClientOptions is a struct containing connection options for every reader
2018-11-08 19:25:40 +01:00
type ClientOptions struct {
BacklogLength int // how many past messages the client wants to receive upon connection
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
2018-08-22 23:51:59 +02:00
}
2018-08-23 00:56:27 +02:00
// The Hub is the central "registry"; it keeps both the data storage and clients notifications
//
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
2018-11-10 18:21:42 +01:00
// An HubCommand is an "enum" of different commands
type HubCommand int
const (
CommandClear = iota
CommandPauseToggle = iota
2018-12-25 01:32:54 +01:00
CommandStatus = iota
2018-12-25 03:52:53 +01:00
CommandNewFilter = iota
2018-11-10 18:21:42 +01:00
)
// An HubFullCommand is a Command, complete with arguments
type HubFullCommand struct {
2018-12-24 18:41:06 +01:00
Command HubCommand
Parameters map [ string ] interface { }
Response chan CommandResponse
2018-11-10 18:21:42 +01:00
}
2019-01-03 11:15:30 +01:00
2018-11-11 19:10:53 +01:00
type CommandResponse struct {
Value interface { }
}
2018-11-10 18:21:42 +01:00
2019-01-03 11:15:30 +01:00
// StatusResponse is an implementation of a CommandResponse
type StatusResponse struct {
2019-01-03 12:23:20 +01:00
Size int ` json:"size" `
IsRunning bool ` json:"running" `
Filter string ` json:"filter" `
2019-01-03 11:15:30 +01:00
}
// Status return "paused/unpaused" based on isRunning value
func ( r StatusResponse ) Status ( ) string {
if r . IsRunning {
return "unpaused"
}
return "paused"
}
2018-08-22 23:51:59 +02:00
type Hub struct {
Register chan Client
Unregister chan Client
2018-08-23 00:56:27 +02:00
LogMessages chan format . LogParts
2018-11-10 18:21:42 +01:00
Commands chan HubFullCommand
2018-08-23 00:56:27 +02:00
clients map [ Client ] bool
circbuf * ring . Ring
2018-08-22 23:51:59 +02:00
}
// NewHub creates an empty hub
2018-08-23 00:56:27 +02:00
func NewHub ( ringBufSize int ) Hub {
2018-08-22 23:51:59 +02:00
return Hub { clients : make ( map [ Client ] bool ) ,
Register : make ( chan Client ) ,
Unregister : make ( chan Client ) ,
2018-08-23 00:56:27 +02:00
LogMessages : make ( chan format . LogParts ) ,
2018-11-10 18:21:42 +01:00
Commands : make ( chan HubFullCommand ) ,
2018-08-23 00:56:27 +02:00
circbuf : ring . New ( ringBufSize ) ,
2018-08-22 23:51:59 +02:00
}
}
2018-08-23 12:25:07 +02:00
func ( h * Hub ) register ( cl Client ) {
if _ , ok := h . clients [ cl ] ; ! ok {
2018-11-08 19:25:40 +01:00
if ! cl . Options . Nofollow { // we won't need it in future
2018-08-23 12:25:07 +02:00
h . clients [ cl ] = true
}
2018-11-08 19:37:03 +01:00
howmany := cl . Options . BacklogLength
if howmany > h . circbuf . Len ( ) || howmany == - 1 {
howmany = h . circbuf . Len ( )
}
buf := h . circbuf . Move ( - howmany )
for i := 0 ; i < howmany ; i ++ {
item := buf . Value
if item != nil {
2018-08-23 12:25:07 +02:00
select { // send with short timeout
2019-03-25 02:46:03 +01:00
case cl . Messages <- item . ( data . Message ) :
2018-08-23 12:25:07 +02:00
break
case <- time . After ( 500 * time . Millisecond ) :
2018-11-08 19:37:03 +01:00
close ( cl . Messages )
return
2018-08-23 12:25:07 +02:00
}
}
2018-11-08 19:37:03 +01:00
buf = buf . Next ( )
}
2018-11-08 19:25:40 +01:00
if cl . Options . Nofollow {
2018-08-23 12:25:07 +02:00
close ( cl . Messages )
}
}
}
2018-08-23 00:56:27 +02:00
// Run is hub main loop; keeps everything going
2018-08-22 23:51:59 +02:00
func ( h * Hub ) Run ( ) {
2018-11-10 18:21:42 +01:00
active := true
2018-12-25 03:52:53 +01:00
var filter filtering . ExprValue
2018-08-22 23:51:59 +02:00
for {
select {
case cl := <- h . Register :
2018-08-23 12:25:07 +02:00
h . register ( cl )
2018-08-22 23:51:59 +02:00
case cl := <- h . Unregister :
_ , ok := h . clients [ cl ]
if ok {
2018-08-23 00:56:27 +02:00
close ( cl . Messages )
2018-08-22 23:51:59 +02:00
delete ( h . clients , cl )
}
2018-08-23 00:56:27 +02:00
case msg := <- h . LogMessages :
2019-03-25 02:46:03 +01:00
newmsg := data . LogEntryToMessage ( msg )
if active == true && filter . Validate ( newmsg ) {
2019-03-24 19:54:41 +01:00
h . circbuf . Value = newmsg
2018-11-10 18:21:42 +01:00
h . circbuf = h . circbuf . Next ( )
for client := range h . clients {
select { // send without blocking
2019-03-24 19:54:41 +01:00
case client . Messages <- newmsg :
2018-11-10 18:21:42 +01:00
break
default :
break
}
2018-08-23 01:14:08 +02:00
}
2018-08-22 23:51:59 +02:00
}
2018-11-10 18:21:42 +01:00
case cmd := <- h . Commands :
2018-12-25 01:32:54 +01:00
switch cmd . Command {
case CommandClear :
2018-11-10 18:21:42 +01:00
h . clear ( )
2018-12-24 15:54:22 +01:00
cmd . Response <- CommandResponse { Value : true }
2018-12-25 01:32:54 +01:00
case CommandPauseToggle :
2018-12-24 18:41:06 +01:00
togglePause ( cmd . Parameters [ "waitTime" ] . ( time . Duration ) , & active )
2018-12-25 01:27:27 +01:00
if active {
fmt . Print ( " un " )
}
fmt . Println ( "paused" )
2018-12-24 15:54:22 +01:00
cmd . Response <- CommandResponse { Value : active }
2018-12-25 01:32:54 +01:00
case CommandStatus :
2019-01-03 11:15:30 +01:00
var resp = StatusResponse {
2019-01-03 11:51:14 +01:00
Size : h . circbuf . Len ( ) ,
2019-01-03 11:15:30 +01:00
IsRunning : active ,
2019-01-03 12:23:20 +01:00
Filter : filter . String ( ) ,
2019-01-03 11:15:30 +01:00
}
cmd . Response <- CommandResponse { Value : resp }
2018-12-25 03:52:53 +01:00
case CommandNewFilter :
if err := filter . Set ( cmd . Parameters [ "where" ] . ( string ) ) ; err != nil {
cmd . Response <- CommandResponse { Value : map [ string ] interface { } {
"success" : false ,
"error" : err . Error ( ) ,
} }
} else {
cmd . Response <- CommandResponse { Value : map [ string ] interface { } {
"success" : true ,
"error" : "" ,
} }
}
2018-11-10 18:21:42 +01:00
}
2018-08-22 23:51:59 +02:00
}
}
}
2018-11-10 18:00:35 +01:00
2018-12-24 18:41:06 +01:00
func togglePause ( waitTime time . Duration , status * bool ) {
2018-12-25 01:27:27 +01:00
if waitTime != 0 {
go func ( ) {
2018-12-24 18:41:06 +01:00
time . Sleep ( waitTime )
fmt . Fprintln ( os . Stderr , "toggling again" )
2018-12-25 01:27:27 +01:00
togglePause ( 0 , status )
} ( )
2018-12-24 18:41:06 +01:00
}
* status = ! * status
}
2018-12-19 17:30:04 +01:00
// Clear removes all elements from the buffer
2018-11-10 18:21:42 +01:00
func ( h * Hub ) clear ( ) {
2018-11-10 18:00:35 +01:00
buf := h . circbuf
for i := 0 ; i < buf . Len ( ) ; i ++ {
buf . Value = nil
buf = buf . Next ( )
}
}