1
0
Fork 0
forked from boyska/circolog

Compare commits

..

No commits in common. "9ef425d827404b1673d71afad08d1a9e429869ce" and "f66e07e8738f5371db8eb2935412accb4c3839a6" have entirely different histories.

8 changed files with 49 additions and 493 deletions

View file

@ -1,9 +1,9 @@
A syslog daemon implementing circular buffer, in-memory storage. A syslog daemon implementing circular buffer, in-memory storage.
This is useful when you want to keep some (heavily detailed) log available, but you don't want to log too many This is useful when you want to keep some (heavy detailed) log available, but you don't want to log too many
things to disk. Remember: logging is useful, but can be dangerous to your users' privacy! things to disk.
On your "main" syslog, forward (part of the) messages to this one! On your "main" syslog, send some message to this one!
## Integration examples ## Integration examples
@ -27,59 +27,15 @@ and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/
## Client ## Client
`circolog` has its own client: `circolog-tail`. It is intended to resemble `tail -f` for the most basic `curl` might be enough of a client for most uses.
options; however, it will include filtering options that are common when you want to read logs, because that's
very easy when you have structured logs available.
However, one design point of circolog is to be usable without having a specific client: so the logs are
offered on both HTTP and websocket. This means that you can use `curl` if you want:
curl --unix-socket /run/circolog-query.sock localhost/ curl --unix-socket /run/circolog-query.sock localhost/
will give you everything that circologd has in memory. will give you everything that circologd has in memory
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
any websocket client supporting UNIX domain socket, so you have two options: any websocket client supporting UNIX domain socket, so you have two options:
1. Use `circolog-tail` 1. wait until I write a proper `circolog-tail` client implementing it all
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that 2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages. port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
### HTTP URLs and parameters
When using HTTP, logs are served on `/`. Valid parameters are:
* `l`. This is the amount of lines to send. This is essentially the same as the `-n` parameter on tail
Using `l=-1` (the default) means "give me every log message that you have
* `fmt`. This selects the output format. When `fmt=json` is used, each message is returned as JSON structured
data. The format of those JSON messages is still unstable. `fmt=syslog`, the default, outputs messages using "syslog style" (RFC XXXXXX)
To use websocket, request path `/ws`. The same parameters of `/` are recognized.
## Control daemon
Circologd can be controlled, on some aspects, at run-time. It has 2 mechanisms for that: the easiest, and more
limited, is sending a signal with kill; the second, and more powerful, is a control socket, where you can give
commands to it. This control socket is just HTTP, so again `curl` is your friend. In the future a
`circolog-ctl` client will be developed.
### Pause
When circologd is paused, every new message it receives is immediately discarded. No exception. The backlog
is, however, preserved. This means that you can trigger the event that you want to investigate, pause
circolog, then analyze the logs.
Pausing might be the easiest way to make circologd only run "when needed".
When circologd resumes, no previous message is lost.
To pause circologd with signals , send a `USR1` signal to the main pid. To "resume", send a `USR1` again.
To pause with HTTP, send a `POST /pause/toggle` to your circologd control socket.
### Clear
When you clear the circologd's buffer, it will discard every message it has, but will keep collecting new
messages.
You can do that with `POST /logs/clear`

View file

@ -11,10 +11,7 @@ import (
"strconv" "strconv"
"time" "time"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
) )
func main() { func main() {
@ -31,7 +28,6 @@ func main() {
Path: "/ws", Path: "/ws",
} }
q := u.Query() q := u.Query()
q.Set("fmt", "bson")
if *backlogLimit >= 0 { if *backlogLimit >= 0 {
q.Set("l", strconv.Itoa(*backlogLimit)) q.Set("l", strconv.Itoa(*backlogLimit))
} }
@ -62,20 +58,12 @@ func main() {
go func() { go func() {
defer close(done) defer close(done)
for { for {
_, serialized, err := c.ReadMessage() _, message, err := c.ReadMessage()
if err != nil { if err != nil {
log.Println("close:", err) log.Println("close:", err)
return return
} }
var parsed format.LogParts fmt.Println(string(message))
if err := bson.Unmarshal(serialized, &parsed); err != nil {
log.Println("invalid YAML", err)
continue
}
if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {
log.Println("error printing", err)
}
fmt.Println()
} }
}() }()

View file

@ -1,161 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"
)
var globalOpts struct {
ctlSock string
verbose bool
}
var ctl http.Client
type commandFunc func([]string) error
var cmdMap map[string]commandFunc
func init() {
cmdMap = map[string]commandFunc{
// TODO: implement set and get of config at runtime
//"set": setCmd,
//"get": getCmd,
"pause": pauseCmd,
"reload": reloadCmd,
"restart": restartCmd,
"help": helpCmd,
}
}
//func setCmd(ctlSock string, args []string) error {}
//func getCmd(ctlSock string, args []string) error {}
func parsePauseOpts(postponeTime string) (string, error) {
var waitTime int64
var err error
L := len(postponeTime)
switch unit := postponeTime[L-1]; string(unit) {
case "s":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
case "m":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60
case "h":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60
case "d":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60 * 24
case "w":
waitTime, err = strconv.ParseInt(postponeTime[:L-2], 10, 16)
if err != nil {
return "", err
}
waitTime = waitTime * 60 * 60 * 24 * 7
}
return string(waitTime), nil
}
func pauseCmd(args []string) error {
var postBody string
var err error
flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
postponeTime := flagset.String("postpone", "", "How long to wait before untoggling the state")
flagset.Parse(args[1:])
if *postponeTime != "" {
postBody, err = parsePauseOpts(*postponeTime)
if err != nil {
return err
}
}
resp, err := ctl.Post("http://unix/pause/toggle", "application/octet-stream", strings.NewReader(postBody))
if globalOpts.verbose {
defer resp.Body.Close()
bodyBytes, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
return err2
}
fmt.Println(string(bodyBytes))
}
return err
}
func reloadCmd(args []string) error {
return nil
}
func restartCmd(args []string) error {
return nil
}
func helpCmd(args []string) error {
usage(os.Stdout)
os.Exit(0)
return nil
}
func usage(w io.Writer) {
fmt.Fprintf(w, "USAGE: %s [globalOpts] [SUBCOMMAND] [opts]\n", os.Args[0])
fmt.Fprintf(w, "\nSUBCOMMANDS:\n\n")
for command := range cmdMap {
fmt.Fprintf(w, "\t%s\n", command)
}
}
func parseAndRun(args []string) {
cmdName := args[0]
cmdToRun, ok := cmdMap[cmdName]
if !ok {
fmt.Fprintf(os.Stderr, "Unknown subcommand: %s\n", cmdName)
usage(os.Stderr)
os.Exit(2)
}
// from here: https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74
ctl = http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", globalOpts.ctlSock)
},
},
}
err := cmdToRun(args)
if err != nil {
fmt.Fprintf(os.Stderr, "Error:\n%s\n", err)
os.Exit(1)
}
}
func main() {
flag.StringVar(&globalOpts.ctlSock, "ctl-socket", "/tmp/circologd-ctl.sock",
"Path to a unix domain socket for the control server; leave empty to disable")
flag.BoolVar(&globalOpts.verbose, "verbose", false, "Print more output")
flag.Parse()
args := flag.Args()
if len(args) == 0 {
usage(os.Stderr)
os.Exit(-1)
}
parseAndRun(args)
}

View file

@ -1,4 +1,4 @@
package formatter package main
import ( import (
"encoding/json" "encoding/json"
@ -8,7 +8,6 @@ import (
"time" "time"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
"gopkg.in/mgo.v2/bson"
) )
// Formatter is an interface, so that multiple implementations can exist // Formatter is an interface, so that multiple implementations can exist
@ -34,67 +33,32 @@ func init() {
)) ))
} }
type Format int type renderFormat int
func (rf *Format) Set(v string) error { func parseRenderFormat(s string) (renderFormat, error) {
newvalue, err := parseFormat(v)
if err != nil {
return err
}
*rf = newvalue
return nil
}
func (rf Format) String() string {
switch rf {
case FormatJSON:
return "json"
case FormatSyslog:
return "syslog"
case FormatBSON:
return "bson"
}
return ""
}
func (rf Format) WriteFormatted(w io.Writer, msg format.LogParts) error {
return WriteFormatted(w, rf, msg)
}
func parseFormat(s string) (Format, error) {
switch s { switch s {
case "json": case "json":
return FormatJSON, nil return formatJSON, nil
case "syslog": case "syslog":
return FormatSyslog, nil return formatSyslog, nil
case "bson":
return FormatBSON, nil
default: default:
return 0, fmt.Errorf("Undefined format `%s`", s) return 0, fmt.Errorf("Undefined format `%s`", s)
} }
} }
const ( const (
FormatSyslog = iota // 0 formatSyslog = iota // 0
FormatJSON = iota formatJSON = iota
FormatBSON = iota
) )
func WriteFormatted(w io.Writer, f Format, msg format.LogParts) error { func writeFormatted(w io.Writer, f renderFormat, msg format.LogParts) error {
switch f { switch f {
case FormatSyslog: case formatSyslog:
return syslogTmpl.Execute(w, msg) return syslogTmpl.Execute(w, msg)
case FormatJSON: case formatJSON:
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
enc.SetIndent("", " ") enc.SetIndent("", " ")
return enc.Encode(msg) return enc.Encode(msg)
case FormatBSON:
enc, err := bson.Marshal(msg)
if err != nil {
return err
}
_, err = w.Write(enc)
return err
} }
return nil return nil
} }

View file

@ -9,16 +9,13 @@ import (
"time" "time"
"git.lattuga.net/boyska/circolog" "git.lattuga.net/boyska/circolog"
"git.lattuga.net/boyska/circolog/formatter"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
func setupHTTP(hub circolog.Hub) *http.ServeMux { func setupHTTP(hub circolog.Hub) {
mux := http.NewServeMux() http.HandleFunc("/", getHTTPHandler(hub))
mux.HandleFunc("/", getHTTPHandler(hub)) http.HandleFunc("/ws", getWSHandler(hub))
mux.HandleFunc("/ws", getWSHandler(hub))
return mux
} }
func parseParameterL(r *http.Request) (int, error) { func parseParameterL(r *http.Request) (int, error) {
@ -57,7 +54,7 @@ func parseParameters(r *http.Request) (circolog.ClientOptions, error) {
} }
type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd) type renderOptions struct { // those are options relevant to the rendered (that is, the HTTP side of circologd)
Format formatter.Format Format renderFormat
} }
func parseRenderParameters(r *http.Request) (renderOptions, error) { func parseRenderParameters(r *http.Request) (renderOptions, error) {
@ -71,10 +68,11 @@ func parseRenderParameters(r *http.Request) (renderOptions, error) {
if len(val) != 1 { if len(val) != 1 {
return opts, errors.New("Format repeated multiple times") return opts, errors.New("Format repeated multiple times")
} }
err := opts.Format.Set(val[0]) format, err := parseRenderFormat(val[0])
if err != nil { if err != nil {
return opts, err return opts, err
} }
opts.Format = format
} }
return opts, nil return opts, nil
} }
@ -106,10 +104,9 @@ func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
hub.Register <- client hub.Register <- client
for x := range client.Messages { for x := range client.Messages {
if err := render_opts.Format.WriteFormatted(w, x); err == nil { writeFormatted(w, render_opts.Format, x)
if render_opts.Format != formatter.FormatJSON { // bleah if render_opts.Format != formatJSON { // bleah
w.Write([]byte("\n")) w.Write([]byte("\n"))
}
} }
} }
} }
@ -121,18 +118,10 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
WriteBufferSize: 1024, WriteBufferSize: 1024,
} }
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
render_opts, err := parseRenderParameters(r)
if err != nil {
log.Println("Error parsing:", err)
w.WriteHeader(400)
fmt.Fprintln(w, err)
return
}
opts, err := parseParameters(r) opts, err := parseParameters(r)
if err != nil { if err != nil {
log.Println("Error on request parameter \"l\":", err) log.Println("Error on request parameter \"l\":", err)
w.WriteHeader(400) w.WriteHeader(400)
fmt.Fprintln(w, err)
return return
} }
opts.Nofollow = false opts.Nofollow = false
@ -167,7 +156,7 @@ func getWSHandler(hub circolog.Hub) http.HandlerFunc {
if err != nil { if err != nil {
return return
} }
render_opts.Format.WriteFormatted(w, message) writeFormatted(w, formatSyslog, message)
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
return return

View file

@ -1,82 +0,0 @@
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
fractal "git.lattuga.net/blallo/gotools/formatting"
"git.lattuga.net/boyska/circolog"
"github.com/gorilla/mux"
)
func setupHTTPCtl(hub circolog.Hub, verbose bool) *mux.Router {
m := mux.NewRouter()
m.HandleFunc("/pause/toggle", togglePause(hub, verbose)).Methods("POST")
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
m.HandleFunc("/echo", echo(verbose)).Methods("GET")
return m
}
func togglePause(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - toggled pause", r.Method, r.RemoteAddr)
}
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
resp := <-hub.Responses
active := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"paused": !active})
}
}
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - cleared queue", r.Method, r.RemoteAddr)
}
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
success := resp.Value.(bool)
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
enc.Encode(map[string]interface{}{"success": success})
}
}
func printHelp(verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Printf("[%s] %s - asked for help", r.Method, r.RemoteAddr)
}
w.Header().Set("content-type", "application/json")
enc := json.NewEncoder(w)
var pathsWithDocs = map[string]string{
"/pause/toggle": "Toggle the server from pause state (not listening)",
"/logs/clear": "Wipe the buffer from all the messages",
"/help": "This help",
"/echo": "Answers to heartbeat",
}
fractal.EncodeJSON(pathsWithDocs, enc)
if verbose {
errEnc := json.NewEncoder(os.Stderr)
fractal.EncodeJSON(pathsWithDocs, errEnc)
}
}
}
func echo(verbose bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Echo")
if verbose {
log.Printf("[%s] %s - asked for echo", r.Method, r.RemoteAddr)
}
w.Header().Set("content-type", "text/plain")
fmt.Fprintln(w, "I am on!")
}
}

View file

@ -3,12 +3,10 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"log"
"net" "net"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"syscall"
"git.lattuga.net/boyska/circolog" "git.lattuga.net/boyska/circolog"
syslog "gopkg.in/mcuadros/go-syslog.v2" syslog "gopkg.in/mcuadros/go-syslog.v2"
@ -16,7 +14,7 @@ import (
func cleanSocket(socket string) { func cleanSocket(socket string) {
if err := os.Remove(socket); err != nil { if err := os.Remove(socket); err != nil {
fmt.Fprintln(os.Stderr, "Error cleaning", socket, ":", err) fmt.Fprintln(os.Stderr, socket, ":", err)
} }
} }
@ -28,16 +26,13 @@ func main() {
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages") syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service") queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!") querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
verbose := flag.Bool("verbose", false, "Print more output executing the daemon")
flag.Parse() flag.Parse()
interrupt := make(chan os.Signal, 1) interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM) signal.Notify(interrupt, os.Interrupt)
hub := circolog.NewHub(*bufsize) hub := circolog.NewHub(*bufsize)
handler := syslog.NewChannelHandler(hub.LogMessages) handler := syslog.NewChannelHandler(hub.LogMessages)
go hub.Run()
server := syslog.NewServer() server := syslog.NewServer()
server.SetFormat(syslog.RFC5424) server.SetFormat(syslog.RFC5424)
@ -60,82 +55,31 @@ func main() {
fmt.Fprintln(os.Stderr, "argh", err) fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1) os.Exit(1)
} }
go hub.Run()
httpQueryServer := http.Server{Handler: setupHTTP(hub)} setupHTTP(hub)
if *querySocket != "" { if *querySocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *querySocket) fmt.Printf("Binding address `%s` [http]\n", *querySocket)
unixListener, err := net.Listen("unix", *querySocket) unixListener, err := net.Listen("unix", *querySocket)
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err) fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
} }
defer cleanSocket(*querySocket) go http.Serve(unixListener, nil)
go func() {
if err := httpQueryServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *querySocket, ":", err)
}
}()
} else { } else {
httpQueryServer.Addr = *queryAddr
fmt.Printf("Binding address `%s` [http]\n", *queryAddr) fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
go func() { go http.ListenAndServe(*queryAddr, nil)
err := httpQueryServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding", *queryAddr, ":", err)
}
}()
} }
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub, *verbose)}
if *ctlSocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
unixListener, err := net.Listen("unix", *ctlSocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
return
}
defer cleanSocket(*ctlSocket)
go func() {
if err := httpCtlServer.Serve(unixListener); err != nil && err != http.ErrServerClosed {
fmt.Fprintln(os.Stderr, "error binding:", err)
}
}()
}
// TODO: now we are ready
for { for {
select { select {
case sig := <-interrupt: case <-interrupt:
if sig == syscall.SIGUSR1 { server.Kill()
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle} //server.Wait()
resp := <-hub.Responses if *syslogSocketPath != "" {
if resp.Value.(bool) {
log.Println("resumed")
} else {
log.Println("paused")
}
}
if sig == syscall.SIGUSR2 {
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
resp := <-hub.Responses
if resp.Value.(bool) {
log.Println("buffer cleaned")
} else {
log.Println("buffer NOT cleaned")
}
}
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
log.Println("Quitting because of signal", sig)
server.Kill()
if err := httpQueryServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing http server:", err)
}
if err := httpCtlServer.Shutdown(nil); err != nil {
fmt.Fprintln(os.Stderr, "Error closing control server:", err)
}
return
} }
return
default:
} }
} }
} }

58
hub.go
View file

@ -24,29 +24,10 @@ type ClientOptions struct {
// The channel "register" and "unregister" can be seen as "command" // The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client // keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given // has "options", such as Nofollow, to explain the Hub what should be given
// An HubCommand is an "enum" of different commands
type HubCommand int
const (
CommandClear = iota
CommandPauseToggle = iota
)
// An HubFullCommand is a Command, complete with arguments
type HubFullCommand struct {
Command HubCommand
}
type CommandResponse struct {
Value interface{}
}
type Hub struct { type Hub struct {
Register chan Client Register chan Client
Unregister chan Client Unregister chan Client
LogMessages chan format.LogParts LogMessages chan format.LogParts
Commands chan HubFullCommand
Responses chan CommandResponse
clients map[Client]bool clients map[Client]bool
circbuf *ring.Ring circbuf *ring.Ring
@ -58,8 +39,6 @@ func NewHub(ringBufSize int) Hub {
Register: make(chan Client), Register: make(chan Client),
Unregister: make(chan Client), Unregister: make(chan Client),
LogMessages: make(chan format.LogParts), LogMessages: make(chan format.LogParts),
Commands: make(chan HubFullCommand),
Responses: make(chan CommandResponse),
circbuf: ring.New(ringBufSize), circbuf: ring.New(ringBufSize),
} }
} }
@ -98,7 +77,6 @@ func (h *Hub) register(cl Client) {
// Run is hub main loop; keeps everything going // Run is hub main loop; keeps everything going
func (h *Hub) Run() { func (h *Hub) Run() {
active := true
for { for {
select { select {
case cl := <-h.Register: case cl := <-h.Register:
@ -110,36 +88,16 @@ func (h *Hub) Run() {
delete(h.clients, cl) delete(h.clients, cl)
} }
case msg := <-h.LogMessages: case msg := <-h.LogMessages:
if active == true { h.circbuf.Value = msg
h.circbuf.Value = msg h.circbuf = h.circbuf.Next()
h.circbuf = h.circbuf.Next() for client := range h.clients {
for client := range h.clients { select { // send without blocking
select { // send without blocking case client.Messages <- msg:
case client.Messages <- msg: break
break default:
default: break
break
}
} }
} }
case cmd := <-h.Commands:
if cmd.Command == CommandClear {
h.clear()
h.Responses <- CommandResponse{Value: true}
}
if cmd.Command == CommandPauseToggle {
active = !active
h.Responses <- CommandResponse{Value: active}
}
} }
} }
} }
// Clear removes all elements from the buffer
func (h *Hub) clear() {
buf := h.circbuf
for i := 0; i < buf.Len(); i++ {
buf.Value = nil
buf = buf.Next()
}
}