diff --git a/cmd/crawlerWrapper/README.md b/cmd/crawlerWrapper/README.md
new file mode 100644
index 0000000..cb5d10e
--- /dev/null
+++ b/cmd/crawlerWrapper/README.md
@@ -0,0 +1,46 @@
+crawlerWrapper
+==================
+
+Run
+-----
+
+
+Run it with
+```
+$GOPATH/bin/crawlerWrapper wc
+```
+and it will count the characters of your requests. Wow.
+
+Job submission
+-----------------
+
+Submit with
+```
+curl localhost:8123/submit -X POST -i -d '{"requestid": "026bff12-66c9-4b02-868c-cb3bbee1c08f", "Job": {"data": "foobar"}}'
+```
+yes, the requestid MUST be a valid UUIDv4. Well, you can omit it, but you shouldn't. The `Job` field must be
+a JSON object. Strings, arrays, and numbers will **not** work. It must be an object, but it can be any
+object.
+This basically means that your worker should probably understand JSON.
+
+If you chose `wc` as the worker, it will count the bytes of the literal string `{"data": "foobar"}` without doing
+any JSON parsing.
+
+Worker output
+---------------
+
+You might expect that stdout coming from the worker is handled in some way. It is not. A worker is
+supposed to deal with the rest of the chain by itself, so no, the output is not automatically fed to some
+other worker or anything else.
+
+However, there is at least a way to be notified of a completed job. This is done by adding two fields to
+the job submission.
+
+```
+curl localhost:8123/submit -X POST -i -d '{"requestid": "'$(python3 -c 'import uuid; print(uuid.uuid4());')'", "ResponseRequested": true, "PingbackURL": "http://google.it/"}'
+```
+
+Here we added two fields: `ResponseRequested` is a boolean specifying whether we care about knowing that the
+command has completed. If unspecified, it is false. If it is true, a `POST` will be made to the URL
+specified in `PingbackURL`. The format of this POST is specified by `crawler.JobCompletionNotice`. Please
+note that the `POST` will be made even on worker error.
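Since the README says a worker should probably understand JSON, and the wrapper feeds the serialized `Job` object to the worker's stdin, here is what a minimal JSON-aware worker could look like. This is purely an illustrative sketch, not part of this change; the binary name and the `data` field are taken from the README example only.

```go
// Hypothetical worker: crawlerWrapper writes the Job object to our stdin,
// so we decode it as generic JSON and act on it.
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

func main() {
	var job map[string]interface{}
	if err := json.NewDecoder(os.Stdin).Decode(&job); err != nil {
		fmt.Fprintln(os.Stderr, "cannot decode job:", err)
		os.Exit(1)
	}
	// Do the real work here; as an example, just echo the "data" field.
	fmt.Println("got data:", job["data"])
}
```

It would be started the same way as `wc` in the README, e.g. `$GOPATH/bin/crawlerWrapper ./myworker` (the path is just an example).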
diff --git a/cmd/crawlerWrapper/wrapper.go b/cmd/crawlerWrapper/wrapper.go
index 386daf9..9f8c2b6 100644
--- a/cmd/crawlerWrapper/wrapper.go
+++ b/cmd/crawlerWrapper/wrapper.go
@@ -16,11 +16,20 @@ import (
 	"gopkg.in/mgo.v2/bson"
 )
 
+/*
+This command acts as an HTTP wrapper for a "generic" worker. It is useful because it lets a worker:
+- be written in any language
+- not care about HTTP
+- not care about job queues, concurrency, etc.
+
+crawlerWrapper handles ONE job at a time. This limitation might be lifted in the future, but for now it is hardcoded.
+*/
+
 // cmd is the cmdline to run on each POST
 var cmd []string
 
 // jobsQueue is shared between submitHandler and worker
-var jobsQueue chan crawler.Request
+var jobsQueue chan crawler.JobSubmission
 
 // this httpClient is global without a very specific need to be so
 // a bit for performance, a bit for easy customization from main (ie: setting the timeout with a command-line
@@ -36,7 +45,7 @@ func submitHandler(w http.ResponseWriter, r *http.Request) {
 		w.Write([]byte("Only POST accepted"))
 		return
 	}
-	var request crawler.Request
+	var request crawler.JobSubmission
 	body, err := ioutil.ReadAll(r.Body)
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
@@ -79,7 +88,7 @@ func main() {
 		flag.PrintDefaults()
 	}
 	flag.Parse()
-	jobsQueue = make(chan crawler.Request, *queueSize)
+	jobsQueue = make(chan crawler.JobSubmission, *queueSize)
 	pingQueue := make(chan pingback, *queueSize)
 	cmd = flag.Args()
 	if len(cmd) == 0 {
@@ -88,22 +97,23 @@
 	http.HandleFunc("/submit", submitHandler)
 	fmt.Println("submitted")
-	go worker(jobsQueue, pingQueue)
+	go jobsRunner(jobsQueue, pingQueue)
 	go pingerDispatcher(pingQueue)
 	err := http.ListenAndServe("localhost:8123", nil)
 	if err != nil {
 		fmt.Println(err)
 		os.Exit(1)
 	}
-	// TODO: server HTTP
 }
 
 type pingback struct {
 	PingbackURL crawler.URL
-	Response    crawler.Response
+	Response    crawler.JobCompletionNotice
 }
 
-func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
+// jobsRunner fetches jobs from a chan, runs the user-specified worker with the job-specified stdin and,
+// if asked to, asks the pingerDispatcher to make the appropriate ping
+func jobsRunner(reqs <-chan crawler.JobSubmission, pings chan<- pingback) {
 	for {
 		req := <-reqs
@@ -131,7 +141,7 @@
 		if req.ResponseRequested {
 			pings <- pingback{
 				PingbackURL: req.PingbackURL,
-				Response: crawler.Response{
+				Response: crawler.JobCompletionNotice{
 					Error:     cmderr,
 					RequestID: req.RequestID,
 					Other:     bson.M{"stdout": cmdout.String()},
@@ -141,13 +151,21 @@
 	}
 }
 
+// pingerDispatcher fetches info about completed jobs from the chan "pings" and runs pingWorkers
 func pingerDispatcher(pings <-chan pingback) {
 	for {
 		ping := <-pings
+		// this is asynchronous, which means that many pingWorkers might be running at the same time;
+		// that happens if delivering a JobCompletionNotice is slower than receiving (and handling) a
+		// JobSubmission. This really shouldn't be the case.
 		go pingWorker(ping)
 	}
 }
 
+// a pingWorker will POST to PingbackURL. It will retry in certain cases of error.
+//
+// Network errors are currently the only ones retried; in the future, HTTP status codes should also be
+// considered to decide whether or not to retry.
 func pingWorker(ping pingback) {
 	for i := 0; i < pingbackRetries; i++ {
 		serialized, err := json.Marshal(ping.Response)
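For completeness, the receiving end of the pingback is just an HTTP endpoint that accepts the `POST` made by `pingWorker`. A minimal sketch of such a receiver follows; it is not part of this change, the port and path are made up, and it decodes into a generic map rather than guessing exact field types, since the wire format is whatever `json.Marshal` produces for `crawler.JobCompletionNotice`.

```go
// Hypothetical pingback receiver for crawler.JobCompletionNotice POSTs.
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/pingback", func(w http.ResponseWriter, r *http.Request) {
		// decode the notice as generic JSON instead of assuming exact types
		var notice map[string]interface{}
		if err := json.NewDecoder(r.Body).Decode(&notice); err != nil {
			http.Error(w, "cannot decode notice", http.StatusBadRequest)
			return
		}
		log.Printf("job %v done, error=%v, other=%v",
			notice["RequestID"], notice["Error"], notice["Other"])
		w.WriteHeader(http.StatusOK)
	})
	log.Fatal(http.ListenAndServe("localhost:8124", nil))
}
```

A submission that should reach this receiver would set `"PingbackURL": "http://localhost:8124/pingback"`.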
diff --git a/crawler/messages.go b/crawler/messages.go
index b74c8fa..dc6deef 100644
--- a/crawler/messages.go
+++ b/crawler/messages.go
@@ -22,18 +22,18 @@ func (u *URL) MarshalText() ([]byte, error) {
 	return []byte(u.String()), nil
 }
 
-// A Request is what the Frontend will send to the crawler.
+// A JobSubmission is what the Frontend will send to the crawler.
 //
 // The "real" thing to do is Job. Other fields are metadata, useful for the response
-type Request struct {
+type JobSubmission struct {
 	Job               bson.M    // what to do
 	ResponseRequested bool      // shall the crawler send a Response to the Frontend?
 	RequestID         uuid.UUID `json:"requestID"` // this id is mainly intended to be passed back in the CrawlerResponse
 	PingbackURL       URL       // when the request has been fulfilled, this URL must be visited with a CrawlerResponse
 }
 
-// A Response is what the Crawler should send to the frontend after the Job has been successfully processed.
-type Response struct {
+// A JobCompletionNotice is what the Crawler should send to the frontend after the Job has been successfully processed.
+type JobCompletionNotice struct {
 	RequestID uuid.UUID // the one we received in the Request
 	Error     error     // was everything fine?
 	Other     bson.M    // a crawler might communicate something to the frontend
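For anyone who prefers Go over curl, submitting a `JobSubmission` is just a `POST` of the same JSON shown in the README. The sketch below mirrors the second curl example; the UUID and the pingback URL are placeholders, and the exact JSON field casing follows the README example (Go's decoder matches it case-insensitively).

```go
// Hypothetical client submitting a job (with pingback) to crawlerWrapper.
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// same shape as the curl examples in the README; requestid must be a valid UUIDv4
	payload := []byte(`{
		"requestid": "026bff12-66c9-4b02-868c-cb3bbee1c08f",
		"Job": {"data": "foobar"},
		"ResponseRequested": true,
		"PingbackURL": "http://localhost:8124/pingback"
	}`)

	resp, err := http.Post("http://localhost:8123/submit", "application/json", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("submit failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```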