forked from boyska/circolog
Compare commits
15 commits
dbbbcb24a6
...
89c59e5713
Author | SHA1 | Date | |
---|---|---|---|
89c59e5713 | |||
d380deae37 | |||
a5999adb8d | |||
369e16d6c3 | |||
5b4e85fabb | |||
34593d380a | |||
5b7ddb62a6 | |||
71763cf8b1 | |||
8568280dd1 | |||
bf145240c2 | |||
97743eaad5 | |||
66f32d1c05 | |||
97fd191f0e | |||
b1b83f488e | |||
3bf88506be |
8 changed files with 453 additions and 67 deletions
22
.drone.yml
Normal file
22
.drone.yml
Normal file
|
@ -0,0 +1,22 @@
|
|||
---
|
||||
workspace:
|
||||
base: /go
|
||||
path: src/git.lattuga.net/boyska/circolog
|
||||
|
||||
pipeline:
|
||||
build:
|
||||
image: golang:${GO_VER}
|
||||
commands:
|
||||
- go get ./...
|
||||
- go test
|
||||
- go build
|
||||
|
||||
matrix:
|
||||
GO_VER:
|
||||
- 1.5
|
||||
- 1.6
|
||||
- 1.7
|
||||
- 1.8
|
||||
- 1.9
|
||||
- 1.10
|
||||
- latest
|
35
README.md
35
README.md
|
@ -4,3 +4,38 @@ This is useful when you want to keep some (heavy detailed) log available, but yo
|
|||
things to disk.
|
||||
|
||||
On your "main" syslog, send some message to this one!
|
||||
|
||||
## Integration examples
|
||||
|
||||
In these examples I'll refer to the usage of UNIX sockets. They are more secure than TCP/UDP sockets because
|
||||
they have file permissions, they can be "masked" using mount namespaces, etc.
|
||||
However, circlogd supports udp/tcp sockets easily, so that should not be an issue.
|
||||
|
||||
### syslog-ng
|
||||
|
||||
To integrate into syslog-ng, put this in `/etc/syslog-ng/conf.d/circolog.conf`
|
||||
```
|
||||
destination d_circolog {
|
||||
unix-dgram("/run/circolog-syslog.sock"
|
||||
flags(syslog-protocol)
|
||||
);
|
||||
};
|
||||
log { source(s_src); destination(d_circolog); };
|
||||
```
|
||||
and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/circolog-query.sock`
|
||||
|
||||
|
||||
## Client
|
||||
|
||||
`curl` might be enough of a client for most uses.
|
||||
|
||||
curl --unix-socket /run/circolog-query.sock localhost/
|
||||
|
||||
will give you everything that circologd has in memory
|
||||
|
||||
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
|
||||
any websocket client supporting UNIX domain socket, so you have two options:
|
||||
|
||||
1. wait until I write a proper `circolog-tail` client implementing it all
|
||||
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
|
||||
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.
|
||||
|
|
87
cmd/circolog-tail/main.go
Normal file
87
cmd/circolog-tail/main.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
func main() {
|
||||
addr := flag.String("addr", "localhost:9080", "http service address")
|
||||
querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
|
||||
flag.Parse()
|
||||
|
||||
interrupt := make(chan os.Signal, 1)
|
||||
signal.Notify(interrupt, os.Interrupt)
|
||||
var d *websocket.Dialer
|
||||
u := url.URL{Scheme: "ws",
|
||||
Host: *addr, // ignored in case of -socket; see the Dialer below
|
||||
Path: "/ws",
|
||||
}
|
||||
if *querySocket != "" {
|
||||
d = &websocket.Dialer{
|
||||
NetDial: func(network, addr string) (net.Conn, error) {
|
||||
return net.Dial("unix", *querySocket)
|
||||
},
|
||||
HandshakeTimeout: 45 * time.Second, // same as DefaultDialer
|
||||
}
|
||||
|
||||
log.Printf("connecting to %s", *querySocket)
|
||||
} else {
|
||||
d = websocket.DefaultDialer
|
||||
log.Printf("connecting to %s", *addr)
|
||||
}
|
||||
|
||||
c, _, err := d.Dial(u.String(), nil)
|
||||
if err != nil {
|
||||
log.Fatal("dial:", err)
|
||||
}
|
||||
defer c.Close()
|
||||
log.Println("connected!", u.String())
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
for {
|
||||
_, message, err := c.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("close:", err)
|
||||
return
|
||||
}
|
||||
fmt.Println(string(message))
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-interrupt:
|
||||
log.Println("interrupt")
|
||||
|
||||
// Cleanly close the connection by sending a close message and then waiting (with timeout) for the
|
||||
// server to close the connection.
|
||||
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||
if err != nil {
|
||||
log.Println("write close:", err)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
log.Println("Successfully close")
|
||||
case <-time.After(1 * time.Second):
|
||||
log.Println("Forced close")
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
114
cmd/circologd/http.go
Normal file
114
cmd/circologd/http.go
Normal file
|
@ -0,0 +1,114 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.lattuga.net/boyska/circolog"
|
||||
"github.com/gorilla/websocket"
|
||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||
)
|
||||
|
||||
func setupHTTP(hub circolog.Hub) {
|
||||
http.HandleFunc("/", getHTTPHandler(hub))
|
||||
http.HandleFunc("/ws", getWSHandler(hub))
|
||||
}
|
||||
|
||||
func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
client := circolog.Client{
|
||||
Messages: make(chan format.LogParts, 20),
|
||||
Nofollow: true}
|
||||
hub.Register <- client
|
||||
|
||||
err := r.ParseForm()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "error parsing http request", err)
|
||||
}
|
||||
// Looking for known parameter in the request
|
||||
// TODO: write specialized function
|
||||
var requestMessageLen int
|
||||
if reqL, ok := r.Form["l"]; ok {
|
||||
var err error
|
||||
switch {
|
||||
case len(reqL) == 1:
|
||||
requestMessageLen, err = strconv.Atoi(reqL[0])
|
||||
if requestMessageLen <= 0 {
|
||||
fmt.Fprintln(os.Stderr, "malformed request, l non positive:", requestMessageLen)
|
||||
//requestMessageLen := 0
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "malformed request on parameter l:", err)
|
||||
}
|
||||
case len(reqL) > 1:
|
||||
requestMessageLen, err = strconv.Atoi(reqL[len(reqL)-1])
|
||||
fmt.Fprintln(os.Stderr, "multiple values of l parameter, taking last:",
|
||||
requestMessageLen)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "malformed request on parameter l:", err)
|
||||
}
|
||||
default:
|
||||
fmt.Fprintln(os.Stderr, "empty parameter l in request")
|
||||
}
|
||||
}
|
||||
|
||||
i := 1
|
||||
for x := range client.Messages {
|
||||
w.Write([]byte(circolog.FormatSyslog(x)))
|
||||
w.Write([]byte("\n"))
|
||||
if requestMessageLen != 0 && i >= requestMessageLen {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getWSHandler(hub circolog.Hub) http.HandlerFunc {
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
client := circolog.Client{Messages: make(chan format.LogParts, 20)}
|
||||
hub.Register <- client
|
||||
|
||||
// Allow collection of memory referenced by the caller by doing all work in
|
||||
// new goroutines.
|
||||
go func(conn *websocket.Conn, c circolog.Client) {
|
||||
defer func() {
|
||||
hub.Unregister <- c
|
||||
conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.Messages:
|
||||
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
// The hub closed the channel.
|
||||
conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
|
||||
w, err := conn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write([]byte(circolog.FormatSyslog(message)))
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
// TODO: ticker/ping
|
||||
}
|
||||
}
|
||||
}(conn, client)
|
||||
}
|
||||
}
|
63
cmd/circologd/main.go
Normal file
63
cmd/circologd/main.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"git.lattuga.net/boyska/circolog"
|
||||
syslog "gopkg.in/mcuadros/go-syslog.v2"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var err error
|
||||
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
|
||||
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
|
||||
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
|
||||
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
|
||||
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
|
||||
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
|
||||
flag.Parse()
|
||||
|
||||
hub := circolog.NewHub(*bufsize)
|
||||
handler := syslog.NewChannelHandler(hub.LogMessages)
|
||||
|
||||
server := syslog.NewServer()
|
||||
server.SetFormat(syslog.RFC5424)
|
||||
server.SetHandler(handler)
|
||||
if *syslogSocketPath != "" {
|
||||
if err = server.ListenUnixgram(*syslogSocketPath); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "argh", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
|
||||
} else {
|
||||
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
|
||||
if err = server.ListenUDP(*syslogAddr); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "argh", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
if err = server.Boot(); err != nil {
|
||||
fmt.Fprintln(os.Stderr, "argh", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
go hub.Run()
|
||||
|
||||
setupHTTP(hub)
|
||||
if *querySocket != "" {
|
||||
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
|
||||
unixListener, err := net.Listen("unix", *querySocket)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
|
||||
}
|
||||
defer os.Remove(*querySocket)
|
||||
http.Serve(unixListener, nil)
|
||||
} else {
|
||||
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
|
||||
http.ListenAndServe(*queryAddr, nil)
|
||||
}
|
||||
server.Wait()
|
||||
}
|
39
format.go
Normal file
39
format.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package circolog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||
)
|
||||
|
||||
// Formatter is an interface, so that multiple implementations can exist
|
||||
type Formatter func(format.LogParts) string
|
||||
|
||||
var tmplFuncs template.FuncMap
|
||||
var syslogTmpl *template.Template
|
||||
|
||||
func init() {
|
||||
tmplFuncs := template.FuncMap{
|
||||
"dateFormat": func(dt time.Time, fmt string) string {
|
||||
return dt.Format(fmt)
|
||||
},
|
||||
"rfc822": func(dt time.Time) string {
|
||||
return dt.Format(time.RFC822)
|
||||
},
|
||||
}
|
||||
syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse(
|
||||
"{{rfc822 (index . \"timestamp\")}} {{index . \"hostname\"}} " +
|
||||
"{{index . \"app_name\"}}" +
|
||||
"{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " +
|
||||
"{{index . \"message\"}}",
|
||||
))
|
||||
}
|
||||
|
||||
// FormatSyslog format a message in the typical format used in /var/log/messages
|
||||
func FormatSyslog(msg format.LogParts) string {
|
||||
var buf bytes.Buffer
|
||||
syslogTmpl.Execute(&buf, msg)
|
||||
return buf.String()
|
||||
}
|
93
hub.go
Normal file
93
hub.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
package circolog
|
||||
|
||||
import (
|
||||
"container/ring"
|
||||
"time"
|
||||
|
||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||
)
|
||||
|
||||
// Client represent a client connected via websocket. Its most important field is the messages channel, where
|
||||
// new messages are sent.
|
||||
type Client struct {
|
||||
Messages chan format.LogParts // only hub should write/close this
|
||||
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
|
||||
}
|
||||
|
||||
// The Hub is the central "registry"; it keeps both the data storage and clients notifications
|
||||
//
|
||||
// The channel "register" and "unregister" can be seen as "command"
|
||||
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
|
||||
// has "options", such as Nofollow, to explain the Hub what should be given
|
||||
type Hub struct {
|
||||
Register chan Client
|
||||
Unregister chan Client
|
||||
LogMessages chan format.LogParts
|
||||
|
||||
clients map[Client]bool
|
||||
circbuf *ring.Ring
|
||||
}
|
||||
|
||||
// NewHub creates an empty hub
|
||||
func NewHub(ringBufSize int) Hub {
|
||||
return Hub{clients: make(map[Client]bool),
|
||||
Register: make(chan Client),
|
||||
Unregister: make(chan Client),
|
||||
LogMessages: make(chan format.LogParts),
|
||||
circbuf: ring.New(ringBufSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Hub) register(cl Client) {
|
||||
if _, ok := h.clients[cl]; !ok {
|
||||
if !cl.Nofollow { // we won't need it in future
|
||||
h.clients[cl] = true
|
||||
}
|
||||
|
||||
circbufDoExit := false
|
||||
h.circbuf.Do(func(x interface{}) {
|
||||
if circbufDoExit {
|
||||
return
|
||||
}
|
||||
if x != nil {
|
||||
select { // send with short timeout
|
||||
case cl.Messages <- x.(format.LogParts):
|
||||
break
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
circbufDoExit = true
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
if cl.Nofollow {
|
||||
close(cl.Messages)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run is hub main loop; keeps everything going
|
||||
func (h *Hub) Run() {
|
||||
for {
|
||||
select {
|
||||
case cl := <-h.Register:
|
||||
h.register(cl)
|
||||
case cl := <-h.Unregister:
|
||||
_, ok := h.clients[cl]
|
||||
if ok {
|
||||
close(cl.Messages)
|
||||
delete(h.clients, cl)
|
||||
}
|
||||
case msg := <-h.LogMessages:
|
||||
h.circbuf.Value = msg
|
||||
h.circbuf = h.circbuf.Next()
|
||||
for client := range h.clients {
|
||||
select { // send without blocking
|
||||
case client.Messages <- msg:
|
||||
break
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
67
main.go
67
main.go
|
@ -1,67 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"container/ring"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
syslog "gopkg.in/mcuadros/go-syslog.v2"
|
||||
"gopkg.in/mcuadros/go-syslog.v2/format"
|
||||
)
|
||||
|
||||
var circbuf *ring.Ring
|
||||
|
||||
func syslogdHandler(channel syslog.LogPartsChannel) {
|
||||
for logParts := range channel {
|
||||
fmt.Println(logParts)
|
||||
circbuf.Value = logParts
|
||||
circbuf = circbuf.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func httpHandler(w http.ResponseWriter, r *http.Request) {
|
||||
circbuf.Do(func(x interface{}) {
|
||||
if x == nil {
|
||||
return
|
||||
}
|
||||
logmsg := x.(format.LogParts)
|
||||
if logmsg["message"] == nil {
|
||||
return
|
||||
}
|
||||
c := logmsg["message"].(string)
|
||||
w.Write([]byte(c))
|
||||
w.Write([]byte("\n"))
|
||||
})
|
||||
}
|
||||
func main() {
|
||||
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
|
||||
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
|
||||
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
|
||||
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
|
||||
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
|
||||
flag.Parse()
|
||||
|
||||
channel := make(chan format.LogParts)
|
||||
handler := syslog.NewChannelHandler(channel)
|
||||
|
||||
server := syslog.NewServer()
|
||||
server.SetFormat(syslog.RFC5424)
|
||||
server.SetHandler(handler)
|
||||
if *syslogSocketPath != "" {
|
||||
server.ListenUnixgram(*syslogSocketPath)
|
||||
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
|
||||
} else {
|
||||
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
|
||||
server.ListenUDP(*syslogAddr)
|
||||
}
|
||||
circbuf = ring.New(*bufsize)
|
||||
server.Boot()
|
||||
go syslogdHandler(channel)
|
||||
|
||||
http.HandleFunc("/", httpHandler)
|
||||
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
|
||||
http.ListenAndServe(*queryAddr, nil)
|
||||
|
||||
server.Wait()
|
||||
}
|
Loading…
Reference in a new issue