1
0
Fork 0
forked from boyska/circolog

refactor: no globals

it still isn't very clean; globals are avoided with factories, too many
pieces with no clear role
This commit is contained in:
boyska 2018-08-23 00:34:02 +02:00
parent b1b83f488e
commit 97fd191f0e
2 changed files with 93 additions and 82 deletions

80
http.go Normal file
View file

@ -0,0 +1,80 @@
package main
import (
"container/ring"
"net/http"
"time"
"github.com/gorilla/websocket"
"gopkg.in/mcuadros/go-syslog.v2/format"
)
func setupHttp(circbuf *ring.Ring, hub Hub) {
http.HandleFunc("/", getHTTPHandler(circbuf))
http.HandleFunc("/ws", getWSHandler(hub))
}
func getHTTPHandler(circbuf *ring.Ring) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
circbuf.Do(func(x interface{}) {
if x == nil {
return
}
logmsg := x.(format.LogParts)
if logmsg["message"] == nil {
return
}
c := logmsg["message"].(string)
w.Write([]byte(c))
w.Write([]byte("\n"))
})
}
}
func getWSHandler(hub Hub) http.HandlerFunc {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
return func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
client := Client{messages: make(chan format.LogParts)}
hub.Register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go func(conn *websocket.Conn, c Client) {
defer func() {
hub.Unregister <- c
conn.Close()
}()
for {
select {
case message, ok := <-c.messages:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
// The hub closed the channel.
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
if msg, ok := message["message"]; ok {
w.Write([]byte(msg.(string)))
}
if err := w.Close(); err != nil {
return
}
// TODO: ticker/ping
}
}
}(conn, client)
}
}

83
main.go
View file

@ -6,89 +6,20 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
"time"
"github.com/gorilla/websocket"
syslog "gopkg.in/mcuadros/go-syslog.v2" syslog "gopkg.in/mcuadros/go-syslog.v2"
"gopkg.in/mcuadros/go-syslog.v2/format" "gopkg.in/mcuadros/go-syslog.v2/format"
) )
var circbuf *ring.Ring func getSyslogdHandler(circbuf *ring.Ring, hub Hub) func(channel syslog.LogPartsChannel) {
var hub Hub return func(channel syslog.LogPartsChannel) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
func init() {
hub = NewHub()
}
func syslogdHandler(channel syslog.LogPartsChannel) {
for logParts := range channel { for logParts := range channel {
hub.logMessages <- logParts hub.logMessages <- logParts
fmt.Println(logParts) fmt.Println(logParts)
circbuf.Value = logParts circbuf.Value = logParts
circbuf = circbuf.Next() circbuf = circbuf.Next()
} }
}
func httpHandler(w http.ResponseWriter, r *http.Request) {
circbuf.Do(func(x interface{}) {
if x == nil {
return
} }
logmsg := x.(format.LogParts)
if logmsg["message"] == nil {
return
}
c := logmsg["message"].(string)
w.Write([]byte(c))
w.Write([]byte("\n"))
})
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
httpHandler(w, r)
client := Client{messages: make(chan format.LogParts)}
hub.Register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go func(conn *websocket.Conn, c Client) {
defer func() {
hub.Unregister <- c
conn.Close()
}()
for {
select {
case message, ok := <-c.messages:
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
// The hub closed the channel.
conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
if msg, ok := message["message"]; ok {
w.Write([]byte(msg.(string)))
}
if err := w.Close(); err != nil {
return
}
// TODO: ticker/ping
}
}
}(conn, client)
} }
func main() { func main() {
@ -100,6 +31,8 @@ func main() {
queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service") queryAddr := flag.String("query-addr", "127.0.0.1:9080", "Address:port where to bind the query service")
flag.Parse() flag.Parse()
var hub Hub
hub = NewHub()
channel := make(chan format.LogParts) channel := make(chan format.LogParts)
handler := syslog.NewChannelHandler(channel) handler := syslog.NewChannelHandler(channel)
@ -119,18 +52,16 @@ func main() {
os.Exit(1) os.Exit(1)
} }
} }
circbuf = ring.New(*bufsize) circbuf := ring.New(*bufsize)
if err = server.Boot(); err != nil { if err = server.Boot(); err != nil {
fmt.Fprintln(os.Stderr, "argh", err) fmt.Fprintln(os.Stderr, "argh", err)
os.Exit(1) os.Exit(1)
} }
go hub.Run() go hub.Run()
go syslogdHandler(channel) go getSyslogdHandler(circbuf, hub)(channel)
http.HandleFunc("/", httpHandler) setupHttp(circbuf, hub)
http.HandleFunc("/ws", wsHandler)
fmt.Printf("Binding address `%s` [http]\n", *queryAddr) fmt.Printf("Binding address `%s` [http]\n", *queryAddr)
http.ListenAndServe(*queryAddr, nil) http.ListenAndServe(*queryAddr, nil)
server.Wait() server.Wait()
} }