@@ -16,11 +16,20 @@ import (
"gopkg.in/mgo.v2/bson"
)
+/*
+This command acts as an HTTP wrapper for a "generic" worker. It is useful for writing a worker that can:
+- be written in any language
+- not care about HTTP
+- not care about job queues, concurrency, etc.
+
+crawlerWrapper handles ONE job at a time. This limitation is currently hardcoded, although it may be lifted in the future.
+*/
+
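A job is submitted by POSTing a serialized crawler.JobSubmission to /submit (the handler below only accepts POST). Below is a minimal sketch of a client; the field names RequestID, PingbackURL and ResponseRequested appear elsewhere in this diff, but their types, the JSON encoding (inferred from the pingback side, which uses encoding/json), and any field carrying the worker's stdin are assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Local stand-in for crawler.JobSubmission. Field names are taken from this
// diff; their types, and the field carrying the worker's stdin, are assumed.
type jobSubmission struct {
	RequestID         string
	PingbackURL       string // where the JobCompletionNotice should be POSTed
	ResponseRequested bool   // ask the wrapper to ping back when the job is done
}

func main() {
	payload, err := json.Marshal(jobSubmission{
		RequestID:         "job-1",
		PingbackURL:       "http://localhost:9000/done",
		ResponseRequested: true,
	})
	if err != nil {
		panic(err)
	}
	// the wrapper listens on localhost:8123 (see main below)
	resp, err := http.Post("http://localhost:8123/submit", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```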
// cmd is the cmdline to run on each POST
var cmd []string
// jobsQueue is shared between submitHandler and worker
-var jobsQueue chan crawler.Request
+var jobsQueue chan crawler.JobSubmission
// this httpClient is global without a very specific need to be so
// a bit for performance, a bit for easy customization from main (ie: setting the timeout with a command-line
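The comment above (cut off by the hunk boundary) motivates a single package-level client. A minimal sketch of that pattern follows; the -pingback-timeout flag name is hypothetical, since the hunk only hints that main sets the timeout from the command line.

```go
package main

import (
	"flag"
	"fmt"
	"net/http"
	"time"
)

// one shared client: connection reuse for free, and one place to customize
var httpClient = &http.Client{}

func main() {
	// hypothetical flag name; the diff only shows that main configures the timeout
	timeout := flag.Duration("pingback-timeout", 10*time.Second, "timeout for pingback POSTs")
	flag.Parse()
	httpClient.Timeout = *timeout
	fmt.Println("pingbacks will time out after", httpClient.Timeout)
}
```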
@@ -36,7 +45,7 @@ func submitHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Only POST accepted"))
return
}
- var request crawler.Request
+ var request crawler.JobSubmission
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@@ -79,7 +88,7 @@ func main() {
flag.PrintDefaults()
}
flag.Parse()
- jobsQueue = make(chan crawler.Request, *queueSize)
+ jobsQueue = make(chan crawler.JobSubmission, *queueSize)
pingQueue := make(chan pingback, *queueSize)
cmd = flag.Args()
if len(cmd) == 0 {
@@ -88,22 +97,23 @@ func main() {
}
http.HandleFunc("/submit", submitHandler)
fmt.Println("submitted")
- go worker(jobsQueue, pingQueue)
+ go jobsRunner(jobsQueue, pingQueue)
go pingerDispatcher(pingQueue)
err := http.ListenAndServe("localhost:8123", nil)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
- // TODO: server HTTP
}
type pingback struct {
PingbackURL crawler.URL
- Response crawler.Response
+ Response crawler.JobCompletionNotice
}
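When ResponseRequested is set, the completed job is reported by POSTing a serialized JobCompletionNotice to PingbackURL. A sketch of roughly what that body looks like, using a local stand-in struct: the field names Error, RequestID and Other come from this diff, the types here are assumptions (the real Other is a bson.M, and the JSON form of a non-nil error depends on its concrete type).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for crawler.JobCompletionNotice: Error, RequestID and Other are
// the field names used in this diff; the types here are assumptions.
type jobCompletionNotice struct {
	Error     error                  `json:"Error"`
	RequestID string                 `json:"RequestID"`
	Other     map[string]interface{} `json:"Other"` // bson.M in the real code
}

func main() {
	notice := jobCompletionNotice{
		Error:     nil, // the job succeeded
		RequestID: "job-1",
		Other:     map[string]interface{}{"stdout": "hello from the worker\n"},
	}
	body, err := json.MarshalIndent(notice, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // roughly what pingWorker POSTs to PingbackURL
}
```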
-func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
+// jobsRunner fetches jobs from a chan, runs the user-specified worker with the job-specified stdin and,
+// if requested, asks the pingerDispatcher to make the appropriate ping
+func jobsRunner(reqs <-chan crawler.JobSubmission, pings chan<- pingback) {
for {
req := <-reqs
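The rest of the loop body (mostly outside this hunk) runs the user-specified command once per job. A self-contained sketch of that per-job step, assuming the job's stdin is fed to the process and stdout is captured in a buffer: the diff shows a cmdout buffer and a cmderr error, but not the exec call itself, so treat this as an illustration rather than the actual implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
	"strings"
)

// runJob sketches the per-job step: run the user-specified cmdline, feed it
// the job-specified stdin, capture stdout. Run blocks until the worker
// exits, which is what makes the wrapper handle ONE job at a time.
func runJob(cmdline []string, stdin string) (stdout string, err error) {
	var cmdout bytes.Buffer
	c := exec.Command(cmdline[0], cmdline[1:]...)
	c.Stdin = strings.NewReader(stdin)
	c.Stdout = &cmdout
	err = c.Run()
	return cmdout.String(), err
}

func main() {
	out, err := runJob([]string{"cat"}, "hello\n")
	fmt.Printf("stdout=%q err=%v\n", out, err)
}
```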
@@ -131,7 +141,7 @@ func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
if req.ResponseRequested {
pings <- pingback{
PingbackURL: req.PingbackURL,
- Response: crawler.Response{
+ Response: crawler.JobCompletionNotice{
Error: cmderr,
RequestID: req.RequestID,
Other: bson.M{"stdout": cmdout.String()},
@@ -141,13 +151,21 @@ func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
}
}
+// pingerDispatcher fetches info about completed jobs from the chan "pings" and runs pingWorkers
func pingerDispatcher(pings <-chan pingback) {
for {
ping := <-pings
+ // this is asynchronous, which means that many pingWorkers might be running at the same time:
+ // that happens when submitting a JobCompletionNotice is slower than receiving (and handling)
+ // a JobSubmission. This really shouldn't be the case.
go pingWorker(ping)
}
}
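If the unbounded `go pingWorker(ping)` ever did become a problem, the dispatch loop could be capped with a semaphore. A self-contained sketch of that alternative follows; it is not what the code above does, and the cap of 4 and the stand-in worker are arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	pings := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			pings <- i
		}
		close(pings)
	}()

	sem := make(chan struct{}, 4) // at most 4 pingbacks in flight
	var wg sync.WaitGroup
	for ping := range pings {
		sem <- struct{}{} // blocks while 4 workers are already running
		wg.Add(1)
		go func(p int) {
			defer func() { <-sem; wg.Done() }()
			time.Sleep(10 * time.Millisecond) // stand-in for the HTTP POST + retries
			fmt.Println("pinged", p)
		}(ping)
	}
	wg.Wait()
}
```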
+// a pingWorker will POST to PingbackURL. It will retry in certain cases of error
+//
+// Network errors are currently the only ones retried; in the future, HTTP status codes should be
+// considered when deciding whether or not to retry
func pingWorker(ping pingback) {
for i := 0; i < pingbackRetries; i++ {
serialized, err := json.Marshal(ping.Response)
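For the status-code-aware retry the comment above anticipates, one plausible shape is to retry on network errors and 5xx responses while giving up on 4xx. This is a sketch of that future behavior, not the current code, which retries network errors only; the backoff policy and retry cap are assumptions.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// postWithRetry sketches status-code-aware retries: network errors and 5xx
// are retried with exponential backoff, other statuses are treated as final.
func postWithRetry(url string, body []byte, retries int) error {
	for i := 0; i < retries; i++ {
		resp, err := http.Post(url, "application/json", bytes.NewReader(body))
		if err != nil {
			time.Sleep(time.Second << uint(i)) // network error: back off, then retry
			continue
		}
		resp.Body.Close()
		if resp.StatusCode >= 500 {
			time.Sleep(time.Second << uint(i)) // server-side trouble: worth retrying
			continue
		}
		if resp.StatusCode >= 400 {
			return fmt.Errorf("pingback rejected: %s", resp.Status) // our fault: don't retry
		}
		return nil
	}
	return errors.New("pingback: retries exhausted")
}

func main() {
	err := postWithRetry("http://localhost:9000/done", []byte(`{}`), 3)
	fmt.Println("result:", err)
}
```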