wrapper.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "io/ioutil"
  8. "net/http"
  9. "os"
  10. "os/exec"
  11. "path"
  12. "time"
  13. "git.lattuga.net/avana/sciacquone/crawler"
  14. "gopkg.in/mgo.v2/bson"
  15. )
/*
This command acts as an HTTP wrapper for a "generic" worker. It is useful for writing a worker that can:
- be written in any language
- not care about HTTP
- not care about job queues, concurrency, etc.

crawlerWrapper handles ONE job at a time. While this limitation might be lifted in the future, it is currently hardcoded.
*/
// cmd is the worker command line (program name + args) run on each POSTed job;
// populated from the remaining CLI arguments in main.
var cmd []string

// jobsQueue is shared between submitHandler (producer) and jobsRunner (consumer).
var jobsQueue chan crawler.JobSubmission

// this httpClient is global without a very specific need to be so:
// a bit for performance, a bit for easy customization from main (ie: setting
// the timeout with a command-line flag).
var httpClient *http.Client

// pingbackRetries is how many delivery attempts pingWorker makes; set in init.
var pingbackRetries int
  32. // submitHandler handles HTTP POSTs to /submit, enqueing jobs
  33. func submitHandler(w http.ResponseWriter, r *http.Request) {
  34. if r.Method != "POST" {
  35. w.WriteHeader(http.StatusMethodNotAllowed)
  36. w.Write([]byte("Only POST accepted"))
  37. return
  38. }
  39. var request crawler.JobSubmission
  40. body, err := ioutil.ReadAll(r.Body)
  41. if err != nil {
  42. w.WriteHeader(http.StatusBadRequest)
  43. w.Write([]byte("Cannot read your request\n"))
  44. w.Write([]byte(err.Error()))
  45. return
  46. }
  47. if err = json.Unmarshal(body, &request); err != nil {
  48. w.WriteHeader(http.StatusBadRequest)
  49. w.Write([]byte("Cannot parse your request:\n "))
  50. w.Write([]byte(err.Error()))
  51. return
  52. }
  53. select {
  54. case jobsQueue <- request: // enqueued successfully
  55. w.WriteHeader(http.StatusOK)
  56. w.Write([]byte("OK\n"))
  57. default: // not enqueued; typically, this means "the queue is full"
  58. w.Header().Set("Retry-After", "120") // 2 minutes
  59. w.WriteHeader(http.StatusServiceUnavailable)
  60. w.Write([]byte("Crawler queue is full! retry later\n"))
  61. }
  62. }
  63. func init() {
  64. httpClient = &http.Client{
  65. Timeout: time.Second * 10,
  66. }
  67. pingbackRetries = 3
  68. }
  69. func main() {
  70. queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and ability to handle burst")
  71. // TODO: rate limit options
  72. flag.Usage = func() {
  73. fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
  74. fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
  75. flag.PrintDefaults()
  76. }
  77. flag.Parse()
  78. jobsQueue = make(chan crawler.JobSubmission, *queueSize)
  79. pingQueue := make(chan pingback, *queueSize)
  80. cmd = flag.Args()
  81. if len(cmd) == 0 {
  82. fmt.Fprintln(os.Stderr, "Error: a command must be provided")
  83. os.Exit(2)
  84. }
  85. http.HandleFunc("/submit", submitHandler)
  86. fmt.Println("submitted")
  87. go jobsRunner(jobsQueue, pingQueue)
  88. go pingerDispatcher(pingQueue)
  89. err := http.ListenAndServe("localhost:8123", nil)
  90. if err != nil {
  91. fmt.Println(err)
  92. os.Exit(1)
  93. }
  94. }
// pingback couples a job-completion notice with the URL it must be delivered
// to; produced by jobsRunner, consumed by pingerDispatcher/pingWorker.
type pingback struct {
	PingbackURL crawler.URL
	Response    crawler.JobCompletionNotice
}
  99. // jobsRunner fetches jobs from a chan, run the user-specified worker with the job-specified stdin and,
  100. // if asked so, ask the pingerDispatcher to make the appropriate ping
  101. func jobsRunner(reqs <-chan crawler.JobSubmission, pings chan<- pingback) {
  102. for {
  103. req := <-reqs
  104. serializedJob, err := json.Marshal(req.Job)
  105. if err != nil {
  106. fmt.Fprintln(os.Stderr, "Error: encoding job", err)
  107. continue
  108. }
  109. c := exec.Cmd{}
  110. c.Path = cmd[0]
  111. c.Args = cmd
  112. c.Stdin = bytes.NewReader(serializedJob)
  113. var cmdout bytes.Buffer
  114. c.Stdout = &cmdout
  115. fmt.Println("lancio", cmd, string(serializedJob))
  116. fmt.Println("lancio", c)
  117. cmderr := c.Run()
  118. if cmderr != nil {
  119. fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
  120. fmt.Fprintln(os.Stderr, cmderr)
  121. } else {
  122. fmt.Printf("job output '%s'\n", string(cmdout.String()))
  123. }
  124. if req.ResponseRequested {
  125. pings <- pingback{
  126. PingbackURL: req.PingbackURL,
  127. Response: crawler.JobCompletionNotice{
  128. Error: cmderr,
  129. RequestID: req.RequestID,
  130. Other: bson.M{"stdout": cmdout.String()},
  131. }}
  132. }
  133. time.Sleep(1 * time.Second)
  134. }
  135. }
  136. // pingerDispatcher fetch info about completed jobs from the chan "pings" and run pingWorkers
  137. func pingerDispatcher(pings <-chan pingback) {
  138. for {
  139. ping := <-pings
  140. // this is asynchronous; which means that many pingWorker might be running at the same time
  141. // this is the case if submitting JobCompletionNotice is slower than receiving (and handling)
  142. // JobSubmission. This really shouldn't be the case.
  143. go pingWorker(ping)
  144. }
  145. }
  146. // a pingWorker will POST to PingbackURL. It will attempt retries in (certain) case of error
  147. //
  148. // Network errors are currently the only one to be retried; in the future, HTTP status codes shall be
  149. // considered to decide whether or not to retry
  150. func pingWorker(ping pingback) {
  151. for i := 0; i < pingbackRetries; i++ {
  152. serialized, err := json.Marshal(ping.Response)
  153. if err != nil {
  154. fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
  155. fmt.Fprintln(os.Stderr, err)
  156. return
  157. }
  158. resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json",
  159. bytes.NewBuffer(serialized))
  160. if err == nil && resp.StatusCode == http.StatusOK {
  161. return
  162. }
  163. if err != nil {
  164. fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
  165. fmt.Fprintln(os.Stderr, err)
  166. // retry after 10 seconds
  167. time.Sleep(10 * time.Second)
  168. continue
  169. }
  170. if resp.StatusCode != http.StatusOK {
  171. fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
  172. // TODO: statuscode should be better checked to understand if a retry should be done
  173. // in that
  174. return
  175. }
  176. }
  177. }