1
0
Fork 0
forked from boyska/circolog

Compare commits

...

15 commits

Author SHA1 Message Date
89c59e5713
[http] adding first draft of server-side message number limit. 2018-11-04 23:13:27 +01:00
d380deae37 [tail] unix socket support 2018-10-25 13:59:27 +02:00
a5999adb8d add basic circolog-tail client 2018-10-25 13:59:22 +02:00
369e16d6c3 format in "/var/log/messages style" 2018-08-23 15:40:57 +02:00
5b4e85fabb readme: how to integrate in your server 2018-08-23 13:00:24 +02:00
34593d380a refactor: hub reorganized a little 2018-08-23 12:25:07 +02:00
5b7ddb62a6 drone CI 2018-08-23 02:09:04 +02:00
71763cf8b1 HTTP unix domain socket 2018-08-23 02:08:53 +02:00
8568280dd1 FIX timeout issues between channels
we introduced non-blocking writes; however, being non-blocking when the
goroutine has no buffer means that many messages will be considered
lost.
This commit change from "non blocking" to "max 500ms; first timeout
means stop it all"; it also put a little buffer on client messages.
2018-08-23 02:05:50 +02:00
bf145240c2 proper cmd directory 2018-08-23 01:21:53 +02:00
97743eaad5 Hub resists to Client failures 2018-08-23 01:14:08 +02:00
66f32d1c05 refactor: now Hub keeps everything
before this there was some hidden race condition because raceCondition
is not concurrent-safe, and there was some concurrent reading and
writing.
Now everything is handled safely by the Hub.

Client now have "options" which are understood by the Hub to handle
them differently.
2018-08-23 01:04:31 +02:00
97fd191f0e refactor: no globals
it still isn't very clean; globals are avoided with factories, too many
pieces with no clear role
2018-08-23 00:34:02 +02:00
b1b83f488e Websocket to follow logs 2018-08-22 23:51:59 +02:00
3bf88506be Merge remote-tracking branch 'blallo/master' 2018-08-22 23:44:54 +02:00
8 changed files with 453 additions and 67 deletions

22
.drone.yml Normal file
View file

@ -0,0 +1,22 @@
---
workspace:
base: /go
path: src/git.lattuga.net/boyska/circolog
pipeline:
build:
image: golang:${GO_VER}
commands:
- go get ./...
- go test
- go build
matrix:
GO_VER:
- 1.5
- 1.6
- 1.7
- 1.8
- 1.9
- 1.10
- latest

View file

@ -4,3 +4,38 @@ This is useful when you want to keep some (heavy detailed) log available, but yo
things to disk.
On your "main" syslog, send some message to this one!
## Integration examples
In these examples I'll refer to the usage of UNIX sockets. They are more secure than TCP/UDP sockets because
they have file permissions, they can be "masked" using mount namespaces, etc.
However, circlogd supports udp/tcp sockets easily, so that should not be an issue.
### syslog-ng
To integrate into syslog-ng, put this in `/etc/syslog-ng/conf.d/circolog.conf`
```
destination d_circolog {
unix-dgram("/run/circolog-syslog.sock"
flags(syslog-protocol)
);
};
log { source(s_src); destination(d_circolog); };
```
and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/circolog-query.sock`
## Client
`curl` might be enough of a client for most uses.
curl --unix-socket /run/circolog-query.sock localhost/
will give you everything that circologd has in memory
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
any websocket client supporting UNIX domain socket, so you have two options:
1. wait until I write a proper `circolog-tail` client implementing it all
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.

87
cmd/circolog-tail/main.go Normal file
View file

@ -0,0 +1,87 @@
package main
import (
"flag"
"fmt"
"log"
"net"
"net/url"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
func main() {
addr := flag.String("addr", "localhost:9080", "http service address")
querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
flag.Parse()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
var d *websocket.Dialer
u := url.URL{Scheme: "ws",
Host: *addr, // ignored in case of -socket; see the Dialer below
Path: "/ws",
}
if *querySocket != "" {
d = &websocket.Dialer{
NetDial: func(network, addr string) (net.Conn, error) {
return net.Dial("unix", *querySocket)
},
HandshakeTimeout: 45 * time.Second, // same as DefaultDialer
}
log.Printf("connecting to %s", *querySocket)
} else {
d = websocket.DefaultDialer
log.Printf("connecting to %s", *addr)
}
c, _, err := d.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()
log.Println("connected!", u.String())
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Println("close:", err)
return
}
fmt.Println(string(message))
}
}()
for {
select {
case <-done:
return
case <-interrupt:
log.Println("interrupt")
// Cleanly close the connection by sending a close message and then waiting (with timeout) for the
// server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
}
select {
case <-done:
log.Println("Successfully close")
case <-time.After(1 * time.Second):
log.Println("Forced close")
}
return
}
}
}

114
cmd/circologd/http.go Normal file
View file

@ -0,0 +1,114 @@
package main
import (
"fmt"
"net/http"
"os"
"strconv"
"time"
"git.lattuga.net/boyska/circolog"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
func setupHTTP(hub circolog.Hub) {
http.HandleFunc("/", getHTTPHandler(hub))
http.HandleFunc("/ws", getWSHandler(hub))
}
func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
client := circolog.Client{
Messages: make(chan format.LogParts, 20),
Nofollow: true}
hub.Register <- client
err := r.ParseForm()
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing http request", err)
}
// Looking for known parameter in the request
// TODO: write specialized function
var requestMessageLen int
if reqL, ok := r.Form["l"]; ok {
var err error
switch {
case len(reqL) == 1:
requestMessageLen, err = strconv.Atoi(reqL[0])
if requestMessageLen <= 0 {
fmt.Fprintln(os.Stderr, "malformed request, l non positive:", requestMessageLen)
//requestMessageLen := 0
}
if err != nil {
fmt.Fprintln(os.Stderr, "malformed request on parameter l:", err)
}
case len(reqL) > 1:
requestMessageLen, err = strconv.Atoi(reqL[len(reqL)-1])
fmt.Fprintln(os.Stderr, "multiple values of l parameter, taking last:",
requestMessageLen)
if err != nil {
fmt.Fprintln(os.Stderr, "malformed request on parameter l:", err)
}
default:
fmt.Fprintln(os.Stderr, "empty parameter l in request")
}
}
i := 1
for x := range client.Messages {
w.Write([]byte(circolog.FormatSyslog(x)))
w.Write([]byte("\n"))
if requestMessageLen != 0 && i >= requestMessageLen {
break
}
i++
}
}
}
func getWSHandler(hub circolog.Hub) http.HandlerFunc {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
return func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
client := circolog.Client{Messages: make(chan format.LogParts, 20)}
hub.Register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go func(conn *websocket.Conn, c circolog.Client) {
defer func() {
hub.Unregister <- c
conn.Close()
}()
for {
select {
case message, ok := <-c.Messages:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
// The hub closed the channel.
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write([]byte(circolog.FormatSyslog(message)))
if err := w.Close(); err != nil {
return
}
// TODO: ticker/ping
}
}
}(conn, client)
}
}

63
cmd/circologd/main.go Normal file
View file

@ -0,0 +1,63 @@
package main
import (
"flag"
"fmt"
"net"
"net/http"
"os"
"git.lattuga.net/boyska/circolog"
syslog "gopkg.in/mcuadros/go-syslog.v2"
)
func main() {
var err error
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
flag.Parse()
hub := circolog.NewHub(*bufsize)
handler := syslog.NewChannelHandler(hub.LogMessages)
server := syslog.NewServer()
server.SetFormat(syslog.RFC5424)
server.SetHandler(handler)
if *syslogSocketPath != "" {
if err = server.ListenUnixgram(*syslogSocketPath); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
} else {
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
if err = server.ListenUDP(*syslogAddr); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
}
if err = server.Boot(); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
go hub.Run()
setupHTTP(hub)
if *querySocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
unixListener, err := net.Listen("unix", *querySocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
}
defer os.Remove(*querySocket)
http.Serve(unixListener, nil)
} else {
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil)
}
server.Wait()
}

39
format.go Normal file
View file

@ -0,0 +1,39 @@
package circolog
import (
"bytes"
"text/template"
"time"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Formatter is an interface, so that multiple implementations can exist
type Formatter func(format.LogParts) string
var tmplFuncs template.FuncMap
var syslogTmpl *template.Template
func init() {
tmplFuncs := template.FuncMap{
"dateFormat": func(dt time.Time, fmt string) string {
return dt.Format(fmt)
},
"rfc822": func(dt time.Time) string {
return dt.Format(time.RFC822)
},
}
syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse(
"{{rfc822 (index . \"timestamp\")}} {{index . \"hostname\"}} " +
"{{index . \"app_name\"}}" +
"{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " +
"{{index . \"message\"}}",
))
}
// FormatSyslog format a message in the typical format used in /var/log/messages
func FormatSyslog(msg format.LogParts) string {
var buf bytes.Buffer
syslogTmpl.Execute(&buf, msg)
return buf.String()
}

93
hub.go Normal file
View file

@ -0,0 +1,93 @@
package circolog
import (
"container/ring"
"time"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Client represent a client connected via websocket. Its most important field is the messages channel, where
// new messages are sent.
type Client struct {
Messages chan format.LogParts // only hub should write/close this
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
}
// The Hub is the central "registry"; it keeps both the data storage and clients notifications
//
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
type Hub struct {
Register chan Client
Unregister chan Client
LogMessages chan format.LogParts
clients map[Client]bool
circbuf *ring.Ring
}
// NewHub creates an empty hub
func NewHub(ringBufSize int) Hub {
return Hub{clients: make(map[Client]bool),
Register: make(chan Client),
Unregister: make(chan Client),
LogMessages: make(chan format.LogParts),
circbuf: ring.New(ringBufSize),
}
}
func (h *Hub) register(cl Client) {
if _, ok := h.clients[cl]; !ok {
if !cl.Nofollow { // we won't need it in future
h.clients[cl] = true
}
circbufDoExit := false
h.circbuf.Do(func(x interface{}) {
if circbufDoExit {
return
}
if x != nil {
select { // send with short timeout
case cl.Messages <- x.(format.LogParts):
break
case <-time.After(500 * time.Millisecond):
circbufDoExit = true
break
}
}
})
if cl.Nofollow {
close(cl.Messages)
}
}
}
// Run is hub main loop; keeps everything going
func (h *Hub) Run() {
for {
select {
case cl := <-h.Register:
h.register(cl)
case cl := <-h.Unregister:
_, ok := h.clients[cl]
if ok {
close(cl.Messages)
delete(h.clients, cl)
}
case msg := <-h.LogMessages:
h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients {
select { // send without blocking
case client.Messages <- msg:
break
default:
break
}
}
}
}
}

67
main.go
View file

@ -1,67 +0,0 @@
package main
import (
"container/ring"
"flag"
"fmt"
"net/http"
syslog "gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
var circbuf *ring.Ring
func syslogdHandler(channel syslog.LogPartsChannel) {
for logParts := range channel {
fmt.Println(logParts)
circbuf.Value = logParts
circbuf = circbuf.Next()
}
}
func httpHandler(w http.ResponseWriter, r *http.Request) {
circbuf.Do(func(x interface{}) {
if x == nil {
return
}
logmsg := x.(format.LogParts)
if logmsg["message"] == nil {
return
}
c := logmsg["message"].(string)
w.Write([]byte(c))
w.Write([]byte("\n"))
})
}
func main() {
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
flag.Parse()
channel := make(chan format.LogParts)
handler := syslog.NewChannelHandler(channel)
server := syslog.NewServer()
server.SetFormat(syslog.RFC5424)
server.SetHandler(handler)
if *syslogSocketPath != "" {
server.ListenUnixgram(*syslogSocketPath)
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
} else {
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
server.ListenUDP(*syslogAddr)
}
circbuf = ring.New(*bufsize)
server.Boot()
go syslogdHandler(channel)
http.HandleFunc("/", httpHandler)
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil)
server.Wait()
}