// sciacquone/cmd/crawlerWrapper/wrapper.go

package main

import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"path"
	"time"

	"git.lattuga.net/avana/sciacquone/crawler"
	"gopkg.in/mgo.v2/bson"
)

/*
This command acts as an HTTP wrapper for a "generic" worker. It is useful for writing a worker that can:
- be written in any language
- not care about HTTP
- not care about job queues, concurrency, etc.
crawlerWrapper handles ONE job at a time. This limitation is currently hardcoded, though it might be lifted in the future.
*/
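// A sketch of a submission payload for POST /submit. Assumption: the JSON
// field names mirror the Go field names of crawler.JobSubmission as used in
// this file (Job, RequestID, ResponseRequested, PingbackURL); the actual
// struct tags may differ.
//
//	{
//	  "Job": { ... },
//	  "RequestID": "42",
//	  "ResponseRequested": true,
//	  "PingbackURL": "http://localhost:9000/done"
//	}
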
// cmd is the cmdline to run on each POST
var cmd []string

// jobsQueue is shared between submitHandler and worker
var jobsQueue chan crawler.JobSubmission

// this httpClient is global without a very specific need to be so:
// a bit for performance, a bit for easy customization from main (i.e. setting
// the timeout with a command-line flag)
var httpClient *http.Client

var pingbackRetries int
// submitHandler handles HTTP POSTs to /submit, enqueuing jobs
func submitHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		w.Write([]byte("Only POST accepted"))
		return
	}
	var request crawler.JobSubmission
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Cannot read your request\n"))
		w.Write([]byte(err.Error()))
		return
	}
	if err = json.Unmarshal(body, &request); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Cannot parse your request:\n "))
		w.Write([]byte(err.Error()))
		return
	}
	select {
	case jobsQueue <- request: // enqueued successfully
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK\n"))
	default: // not enqueued; typically, this means "the queue is full"
		w.Header().Set("Retry-After", "120") // 2 minutes
		w.WriteHeader(http.StatusServiceUnavailable)
		w.Write([]byte("Crawler queue is full! retry later\n"))
	}
}
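
// A minimal client sketch (hypothetical, not part of this package) showing how
// a submitter might honor the Retry-After header sent when the queue is full:
//
//	resp, err := http.Post("http://localhost:8123/submit", "application/json", bytes.NewReader(payload))
//	if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
//		if secs, err := strconv.Atoi(resp.Header.Get("Retry-After")); err == nil {
//			time.Sleep(time.Duration(secs) * time.Second)
//			// ...retry the POST...
//		}
//	}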
func init() {
	httpClient = &http.Client{
		Timeout: time.Second * 10,
	}
	pingbackRetries = 3
}
func main() {
	queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and ability to handle bursts")
	// TODO: rate limit options
	flag.Usage = func() {
		fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
		fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
		flag.PrintDefaults()
	}
	flag.Parse()
	jobsQueue = make(chan crawler.JobSubmission, *queueSize)
	pingQueue := make(chan pingback, *queueSize)
	cmd = flag.Args()
	if len(cmd) == 0 {
		fmt.Fprintln(os.Stderr, "Error: a command must be provided")
		os.Exit(2)
	}
	http.HandleFunc("/submit", submitHandler)
	fmt.Println("Listening on localhost:8123")
	go jobsRunner(jobsQueue, pingQueue)
	go pingerDispatcher(pingQueue)
	err := http.ListenAndServe("localhost:8123", nil)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
type pingback struct {
	PingbackURL crawler.URL
	Response    crawler.JobCompletionNotice
}
// jobsRunner fetches jobs from a chan, runs the user-specified worker feeding it
// the serialized job on stdin and, if so requested, asks the pingerDispatcher to
// send the appropriate ping
func jobsRunner(reqs <-chan crawler.JobSubmission, pings chan<- pingback) {
	for {
		req := <-reqs
		serializedJob, err := json.Marshal(req.Job)
		if err != nil {
			fmt.Fprintln(os.Stderr, "Error: encoding job", err)
			continue
		}
		// exec.Command resolves cmd[0] through $PATH, unlike a bare exec.Cmd
		// with Path set directly to cmd[0]
		c := exec.Command(cmd[0], cmd[1:]...)
		c.Stdin = bytes.NewReader(serializedJob)
		var cmdout bytes.Buffer
		c.Stdout = &cmdout
		fmt.Println("launching", cmd, string(serializedJob))
		cmderr := c.Run()
		if cmderr != nil {
			fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
			fmt.Fprintln(os.Stderr, cmderr)
		} else {
			fmt.Printf("job output '%s'\n", cmdout.String())
		}
		if req.ResponseRequested {
			pings <- pingback{
				PingbackURL: req.PingbackURL,
				Response: crawler.JobCompletionNotice{
					Error:     cmderr,
					RequestID: req.RequestID,
					Other:     bson.M{"stdout": cmdout.String()},
				},
			}
		}
		time.Sleep(1 * time.Second)
	}
}
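
// A minimal worker sketch (hypothetical, in Go, though any language works):
// the wrapper writes the serialized Job to the worker's stdin and captures its
// stdout, which is forwarded in the pingback under Other["stdout"]:
//
//	package main
//
//	import (
//		"encoding/json"
//		"fmt"
//		"os"
//	)
//
//	func main() {
//		var job map[string]interface{}
//		if err := json.NewDecoder(os.Stdin).Decode(&job); err != nil {
//			fmt.Fprintln(os.Stderr, "cannot decode job:", err)
//			os.Exit(1)
//		}
//		// ...do the actual crawling work here...
//		fmt.Println("done") // captured and sent back as Other["stdout"]
//	}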
// pingerDispatcher fetches info about completed jobs from the chan "pings" and runs pingWorkers
func pingerDispatcher(pings <-chan pingback) {
	for {
		ping := <-pings
		// this is asynchronous, which means that many pingWorkers might be running at the same time;
		// that happens if submitting a JobCompletionNotice is slower than receiving (and handling)
		// a JobSubmission. This really shouldn't be the case.
		go pingWorker(ping)
	}
}
// a pingWorker will POST to PingbackURL. It will attempt retries in certain cases of error.
//
// Network errors are currently the only ones to be retried; in the future, HTTP status codes should be
// considered to decide whether or not to retry
func pingWorker(ping pingback) {
	// serialize once; the payload does not change between retries
	serialized, err := json.Marshal(ping.Response)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for i := 0; i < pingbackRetries; i++ {
		resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json",
			bytes.NewBuffer(serialized))
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
			fmt.Fprintln(os.Stderr, err)
			// retry after 10 seconds
			time.Sleep(10 * time.Second)
			continue
		}
		// the response body is not used; close it to avoid leaking the connection
		resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return
		}
		fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
		// TODO: inspect the status code to decide whether a retry makes sense
		return
	}
}
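
// A sketch of a pingback receiver (hypothetical, not part of this package):
// the service at PingbackURL receives a POSTed JSON-serialized
// crawler.JobCompletionNotice whose fields, as used above, are Error,
// RequestID and Other (with Other["stdout"] carrying the worker's output):
//
//	http.HandleFunc("/done", func(w http.ResponseWriter, r *http.Request) {
//		var notice map[string]interface{} // or crawler.JobCompletionNotice
//		if err := json.NewDecoder(r.Body).Decode(&notice); err != nil {
//			w.WriteHeader(http.StatusBadRequest)
//			return
//		}
//		w.WriteHeader(http.StatusOK) // anything but 200 makes pingWorker log and possibly retry
//	})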