main.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "net"
  7. "net/url"
  8. "os"
  9. "os/signal"
  10. "strconv"
  11. "time"
  12. "git.lattuga.net/boyska/circolog/filtering"
  13. "git.lattuga.net/boyska/circolog/formatter"
  14. "github.com/gorilla/websocket"
  15. "gopkg.in/mcuadros/go-syslog.v2/format"
  16. "gopkg.in/mgo.v2/bson"
  17. )
  18. func main() {
  19. addr := flag.String("addr", "localhost:9080", "http service address")
  20. querySocket := flag.String("socket", "", "Path to a unix domain socket for the HTTP server")
  21. backlogLimit := flag.Int("n", -1, "Limit the backlog length, defaults to no limit (-1)")
  22. var filter filtering.ExprValue
  23. flag.Var(&filter, "where", "sql-like query to filter logs")
  24. flag.Parse()
  25. interrupt := make(chan os.Signal, 1)
  26. signal.Notify(interrupt, os.Interrupt)
  27. var d *websocket.Dialer
  28. u := url.URL{Scheme: "ws",
  29. Host: *addr, // ignored in case of -socket; see the Dialer below
  30. Path: "/ws",
  31. }
  32. q := u.Query()
  33. q.Set("fmt", "bson")
  34. if *backlogLimit >= 0 {
  35. q.Set("l", strconv.Itoa(*backlogLimit))
  36. }
  37. u.RawQuery = q.Encode()
  38. if *querySocket != "" {
  39. d = &websocket.Dialer{
  40. NetDial: func(network, addr string) (net.Conn, error) {
  41. return net.Dial("unix", *querySocket)
  42. },
  43. HandshakeTimeout: 45 * time.Second, // same as DefaultDialer
  44. }
  45. log.Printf("connecting to %s", *querySocket)
  46. } else {
  47. d = websocket.DefaultDialer
  48. log.Printf("connecting to %s", *addr)
  49. }
  50. c, _, err := d.Dial(u.String(), nil)
  51. if err != nil {
  52. log.Fatal("dial:", err)
  53. }
  54. defer c.Close()
  55. log.Println("connected!", u.String())
  56. done := make(chan struct{})
  57. go func() {
  58. defer close(done)
  59. for {
  60. _, serialized, err := c.ReadMessage()
  61. if err != nil {
  62. log.Println("close:", err)
  63. return
  64. }
  65. var parsed format.LogParts
  66. if err := bson.Unmarshal(serialized, &parsed); err != nil {
  67. log.Println("invalid BSON", err)
  68. continue
  69. }
  70. if !filter.Validate(parsed) {
  71. continue
  72. }
  73. if err := formatter.WriteFormatted(os.Stdout, formatter.FormatSyslog, parsed); err != nil {
  74. log.Println("error printing", err)
  75. }
  76. fmt.Println()
  77. }
  78. }()
  79. for {
  80. select {
  81. case <-done:
  82. return
  83. case <-interrupt:
  84. log.Println("interrupt")
  85. // Cleanly close the connection by sending a close message and then waiting (with timeout) for the
  86. // server to close the connection.
  87. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  88. if err != nil {
  89. log.Println("write close:", err)
  90. return
  91. }
  92. select {
  93. case <-done:
  94. log.Println("Successfully close")
  95. case <-time.After(1 * time.Second):
  96. log.Println("Forced close")
  97. }
  98. return
  99. }
  100. }
  101. }