From edb69d28a68180b52a6052ad4e3ea6536de6ff54 Mon Sep 17 00:00:00 2001
From: boyska
Date: Fri, 4 May 2018 19:25:42 +0200
Subject: [PATCH] initial commit

---
 cmd/crawlerWrapper/wrapper.go | 178 ++++++++++++++++++++++++++++++++++
 crawler/messages.go           |  40 ++++++++
 2 files changed, 218 insertions(+)
 create mode 100644 cmd/crawlerWrapper/wrapper.go
 create mode 100644 crawler/messages.go

diff --git a/cmd/crawlerWrapper/wrapper.go b/cmd/crawlerWrapper/wrapper.go
new file mode 100644
index 0000000..386daf9
--- /dev/null
+++ b/cmd/crawlerWrapper/wrapper.go
@@ -0,0 +1,178 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"os/exec"
+	"path"
+	"time"
+
+	"git.lattuga.net/avana/sciacquone/crawler"
+	"gopkg.in/mgo.v2/bson"
+)
+
+// cmd is the command line to run for each queued job
+var cmd []string
+
+// jobsQueue is shared between submitHandler and worker
+var jobsQueue chan crawler.Request
+
+// httpClient is global without a very specific need to be so:
+// a bit for performance, a bit for easier customization from main (i.e. setting
+// the timeout with a command-line flag)
+var httpClient *http.Client
+
+var pingbackRetries int
+
+// submitHandler handles HTTP POSTs to /submit, enqueuing jobs
+func submitHandler(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "POST" {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		w.Write([]byte("Only POST accepted"))
+		return
+	}
+	var request crawler.Request
+	body, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write([]byte("Cannot read your request\n"))
+		w.Write([]byte(err.Error()))
+		return
+	}
+	if err = json.Unmarshal(body, &request); err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write([]byte("Cannot parse your request:\n "))
+		w.Write([]byte(err.Error()))
+		return
+	}
+	select {
+	case jobsQueue <- request: // enqueued successfully
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte("OK\n"))
+	default: // not enqueued; typically, this means "the queue is full"
+		w.Header().Set("Retry-After", "120") // 2 minutes
+		w.WriteHeader(http.StatusServiceUnavailable)
+		w.Write([]byte("Crawler queue is full! retry later\n"))
+
+	}
+}
+
+func init() {
+	httpClient = &http.Client{
+		Timeout: time.Second * 10,
+	}
+	pingbackRetries = 3
+}
+
+func main() {
+	queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and the ability to handle bursts")
+	// TODO: rate limit options
+	flag.Usage = func() {
+		fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
+		fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
+
+		flag.PrintDefaults()
+	}
+	flag.Parse()
+	jobsQueue = make(chan crawler.Request, *queueSize)
+	pingQueue := make(chan pingback, *queueSize)
+	cmd = flag.Args()
+	if len(cmd) == 0 {
+		fmt.Fprintln(os.Stderr, "Error: a command must be provided")
+		os.Exit(2)
+	}
+	http.HandleFunc("/submit", submitHandler)
+	fmt.Println("listening on localhost:8123")
+	go worker(jobsQueue, pingQueue)
+	go pingerDispatcher(pingQueue)
+	err := http.ListenAndServe("localhost:8123", nil)
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+	// TODO: server HTTP
+}
+
+type pingback struct {
+	PingbackURL crawler.URL
+	Response    crawler.Response
+}
+
+func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
+	for {
+		req := <-reqs
+
+		serializedJob, err := json.Marshal(req.Job)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "Error: could not encode job:", err)
+			continue
+		}
+		// exec.Command resolves cmd[0] through $PATH when it is not a path
+		c := exec.Command(cmd[0], cmd[1:]...)
+		c.Stdin = bytes.NewReader(serializedJob)
+		var cmdout bytes.Buffer
+		c.Stdout = &cmdout
+		fmt.Println("running", cmd, string(serializedJob))
+		cmderr := c.Run()
+		if cmderr != nil {
+			fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
+			fmt.Fprintln(os.Stderr, cmderr)
+		} else {
+			fmt.Printf("job output '%s'\n", cmdout.String())
+		}
+
+		if req.ResponseRequested {
+			pings <- pingback{
+				PingbackURL: req.PingbackURL,
+				Response: crawler.Response{
+					Error:     cmderr,
+					RequestID: req.RequestID,
+					Other:     bson.M{"stdout": cmdout.String()},
+				}}
+		}
+		time.Sleep(1 * time.Second)
+	}
+}
+
+func pingerDispatcher(pings <-chan pingback) {
+	for {
+		ping := <-pings
+		go pingWorker(ping)
+	}
+}
+
+func pingWorker(ping pingback) {
+	serialized, err := json.Marshal(ping.Response)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
+		fmt.Fprintln(os.Stderr, err)
+		return
+	}
+	for i := 0; i < pingbackRetries; i++ {
+		resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json",
+			bytes.NewBuffer(serialized))
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
+			fmt.Fprintln(os.Stderr, err)
+			// retry after 10 seconds
+			time.Sleep(10 * time.Second)
+			continue
+		}
+		resp.Body.Close()
+		if resp.StatusCode == http.StatusOK {
+			return
+		}
+		fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
+		// TODO: the status code should be checked more carefully to decide whether a retry makes sense in that case
+		return
+	}
+}
diff --git a/crawler/messages.go b/crawler/messages.go
new file mode 100644
index 0000000..b74c8fa
--- /dev/null
+++ b/crawler/messages.go
@@ -0,0 +1,40 @@
+package crawler
+
+import (
+	"net/url"
+
+	"github.com/satori/go.uuid"
+	"gopkg.in/mgo.v2/bson"
+)
+
+// URL wraps url.URL to support text (and therefore JSON) marshalling
+type URL struct{ *url.URL }
+
+// UnmarshalText implements TextUnmarshaler (JSON, too)
+func (u *URL) UnmarshalText(text []byte) error {
+	u2, err := url.Parse(string(text))
+	u.URL = u2
+	return err
+}
+
+// MarshalText implements TextMarshaler (JSON, too)
+func (u *URL) MarshalText() ([]byte, error) {
+	return []byte(u.String()), nil
+}
+
+// A Request is what the Frontend will send to the crawler.
+//
+// The actual work to be done is described by Job; the other fields are metadata, useful for the response.
+type Request struct {
+	Job               bson.M    // what to do
+	ResponseRequested bool      // shall the crawler send a Response to the Frontend?
+	RequestID         uuid.UUID `json:"requestID"` // this id is mainly intended to be passed back in the Response
+	PingbackURL       URL       // when the request has been fulfilled, this URL must be visited with a Response
+}
+
+// A Response is what the Crawler should send to the frontend after the Job has been processed.
+type Response struct {
+	RequestID uuid.UUID // the one we received in the Request
+	Error     error     // was everything fine?
+	Other     bson.M    // a crawler might communicate something to the frontend
+}
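
For reference, below is a minimal sketch (not part of the patch) of how a frontend could talk to this wrapper: it serves a pingback endpoint, builds a crawler.Request and POSTs it to /submit on localhost:8123, where the wrapper listens. The pingback address and port, the Job fields and the fixed request id are made-up values for illustration; uuid.FromStringOrNil is used instead of NewV4 to avoid depending on a specific satori/go.uuid version.

// example_client.go — illustrative sketch only, not part of the patch
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"

	"git.lattuga.net/avana/sciacquone/crawler"
	"github.com/satori/go.uuid"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	// Serve the pingback endpoint the wrapper will POST the crawler.Response to.
	http.HandleFunc("/pingback", func(w http.ResponseWriter, r *http.Request) {
		body, _ := ioutil.ReadAll(r.Body)
		fmt.Println("pingback received:", string(body))
		w.WriteHeader(http.StatusOK)
	})
	go http.ListenAndServe("localhost:9000", nil) // port 9000 is an arbitrary choice

	pingbackURL, _ := url.Parse("http://localhost:9000/pingback")
	req := crawler.Request{
		Job:               bson.M{"url": "https://example.com"}, // whatever the wrapped command expects on stdin
		ResponseRequested: true,
		RequestID:         uuid.FromStringOrNil("6ba7b810-9dad-11d1-80b4-00c04fd430c8"),
		PingbackURL:       crawler.URL{URL: pingbackURL},
	}
	payload, _ := json.Marshal(&req) // marshal a pointer so URL's MarshalText is used

	// The wrapper listens on localhost:8123 and accepts jobs on /submit.
	resp, err := http.Post("http://localhost:8123/submit", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	fmt.Println("submit:", resp.Status) // 200 OK if enqueued, 503 if the queue is full
	select {}                           // keep the pingback endpoint alive
}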