Merge remote-tracking branch 'blallo/master'
This commit is contained in:
commit
64dc363de7
4 changed files with 238 additions and 22 deletions
124
cmd/circologctl/main.go
Normal file
124
cmd/circologctl/main.go
Normal file
|
@ -0,0 +1,124 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var globalOpts struct {
|
||||||
|
ctlSock string
|
||||||
|
verbose bool
|
||||||
|
debug bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var ctl http.Client
|
||||||
|
|
||||||
|
type commandFunc func([]string) error
|
||||||
|
|
||||||
|
var cmdMap map[string]commandFunc
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cmdMap = map[string]commandFunc{
|
||||||
|
// TODO: implement set and get of config at runtime
|
||||||
|
//"set": setCmd,
|
||||||
|
//"get": getCmd,
|
||||||
|
"pause": pauseCmd,
|
||||||
|
"reload": reloadCmd,
|
||||||
|
"restart": restartCmd,
|
||||||
|
"help": helpCmd,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//func setCmd(ctlSock string, args []string) error {}
|
||||||
|
|
||||||
|
//func getCmd(ctlSock string, args []string) error {}
|
||||||
|
|
||||||
|
func pauseCmd(args []string) error {
|
||||||
|
var dontChangeAgain time.Duration
|
||||||
|
flagset := flag.NewFlagSet(args[0], flag.ExitOnError)
|
||||||
|
waitTime := flagset.Duration("wait-time", dontChangeAgain, "How long to wait before untoggling the state, defaults to never")
|
||||||
|
flagset.Parse(args[1:])
|
||||||
|
postBody := make(map[string][]string)
|
||||||
|
if *waitTime != dontChangeAgain {
|
||||||
|
postBody["waitTime"] = []string{fmt.Sprintf("%s", *waitTime)}
|
||||||
|
}
|
||||||
|
if globalOpts.debug {
|
||||||
|
fmt.Println("[DEBUG] postBody:", postBody)
|
||||||
|
}
|
||||||
|
resp, err := ctl.PostForm("http://unix/pause/toggle", postBody)
|
||||||
|
if globalOpts.verbose {
|
||||||
|
defer resp.Body.Close()
|
||||||
|
bodyBytes, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Println(string(bodyBytes))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func reloadCmd(args []string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func restartCmd(args []string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func helpCmd(args []string) error {
|
||||||
|
usage(os.Stdout)
|
||||||
|
os.Exit(0)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func usage(w io.Writer) {
|
||||||
|
fmt.Fprintf(w, "USAGE: %s [globalOpts] [SUBCOMMAND] [opts]\n", os.Args[0])
|
||||||
|
fmt.Fprintf(w, "\nSUBCOMMANDS:\n\n")
|
||||||
|
for command := range cmdMap {
|
||||||
|
fmt.Fprintf(w, "\t%s\n", command)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseAndRun(args []string) {
|
||||||
|
cmdName := args[0]
|
||||||
|
cmdToRun, ok := cmdMap[cmdName]
|
||||||
|
if !ok {
|
||||||
|
fmt.Fprintf(os.Stderr, "Unknown subcommand: %s\n", cmdName)
|
||||||
|
usage(os.Stderr)
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
// from here: https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74
|
||||||
|
ctl = http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||||
|
return net.Dial("unix", globalOpts.ctlSock)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := cmdToRun(args)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Error:\n%s\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.StringVar(&globalOpts.ctlSock, "ctl-socket", "/tmp/circologd-ctl.sock",
|
||||||
|
"Path to a unix domain socket for the control server; leave empty to disable")
|
||||||
|
flag.BoolVar(&globalOpts.verbose, "verbose", false, "Print more output")
|
||||||
|
flag.BoolVar(&globalOpts.debug, "debug", false, "Print debugging info")
|
||||||
|
flag.Parse()
|
||||||
|
args := flag.Args()
|
||||||
|
if len(args) == 0 {
|
||||||
|
usage(os.Stderr)
|
||||||
|
os.Exit(-1)
|
||||||
|
}
|
||||||
|
parseAndRun(args)
|
||||||
|
}
|
|
@ -2,23 +2,51 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
fractal "git.lattuga.net/blallo/gotools/formatting"
|
||||||
"git.lattuga.net/boyska/circolog"
|
"git.lattuga.net/boyska/circolog"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupHTTPCtl(hub circolog.Hub) *mux.Router {
|
func setupHTTPCtl(hub circolog.Hub, verbose, debug bool) *mux.Router {
|
||||||
mux := mux.NewRouter()
|
m := mux.NewRouter()
|
||||||
mux.HandleFunc("/pause/toggle", togglePause(hub)).Methods("POST")
|
m.HandleFunc("/pause/toggle", togglePause(hub, verbose, debug)).Methods("POST")
|
||||||
mux.HandleFunc("/logs/clear", clearQueue(hub)).Methods("POST")
|
m.HandleFunc("/logs/clear", clearQueue(hub, verbose)).Methods("POST")
|
||||||
return mux
|
m.HandleFunc("/help", printHelp(verbose)).Methods("GET")
|
||||||
|
m.HandleFunc("/echo", echo(verbose)).Methods("GET")
|
||||||
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func togglePause(hub circolog.Hub) http.HandlerFunc {
|
func togglePause(hub circolog.Hub, verbose, debug bool) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
|
if verbose {
|
||||||
resp := <-hub.Responses
|
log.Printf("[%s] %s - toggled pause", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
r.ParseForm()
|
||||||
|
waitTimePar := r.FormValue("waitTime")
|
||||||
|
var waitTime time.Duration
|
||||||
|
var err error
|
||||||
|
if waitTimePar != "" {
|
||||||
|
waitTime, err = time.ParseDuration(waitTimePar)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("waitTime not understood:", waitTimePar)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if debug {
|
||||||
|
fmt.Println("[DEBUG] waitTime:", waitTime)
|
||||||
|
}
|
||||||
|
response := make(chan circolog.CommandResponse)
|
||||||
|
hub.Commands <- circolog.HubFullCommand{
|
||||||
|
Command: circolog.CommandPauseToggle,
|
||||||
|
Response: response,
|
||||||
|
Parameters: map[string]interface{}{"waitTime": waitTime},
|
||||||
|
}
|
||||||
|
resp := <-response
|
||||||
active := resp.Value.(bool)
|
active := resp.Value.(bool)
|
||||||
w.Header().Set("content-type", "application/json")
|
w.Header().Set("content-type", "application/json")
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
|
@ -26,13 +54,49 @@ func togglePause(hub circolog.Hub) http.HandlerFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func clearQueue(hub circolog.Hub) http.HandlerFunc {
|
func clearQueue(hub circolog.Hub, verbose bool) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear}
|
if verbose {
|
||||||
resp := <-hub.Responses
|
log.Printf("[%s] %s - cleared queue", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
response := make(chan circolog.CommandResponse)
|
||||||
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear, Response: response}
|
||||||
|
resp := <-response
|
||||||
success := resp.Value.(bool)
|
success := resp.Value.(bool)
|
||||||
w.Header().Set("content-type", "application/json")
|
w.Header().Set("content-type", "application/json")
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
enc.Encode(map[string]interface{}{"success": success})
|
enc.Encode(map[string]interface{}{"success": success})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func printHelp(verbose bool) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if verbose {
|
||||||
|
log.Printf("[%s] %s - asked for help", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
w.Header().Set("content-type", "application/json")
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
var pathsWithDocs = map[string]string{
|
||||||
|
"/pause/toggle": "Toggle the server from pause state (not listening)",
|
||||||
|
"/logs/clear": "Wipe the buffer from all the messages",
|
||||||
|
"/help": "This help",
|
||||||
|
"/echo": "Answers to heartbeat",
|
||||||
|
}
|
||||||
|
fractal.EncodeJSON(pathsWithDocs, enc)
|
||||||
|
if verbose {
|
||||||
|
errEnc := json.NewEncoder(os.Stderr)
|
||||||
|
fractal.EncodeJSON(pathsWithDocs, errEnc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func echo(verbose bool) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
log.Println("Echo")
|
||||||
|
if verbose {
|
||||||
|
log.Printf("[%s] %s - asked for echo", r.Method, r.RemoteAddr)
|
||||||
|
}
|
||||||
|
w.Header().Set("content-type", "text/plain")
|
||||||
|
fmt.Fprintln(w, "I am on!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -29,10 +29,12 @@ func main() {
|
||||||
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
|
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
|
||||||
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
|
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
|
||||||
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
|
ctlSocket := flag.String("ctl-socket", "/tmp/circologd-ctl.sock", "Path to a unix domain socket for the control server; leave empty to disable")
|
||||||
|
verbose := flag.Bool("verbose", false, "Print more output executing the daemon")
|
||||||
|
debug := flag.Bool("debug", false, "Print debugging info executing the daemon")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
interrupt := make(chan os.Signal, 1)
|
interrupt := make(chan os.Signal, 1)
|
||||||
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGTERM)
|
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM)
|
||||||
|
|
||||||
hub := circolog.NewHub(*bufsize)
|
hub := circolog.NewHub(*bufsize)
|
||||||
handler := syslog.NewChannelHandler(hub.LogMessages)
|
handler := syslog.NewChannelHandler(hub.LogMessages)
|
||||||
|
@ -85,7 +87,7 @@ func main() {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub)}
|
httpCtlServer := http.Server{Handler: setupHTTPCtl(hub, *verbose, *debug)}
|
||||||
if *ctlSocket != "" {
|
if *ctlSocket != "" {
|
||||||
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
|
fmt.Printf("Binding address `%s` [http]\n", *ctlSocket)
|
||||||
unixListener, err := net.Listen("unix", *ctlSocket)
|
unixListener, err := net.Listen("unix", *ctlSocket)
|
||||||
|
@ -107,14 +109,25 @@ func main() {
|
||||||
select {
|
select {
|
||||||
case sig := <-interrupt:
|
case sig := <-interrupt:
|
||||||
if sig == syscall.SIGUSR1 {
|
if sig == syscall.SIGUSR1 {
|
||||||
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle}
|
response := make(chan circolog.CommandResponse)
|
||||||
resp := <-hub.Responses
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandPauseToggle, Response: response}
|
||||||
|
resp := <-response
|
||||||
if resp.Value.(bool) {
|
if resp.Value.(bool) {
|
||||||
log.Println("resumed")
|
log.Println("resumed")
|
||||||
} else {
|
} else {
|
||||||
log.Println("paused")
|
log.Println("paused")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if sig == syscall.SIGUSR2 {
|
||||||
|
response := make(chan circolog.CommandResponse)
|
||||||
|
hub.Commands <- circolog.HubFullCommand{Command: circolog.CommandClear, Response: response}
|
||||||
|
resp := <-response
|
||||||
|
if resp.Value.(bool) {
|
||||||
|
log.Println("buffer cleaned")
|
||||||
|
} else {
|
||||||
|
log.Println("buffer NOT cleaned")
|
||||||
|
}
|
||||||
|
}
|
||||||
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
|
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
|
||||||
log.Println("Quitting because of signal", sig)
|
log.Println("Quitting because of signal", sig)
|
||||||
server.Kill()
|
server.Kill()
|
||||||
|
|
27
hub.go
27
hub.go
|
@ -2,6 +2,8 @@ package circolog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/ring"
|
"container/ring"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||||
|
@ -36,6 +38,8 @@ const (
|
||||||
// An HubFullCommand is a Command, complete with arguments
|
// An HubFullCommand is a Command, complete with arguments
|
||||||
type HubFullCommand struct {
|
type HubFullCommand struct {
|
||||||
Command HubCommand
|
Command HubCommand
|
||||||
|
Parameters map[string]interface{}
|
||||||
|
Response chan CommandResponse
|
||||||
}
|
}
|
||||||
type CommandResponse struct {
|
type CommandResponse struct {
|
||||||
Value interface{}
|
Value interface{}
|
||||||
|
@ -46,7 +50,6 @@ type Hub struct {
|
||||||
Unregister chan Client
|
Unregister chan Client
|
||||||
LogMessages chan format.LogParts
|
LogMessages chan format.LogParts
|
||||||
Commands chan HubFullCommand
|
Commands chan HubFullCommand
|
||||||
Responses chan CommandResponse
|
|
||||||
|
|
||||||
clients map[Client]bool
|
clients map[Client]bool
|
||||||
circbuf *ring.Ring
|
circbuf *ring.Ring
|
||||||
|
@ -59,7 +62,6 @@ func NewHub(ringBufSize int) Hub {
|
||||||
Unregister: make(chan Client),
|
Unregister: make(chan Client),
|
||||||
LogMessages: make(chan format.LogParts),
|
LogMessages: make(chan format.LogParts),
|
||||||
Commands: make(chan HubFullCommand),
|
Commands: make(chan HubFullCommand),
|
||||||
Responses: make(chan CommandResponse),
|
|
||||||
circbuf: ring.New(ringBufSize),
|
circbuf: ring.New(ringBufSize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -125,17 +127,30 @@ func (h *Hub) Run() {
|
||||||
case cmd := <-h.Commands:
|
case cmd := <-h.Commands:
|
||||||
if cmd.Command == CommandClear {
|
if cmd.Command == CommandClear {
|
||||||
h.clear()
|
h.clear()
|
||||||
h.Responses <- CommandResponse{Value: true}
|
cmd.Response <- CommandResponse{Value: true}
|
||||||
}
|
}
|
||||||
if cmd.Command == CommandPauseToggle {
|
if cmd.Command == CommandPauseToggle {
|
||||||
active = !active
|
togglePause(cmd.Parameters["waitTime"].(time.Duration), &active)
|
||||||
h.Responses <- CommandResponse{Value: active}
|
cmd.Response <- CommandResponse{Value: active}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear removes every all elements from the buffer
|
func togglePause(waitTime time.Duration, status *bool) {
|
||||||
|
var noTime time.Duration
|
||||||
|
if waitTime != noTime {
|
||||||
|
delayedToggle := func() {
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
fmt.Fprintln(os.Stderr, "toggling again")
|
||||||
|
togglePause(noTime, status)
|
||||||
|
}
|
||||||
|
go delayedToggle()
|
||||||
|
}
|
||||||
|
*status = !*status
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear removes all elements from the buffer
|
||||||
func (h *Hub) clear() {
|
func (h *Hub) clear() {
|
||||||
buf := h.circbuf
|
buf := h.circbuf
|
||||||
for i := 0; i < buf.Len(); i++ {
|
for i := 0; i < buf.Len(); i++ {
|
||||||
|
|
Loading…
Reference in a new issue