package main

import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"path"
	"time"

	"git.lattuga.net/avana/sciacquone/crawler"
	"gopkg.in/mgo.v2/bson"
)

/*
This command acts as an HTTP wrapper for a "generic" worker. This is useful to write a worker that can:
 - be written in any language
 - not care about HTTP
 - not care about job queues, concurrency, etc.

crawlerWrapper handles ONE job at a time. While this might be changed in the future, the one-job limit is hardcoded right now.
*/

// cmd is the cmdline to run on each POST
var cmd []string

// jobsQueue is shared between submitHandler and jobsRunner
var jobsQueue chan crawler.JobSubmission

// httpClient is global without a very specific need to be so:
// a bit for performance, a bit for easy customization from main
// (e.g. setting the timeout with a command-line flag).
var httpClient *http.Client
var pingbackRetries int

// submitHandler handles HTTP POSTs to /submit, enqueuing jobs
func submitHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		w.Write([]byte("Only POST accepted"))
		return
	}
	var request crawler.JobSubmission
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Cannot read your request\n"))
		w.Write([]byte(err.Error()))
		return
	}
	if err = json.Unmarshal(body, &request); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Cannot parse your request:\n"))
		w.Write([]byte(err.Error()))
		return
	}
	select {
	case jobsQueue <- request: // enqueued successfully
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK\n"))
	default: // not enqueued; typically, this means "the queue is full"
		w.Header().Set("Retry-After", "120") // 2 minutes
		w.WriteHeader(http.StatusServiceUnavailable)
		w.Write([]byte("Crawler queue is full! retry later\n"))
	}
}
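// For illustration only: a job submission might look like the curl call below.
// The exact JSON keys are an assumption here; they depend on how
// crawler.JobSubmission and its fields are declared and tagged in the crawler
// package, so treat this as a sketch of the protocol rather than its
// definitive wire format.
//
//	curl -X POST http://localhost:8123/submit \
//	     -d '{"Job": {...}, "RequestID": 1, "ResponseRequested": true, "PingbackURL": "http://caller.example/pingback"}'
//
// A 200 means the job was enqueued; a 503 with a Retry-After header means the
// queue is full.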
func init() {
	httpClient = &http.Client{
		Timeout: time.Second * 10,
	}
	pingbackRetries = 3
}

func main() {
	queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and ability to handle bursts")
	// TODO: rate limit options
	flag.Usage = func() {
		fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
		fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
		flag.PrintDefaults()
	}
	flag.Parse()
	jobsQueue = make(chan crawler.JobSubmission, *queueSize)
	pingQueue := make(chan pingback, *queueSize)
	cmd = flag.Args()
	if len(cmd) == 0 {
		fmt.Fprintln(os.Stderr, "Error: a command must be provided")
		os.Exit(2)
	}
	http.HandleFunc("/submit", submitHandler)
	go jobsRunner(jobsQueue, pingQueue)
	go pingerDispatcher(pingQueue)
	fmt.Println("Listening on localhost:8123")
	err := http.ListenAndServe("localhost:8123", nil)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}

type pingback struct {
	PingbackURL crawler.URL
	Response    crawler.JobCompletionNotice
}

// jobsRunner fetches jobs from a chan, runs the user-specified worker with the
// job-specified stdin and, if asked to, asks the pingerDispatcher to make the
// appropriate ping
func jobsRunner(reqs <-chan crawler.JobSubmission, pings chan<- pingback) {
	for {
		req := <-reqs
		serializedJob, err := json.Marshal(req.Job)
		if err != nil {
			fmt.Fprintln(os.Stderr, "Error: encoding job", err)
			continue
		}
		// exec.Command resolves cmd[0] through PATH, unlike an exec.Cmd
		// built by hand with Path set directly
		c := exec.Command(cmd[0], cmd[1:]...)
		c.Stdin = bytes.NewReader(serializedJob)
		var cmdout bytes.Buffer
		c.Stdout = &cmdout
		fmt.Println("launching", cmd, string(serializedJob))
		cmderr := c.Run()
		if cmderr != nil {
			fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
			fmt.Fprintln(os.Stderr, cmderr)
		} else {
			fmt.Printf("job output '%s'\n", cmdout.String())
		}
		if req.ResponseRequested {
			pings <- pingback{
				PingbackURL: req.PingbackURL,
				Response: crawler.JobCompletionNotice{
					Error:     cmderr,
					RequestID: req.RequestID,
					Other:     bson.M{"stdout": cmdout.String()},
				}}
		}
		time.Sleep(1 * time.Second)
	}
}

// pingerDispatcher fetches info about completed jobs from the chan "pings" and runs pingWorkers
func pingerDispatcher(pings <-chan pingback) {
	for {
		ping := <-pings
		// this is asynchronous, which means that many pingWorkers might be running
		// at the same time; that happens if submitting a JobCompletionNotice is
		// slower than receiving (and handling) a JobSubmission. This really
		// shouldn't be the case.
		go pingWorker(ping)
	}
}
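// jobsRunner above only requires the worker command to read the serialized job
// from stdin and write its result to stdout. A minimal, hypothetical worker in
// Go (it assumes nothing about the Job schema and simply echoes it back):
//
//	package main
//
//	import (
//		"io"
//		"os"
//	)
//
//	func main() {
//		// a real worker would decode the job JSON and process it; whatever
//		// is printed on stdout ends up in the pingback's "stdout" field
//		io.Copy(os.Stdout, os.Stdin)
//	}
//
// It would then be run as: crawlerWrapper ./worker
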
// a pingWorker will POST to PingbackURL. It will attempt retries in certain
// cases of error.
//
// Network errors are currently the only ones retried; in the future, HTTP
// status codes should also be considered when deciding whether or not to retry
func pingWorker(ping pingback) {
	// the response does not change between retries, so serialize it once
	serialized, err := json.Marshal(ping.Response)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for i := 0; i < pingbackRetries; i++ {
		resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json", bytes.NewReader(serialized))
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
			fmt.Fprintln(os.Stderr, err)
			// retry after 10 seconds
			time.Sleep(10 * time.Second)
			continue
		}
		resp.Body.Close() // the body is unused; close it so the connection can be reused
		if resp.StatusCode == http.StatusOK {
			return
		}
		fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
		// TODO: the status code should be checked more carefully to understand
		// whether a retry should be done in this case
		return
	}
}
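// For completeness, the caller's pingback endpoint receives a POST whose body
// is the JSON-marshalled crawler.JobCompletionNotice built in jobsRunner. The
// key names below are an assumption (they depend on that type's declaration
// and json tags), so this receiver is only a sketch:
//
//	http.HandleFunc("/pingback", func(w http.ResponseWriter, r *http.Request) {
//		// decode into a generic map, since the exact schema lives in the crawler package
//		var notice map[string]interface{}
//		if err := json.NewDecoder(r.Body).Decode(&notice); err != nil {
//			w.WriteHeader(http.StatusBadRequest)
//			return
//		}
//		log.Println("job done:", notice["RequestID"], notice["Other"])
//		// anything other than a 200 is treated as an error by pingWorker
//		w.WriteHeader(http.StatusOK)
//	})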