forked from avana/sciacquone
initial commit
This commit is contained in:
commit
edb69d28a6
2 changed files with 218 additions and 0 deletions
178
cmd/crawlerWrapper/wrapper.go
Normal file
178
cmd/crawlerWrapper/wrapper.go
Normal file
|
@ -0,0 +1,178 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"git.lattuga.net/avana/sciacquone/crawler"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
// cmd is the cmdline to run on each POST
|
||||
var cmd []string
|
||||
|
||||
// jobsQueue is shared between submitHandler and worker
|
||||
var jobsQueue chan crawler.Request
|
||||
|
||||
// this httpClient is global without a very specific need to be so
|
||||
// a bit for performance, a bit for easy customization from main (ie: setting the timeout with a command-line
|
||||
// flag
|
||||
var httpClient *http.Client
|
||||
|
||||
var pingbackRetries int
|
||||
|
||||
// submitHandler handles HTTP POSTs to /submit, enqueing jobs
|
||||
func submitHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
w.Write([]byte("Only POST accepted"))
|
||||
return
|
||||
}
|
||||
var request crawler.Request
|
||||
body, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte("Cannot read your request\n"))
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
if err = json.Unmarshal(body, &request); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
w.Write([]byte("Cannot parse your request:\n "))
|
||||
w.Write([]byte(err.Error()))
|
||||
return
|
||||
}
|
||||
select {
|
||||
case jobsQueue <- request: // enqueued successfully
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write([]byte("OK\n"))
|
||||
default: // not enqueued; typically, this means "the queue is full"
|
||||
w.Header().Set("Retry-After", "120") // 2 minutes
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
w.Write([]byte("Crawler queue is full! retry later\n"))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
httpClient = &http.Client{
|
||||
Timeout: time.Second * 10,
|
||||
}
|
||||
pingbackRetries = 3
|
||||
}
|
||||
|
||||
func main() {
|
||||
queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and ability to handle burst")
|
||||
// TODO: rate limit options
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
|
||||
fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
|
||||
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
flag.Parse()
|
||||
jobsQueue = make(chan crawler.Request, *queueSize)
|
||||
pingQueue := make(chan pingback, *queueSize)
|
||||
cmd = flag.Args()
|
||||
if len(cmd) == 0 {
|
||||
fmt.Fprintln(os.Stderr, "Error: a command must be provided")
|
||||
os.Exit(2)
|
||||
}
|
||||
http.HandleFunc("/submit", submitHandler)
|
||||
fmt.Println("submitted")
|
||||
go worker(jobsQueue, pingQueue)
|
||||
go pingerDispatcher(pingQueue)
|
||||
err := http.ListenAndServe("localhost:8123", nil)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// TODO: server HTTP
|
||||
}
|
||||
|
||||
type pingback struct {
|
||||
PingbackURL crawler.URL
|
||||
Response crawler.Response
|
||||
}
|
||||
|
||||
func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
|
||||
for {
|
||||
req := <-reqs
|
||||
|
||||
serializedJob, err := json.Marshal(req.Job)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "Error: encoding job", err)
|
||||
continue
|
||||
}
|
||||
c := exec.Cmd{}
|
||||
c.Path = cmd[0]
|
||||
c.Args = cmd
|
||||
c.Stdin = bytes.NewReader(serializedJob)
|
||||
var cmdout bytes.Buffer
|
||||
c.Stdout = &cmdout
|
||||
fmt.Println("lancio", cmd, string(serializedJob))
|
||||
fmt.Println("lancio", c)
|
||||
cmderr := c.Run()
|
||||
if cmderr != nil {
|
||||
fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
|
||||
fmt.Fprintln(os.Stderr, cmderr)
|
||||
} else {
|
||||
fmt.Printf("job output '%s'\n", string(cmdout.String()))
|
||||
}
|
||||
|
||||
if req.ResponseRequested {
|
||||
pings <- pingback{
|
||||
PingbackURL: req.PingbackURL,
|
||||
Response: crawler.Response{
|
||||
Error: cmderr,
|
||||
RequestID: req.RequestID,
|
||||
Other: bson.M{"stdout": cmdout.String()},
|
||||
}}
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func pingerDispatcher(pings <-chan pingback) {
|
||||
for {
|
||||
ping := <-pings
|
||||
go pingWorker(ping)
|
||||
}
|
||||
}
|
||||
|
||||
func pingWorker(ping pingback) {
|
||||
for i := 0; i < pingbackRetries; i++ {
|
||||
serialized, err := json.Marshal(ping.Response)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
return
|
||||
}
|
||||
resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json",
|
||||
bytes.NewBuffer(serialized))
|
||||
if err == nil && resp.StatusCode == http.StatusOK {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
// retry after 10 seconds
|
||||
time.Sleep(10 * time.Second)
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
|
||||
// TODO: statuscode should be better checked to understand if a retry should be done
|
||||
// in that
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
40
crawler/messages.go
Normal file
40
crawler/messages.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package crawler
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"github.com/satori/go.uuid"
|
||||
"gopkg.in/mgo.v2/bson"
|
||||
)
|
||||
|
||||
// URL is our reimplementation of url.URL to support marshalling
|
||||
type URL struct{ *url.URL }
|
||||
|
||||
// UnmarshalText implements TextUnmarshaler (JSON, too)
|
||||
func (u *URL) UnmarshalText(text []byte) error {
|
||||
u2, err := url.Parse(string(text))
|
||||
u.URL = u2
|
||||
return err
|
||||
}
|
||||
|
||||
// MarshalText implements TextMarshaler (JSON, too)
|
||||
func (u *URL) MarshalText() ([]byte, error) {
|
||||
return []byte(u.String()), nil
|
||||
}
|
||||
|
||||
// A Request is what the Frontend will send to the crawler.
|
||||
//
|
||||
// The "real" thing to do is Job. Other fields are metadata, useful for the response
|
||||
type Request struct {
|
||||
Job bson.M // what to do
|
||||
ResponseRequested bool // shall the crawler send a Response to the Frontend?
|
||||
RequestID uuid.UUID `json:"requestID"` // this id is mainly intended to be passed back in the CrawlerResponse
|
||||
PingbackURL URL // when the request has been fulfilled, this URL must be visited with a CrawlerResponse
|
||||
}
|
||||
|
||||
// A Response is what the Crawler should send to the frontend after the Job has been successfully processed.
|
||||
type Response struct {
|
||||
RequestID uuid.UUID // the one we received in the Request
|
||||
Error error // was everything fine?
|
||||
Other bson.M // a crawler might communicate something to the frontend
|
||||
}
|
Loading…
Reference in a new issue