1
0
Fork 0
forked from boyska/circolog

Compare commits

..

No commits in common. "89c59e5713f3aae60f1fcf6675610b6e15b729ce" and "dbbbcb24a6a66c86ee6c9632f292fc416ec37eab" have entirely different histories.

8 changed files with 67 additions and 453 deletions

View file

@ -1,22 +0,0 @@
---
workspace:
base: /go
path: src/git.lattuga.net/boyska/circolog
pipeline:
build:
image: golang:${GO_VER}
commands:
- go get ./...
- go test
- go build
matrix:
GO_VER:
- 1.5
- 1.6
- 1.7
- 1.8
- 1.9
- 1.10
- latest

View file

@ -4,38 +4,3 @@ This is useful when you want to keep some (heavy detailed) log available, but yo
things to disk.
On your "main" syslog, send some message to this one!
## Integration examples
In these examples I'll refer to the usage of UNIX sockets. They are more secure than TCP/UDP sockets because
they have file permissions, they can be "masked" using mount namespaces, etc.
However, circlogd supports udp/tcp sockets easily, so that should not be an issue.
### syslog-ng
To integrate into syslog-ng, put this in `/etc/syslog-ng/conf.d/circolog.conf`
```
destination d_circolog {
unix-dgram("/run/circolog-syslog.sock"
flags(syslog-protocol)
);
};
log { source(s_src); destination(d_circolog); };
```
and run `circologd -syslogd-socket /run/circolog-syslog.sock -query-socket /run/circolog-query.sock`
## Client
`curl` might be enough of a client for most uses.
curl --unix-socket /run/circolog-query.sock localhost/
will give you everything that circologd has in memory
If you want to "follow" (as in `tail -f`) you need to use the websocket interface. However, I don't know of
any websocket client supporting UNIX domain socket, so you have two options:
1. wait until I write a proper `circolog-tail` client implementing it all
2. Use `circologd` with `-query-addr 127.0.0.1:9080`, add some iptables rule to prevent non-root to access that
port, and run `ws ws://localhost:9080/ws`. You'll get all the "backlog", and will follow new log messages.

View file

@ -1,87 +0,0 @@
package main
import (
"flag"
"fmt"
"log"
"net"
"net/url"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
func main() {
addr := flag.String("addr", "localhost:9080", "http service address")
querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
flag.Parse()
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
var d *websocket.Dialer
u := url.URL{Scheme: "ws",
Host: *addr, // ignored in case of -socket; see the Dialer below
Path: "/ws",
}
if *querySocket != "" {
d = &websocket.Dialer{
NetDial: func(network, addr string) (net.Conn, error) {
return net.Dial("unix", *querySocket)
},
HandshakeTimeout: 45 * time.Second, // same as DefaultDialer
}
log.Printf("connecting to %s", *querySocket)
} else {
d = websocket.DefaultDialer
log.Printf("connecting to %s", *addr)
}
c, _, err := d.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()
log.Println("connected!", u.String())
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Println("close:", err)
return
}
fmt.Println(string(message))
}
}()
for {
select {
case <-done:
return
case <-interrupt:
log.Println("interrupt")
// Cleanly close the connection by sending a close message and then waiting (with timeout) for the
// server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
}
select {
case <-done:
log.Println("Successfully close")
case <-time.After(1 * time.Second):
log.Println("Forced close")
}
return
}
}
}

View file

@ -1,114 +0,0 @@
package main
import (
"fmt"
"net/http"
"os"
"strconv"
"time"
"git.lattuga.net/boyska/circolog"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
func setupHTTP(hub circolog.Hub) {
http.HandleFunc("/", getHTTPHandler(hub))
http.HandleFunc("/ws", getWSHandler(hub))
}
func getHTTPHandler(hub circolog.Hub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
client := circolog.Client{
Messages: make(chan format.LogParts, 20),
Nofollow: true}
hub.Register <- client
err := r.ParseForm()
if err != nil {
fmt.Fprintln(os.Stderr, "error parsing http request", err)
}
// Looking for known parameter in the request
// TODO: write specialized function
var requestMessageLen int
if reqL, ok := r.Form["l"]; ok {
var err error
switch {
case len(reqL) == 1:
requestMessageLen, err = strconv.Atoi(reqL[0])
if requestMessageLen <= 0 {
fmt.Fprintln(os.Stderr, "malformed request, l non positive:", requestMessageLen)
//requestMessageLen := 0
}
if err != nil {
fmt.Fprintln(os.Stderr, "malformed request on parameter l:", err)
}
case len(reqL) > 1:
requestMessageLen, err = strconv.Atoi(reqL[len(reqL)-1])
fmt.Fprintln(os.Stderr, "multiple values of l parameter, taking last:",
requestMessageLen)
if err != nil {
fmt.Fprintln(os.Stderr, "malformed request on parameter l:", err)
}
default:
fmt.Fprintln(os.Stderr, "empty parameter l in request")
}
}
i := 1
for x := range client.Messages {
w.Write([]byte(circolog.FormatSyslog(x)))
w.Write([]byte("\n"))
if requestMessageLen != 0 && i >= requestMessageLen {
break
}
i++
}
}
}
func getWSHandler(hub circolog.Hub) http.HandlerFunc {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
return func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
client := circolog.Client{Messages: make(chan format.LogParts, 20)}
hub.Register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go func(conn *websocket.Conn, c circolog.Client) {
defer func() {
hub.Unregister <- c
conn.Close()
}()
for {
select {
case message, ok := <-c.Messages:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
// The hub closed the channel.
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write([]byte(circolog.FormatSyslog(message)))
if err := w.Close(); err != nil {
return
}
// TODO: ticker/ping
}
}
}(conn, client)
}
}

View file

@ -1,63 +0,0 @@
package main
import (
"flag"
"fmt"
"net"
"net/http"
"os"
"git.lattuga.net/boyska/circolog"
syslog "gopkg.in/mcuadros/go-syslog.v2"
)
func main() {
var err error
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
querySocket := flag.String("query-socket", "", "Path to a unix domain socket for the HTTP server; recommended for security reasons!")
flag.Parse()
hub := circolog.NewHub(*bufsize)
handler := syslog.NewChannelHandler(hub.LogMessages)
server := syslog.NewServer()
server.SetFormat(syslog.RFC5424)
server.SetHandler(handler)
if *syslogSocketPath != "" {
if err = server.ListenUnixgram(*syslogSocketPath); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
} else {
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
if err = server.ListenUDP(*syslogAddr); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
}
if err = server.Boot(); err != nil {
fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1)
}
go hub.Run()
setupHTTP(hub)
if *querySocket != "" {
fmt.Printf("Binding address `%s` [http]\n", *querySocket)
unixListener, err := net.Listen("unix", *querySocket)
if err != nil {
fmt.Fprintln(os.Stderr, "Error binding HTTP unix domain socket", err)
}
defer os.Remove(*querySocket)
http.Serve(unixListener, nil)
} else {
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil)
}
server.Wait()
}

View file

@ -1,39 +0,0 @@
package circolog
import (
"bytes"
"text/template"
"time"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Formatter is an interface, so that multiple implementations can exist
type Formatter func(format.LogParts) string
var tmplFuncs template.FuncMap
var syslogTmpl *template.Template
func init() {
tmplFuncs := template.FuncMap{
"dateFormat": func(dt time.Time, fmt string) string {
return dt.Format(fmt)
},
"rfc822": func(dt time.Time) string {
return dt.Format(time.RFC822)
},
}
syslogTmpl = template.Must(template.New("syslog").Funcs(tmplFuncs).Parse(
"{{rfc822 (index . \"timestamp\")}} {{index . \"hostname\"}} " +
"{{index . \"app_name\"}}" +
"{{ if (ne (index . \"proc_id\") \"-\")}}[{{index . \"proc_id\"}}]{{end}}: " +
"{{index . \"message\"}}",
))
}
// FormatSyslog format a message in the typical format used in /var/log/messages
func FormatSyslog(msg format.LogParts) string {
var buf bytes.Buffer
syslogTmpl.Execute(&buf, msg)
return buf.String()
}

93
hub.go
View file

@ -1,93 +0,0 @@
package circolog
import (
"container/ring"
"time"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
// Client represent a client connected via websocket. Its most important field is the messages channel, where
// new messages are sent.
type Client struct {
Messages chan format.LogParts // only hub should write/close this
Nofollow bool // if Nofollow is true, the hub will not keep this client permanently. Rather, it will send every message to "Messages" and close the channel. Use this if you want to get the messages one-shot
}
// The Hub is the central "registry"; it keeps both the data storage and clients notifications
//
// The channel "register" and "unregister" can be seen as "command"
// keep in mind that "registering" is what you do also to get messages in a one-time fashion. In fact, Client
// has "options", such as Nofollow, to explain the Hub what should be given
type Hub struct {
Register chan Client
Unregister chan Client
LogMessages chan format.LogParts
clients map[Client]bool
circbuf *ring.Ring
}
// NewHub creates an empty hub
func NewHub(ringBufSize int) Hub {
return Hub{clients: make(map[Client]bool),
Register: make(chan Client),
Unregister: make(chan Client),
LogMessages: make(chan format.LogParts),
circbuf: ring.New(ringBufSize),
}
}
func (h *Hub) register(cl Client) {
if _, ok := h.clients[cl]; !ok {
if !cl.Nofollow { // we won't need it in future
h.clients[cl] = true
}
circbufDoExit := false
h.circbuf.Do(func(x interface{}) {
if circbufDoExit {
return
}
if x != nil {
select { // send with short timeout
case cl.Messages <- x.(format.LogParts):
break
case <-time.After(500 * time.Millisecond):
circbufDoExit = true
break
}
}
})
if cl.Nofollow {
close(cl.Messages)
}
}
}
// Run is hub main loop; keeps everything going
func (h *Hub) Run() {
for {
select {
case cl := <-h.Register:
h.register(cl)
case cl := <-h.Unregister:
_, ok := h.clients[cl]
if ok {
close(cl.Messages)
delete(h.clients, cl)
}
case msg := <-h.LogMessages:
h.circbuf.Value = msg
h.circbuf = h.circbuf.Next()
for client := range h.clients {
select { // send without blocking
case client.Messages <- msg:
break
default:
break
}
}
}
}
}

67
main.go Normal file
View file

@ -0,0 +1,67 @@
package main
import (
"container/ring"
"flag"
"fmt"
"net/http"
syslog "gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
var circbuf *ring.Ring
func syslogdHandler(channel syslog.LogPartsChannel) {
for logParts := range channel {
fmt.Println(logParts)
circbuf.Value = logParts
circbuf = circbuf.Next()
}
}
func httpHandler(w http.ResponseWriter, r *http.Request) {
circbuf.Do(func(x interface{}) {
if x == nil {
return
}
logmsg := x.(format.LogParts)
if logmsg["message"] == nil {
return
}
c := logmsg["message"].(string)
w.Write([]byte(c))
w.Write([]byte("\n"))
})
}
func main() {
syslogSocketPath := flag.String("syslogd-socket", "", "The socket to listen to syslog addresses")
// dumpSocketPath := flag.String("dump-socket", "/run/buffer.sock", "The socket that user will connect to in order to receive logs")
bufsize := flag.Int("buffer-size", 1000, "Number of messages to keep")
syslogAddr := flag.String("syslog-addr", "127.0.0.1:9514", "Address:port where to listen for syslog messages")
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
flag.Parse()
channel := make(chan format.LogParts)
handler := syslog.NewChannelHandler(channel)
server := syslog.NewServer()
server.SetFormat(syslog.RFC5424)
server.SetHandler(handler)
if *syslogSocketPath != "" {
server.ListenUnixgram(*syslogSocketPath)
fmt.Printf("Binding socket `%s` [syslog]\n", *syslogSocketPath)
} else {
fmt.Printf("Binding address `%s` [syslog]\n", *syslogAddr)
server.ListenUDP(*syslogAddr)
}
circbuf = ring.New(*bufsize)
server.Boot()
go syslogdHandler(channel)
http.HandleFunc("/", httpHandler)
fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil)
server.Wait()
}