123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- package main
import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"path"
	"time"

	"git.lattuga.net/avana/sciacquone/crawler"
	"gopkg.in/mgo.v2/bson"
)
/*
This command acts as an HTTP wrapper for a "generic" worker. It makes it possible to write a worker that can:
- be written in any language
- not care about HTTP
- not care about job queues, concurrency, etc.

crawlerWrapper handles ONE job at a time. While this might be changed in the future to overcome the limitation, it is currently hardcoded.
*/
- // cmd is the cmdline to run on each POST
- var cmd []string
- // jobsQueue is shared between submitHandler and worker
- var jobsQueue chan crawler.JobSubmission
- // this httpClient is global without a very specific need to be so
- // a bit for performance, a bit for easy customization from main (ie: setting the timeout with a command-line
- // flag
- var httpClient *http.Client
- var pingbackRetries int
- // submitHandler handles HTTP POSTs to /submit, enqueing jobs
- func submitHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- w.WriteHeader(http.StatusMethodNotAllowed)
- w.Write([]byte("Only POST accepted"))
- return
- }
- var request crawler.JobSubmission
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- w.WriteHeader(http.StatusBadRequest)
- w.Write([]byte("Cannot read your request\n"))
- w.Write([]byte(err.Error()))
- return
- }
- if err = json.Unmarshal(body, &request); err != nil {
- w.WriteHeader(http.StatusBadRequest)
- w.Write([]byte("Cannot parse your request:\n "))
- w.Write([]byte(err.Error()))
- return
- }
- select {
- case jobsQueue <- request: // enqueued successfully
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("OK\n"))
- default: // not enqueued; typically, this means "the queue is full"
- w.Header().Set("Retry-After", "120") // 2 minutes
- w.WriteHeader(http.StatusServiceUnavailable)
- w.Write([]byte("Crawler queue is full! retry later\n"))
- }
- }
- func init() {
- httpClient = &http.Client{
- Timeout: time.Second * 10,
- }
- pingbackRetries = 3
- }
- func main() {
- queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and ability to handle burst")
- // TODO: rate limit options
- flag.Usage = func() {
- fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
- fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
- flag.PrintDefaults()
- }
- flag.Parse()
- jobsQueue = make(chan crawler.JobSubmission, *queueSize)
- pingQueue := make(chan pingback, *queueSize)
- cmd = flag.Args()
- if len(cmd) == 0 {
- fmt.Fprintln(os.Stderr, "Error: a command must be provided")
- os.Exit(2)
- }
- http.HandleFunc("/submit", submitHandler)
- fmt.Println("submitted")
- go jobsRunner(jobsQueue, pingQueue)
- go pingerDispatcher(pingQueue)
- err := http.ListenAndServe("localhost:8123", nil)
- if err != nil {
- fmt.Println(err)
- os.Exit(1)
- }
- }
// pingback pairs a job-completion notice with the URL it must be POSTed
// back to. Produced by jobsRunner, consumed by pingerDispatcher.
type pingback struct {
	PingbackURL crawler.URL                 // submitter-provided callback URL
	Response    crawler.JobCompletionNotice // outcome of the finished job
}
- // jobsRunner fetches jobs from a chan, run the user-specified worker with the job-specified stdin and,
- // if asked so, ask the pingerDispatcher to make the appropriate ping
- func jobsRunner(reqs <-chan crawler.JobSubmission, pings chan<- pingback) {
- for {
- req := <-reqs
- serializedJob, err := json.Marshal(req.Job)
- if err != nil {
- fmt.Fprintln(os.Stderr, "Error: encoding job", err)
- continue
- }
- c := exec.Cmd{}
- c.Path = cmd[0]
- c.Args = cmd
- c.Stdin = bytes.NewReader(serializedJob)
- var cmdout bytes.Buffer
- c.Stdout = &cmdout
- fmt.Println("lancio", cmd, string(serializedJob))
- fmt.Println("lancio", c)
- cmderr := c.Run()
- if cmderr != nil {
- fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
- fmt.Fprintln(os.Stderr, cmderr)
- } else {
- fmt.Printf("job output '%s'\n", string(cmdout.String()))
- }
- if req.ResponseRequested {
- pings <- pingback{
- PingbackURL: req.PingbackURL,
- Response: crawler.JobCompletionNotice{
- Error: cmderr,
- RequestID: req.RequestID,
- Other: bson.M{"stdout": cmdout.String()},
- }}
- }
- time.Sleep(1 * time.Second)
- }
- }
- // pingerDispatcher fetch info about completed jobs from the chan "pings" and run pingWorkers
- func pingerDispatcher(pings <-chan pingback) {
- for {
- ping := <-pings
- // this is asynchronous; which means that many pingWorker might be running at the same time
- // this is the case if submitting JobCompletionNotice is slower than receiving (and handling)
- // JobSubmission. This really shouldn't be the case.
- go pingWorker(ping)
- }
- }
- // a pingWorker will POST to PingbackURL. It will attempt retries in (certain) case of error
- //
- // Network errors are currently the only one to be retried; in the future, HTTP status codes shall be
- // considered to decide whether or not to retry
- func pingWorker(ping pingback) {
- for i := 0; i < pingbackRetries; i++ {
- serialized, err := json.Marshal(ping.Response)
- if err != nil {
- fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
- fmt.Fprintln(os.Stderr, err)
- return
- }
- resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json",
- bytes.NewBuffer(serialized))
- if err == nil && resp.StatusCode == http.StatusOK {
- return
- }
- if err != nil {
- fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
- fmt.Fprintln(os.Stderr, err)
- // retry after 10 seconds
- time.Sleep(10 * time.Second)
- continue
- }
- if resp.StatusCode != http.StatusOK {
- fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
- // TODO: statuscode should be better checked to understand if a retry should be done
- // in that
- return
- }
- }
- }
|