178 lines
4.5 KiB
Go
178 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"time"
|
|
|
|
"git.lattuga.net/avana/sciacquone/crawler"
|
|
"gopkg.in/mgo.v2/bson"
|
|
)
|
|
|
|
// cmd is the cmdline to run on each POST
|
|
var cmd []string
|
|
|
|
// jobsQueue is shared between submitHandler and worker
|
|
var jobsQueue chan crawler.Request
|
|
|
|
// this httpClient is global without a very specific need to be so
|
|
// a bit for performance, a bit for easy customization from main (ie: setting the timeout with a command-line
|
|
// flag
|
|
var httpClient *http.Client
|
|
|
|
var pingbackRetries int
|
|
|
|
// submitHandler handles HTTP POSTs to /submit, enqueing jobs
|
|
func submitHandler(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != "POST" {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
w.Write([]byte("Only POST accepted"))
|
|
return
|
|
}
|
|
var request crawler.Request
|
|
body, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte("Cannot read your request\n"))
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
if err = json.Unmarshal(body, &request); err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte("Cannot parse your request:\n "))
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
select {
|
|
case jobsQueue <- request: // enqueued successfully
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte("OK\n"))
|
|
default: // not enqueued; typically, this means "the queue is full"
|
|
w.Header().Set("Retry-After", "120") // 2 minutes
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
w.Write([]byte("Crawler queue is full! retry later\n"))
|
|
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
httpClient = &http.Client{
|
|
Timeout: time.Second * 10,
|
|
}
|
|
pingbackRetries = 3
|
|
}
|
|
|
|
func main() {
|
|
queueSize := flag.Int("queue-size", 100, "Queue size; determines memory usage and ability to handle burst")
|
|
// TODO: rate limit options
|
|
flag.Usage = func() {
|
|
fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0])
|
|
fmt.Fprintf(flag.CommandLine.Output(), "%s [options] command [arg]...\n", path.Base(os.Args[0]))
|
|
|
|
flag.PrintDefaults()
|
|
}
|
|
flag.Parse()
|
|
jobsQueue = make(chan crawler.Request, *queueSize)
|
|
pingQueue := make(chan pingback, *queueSize)
|
|
cmd = flag.Args()
|
|
if len(cmd) == 0 {
|
|
fmt.Fprintln(os.Stderr, "Error: a command must be provided")
|
|
os.Exit(2)
|
|
}
|
|
http.HandleFunc("/submit", submitHandler)
|
|
fmt.Println("submitted")
|
|
go worker(jobsQueue, pingQueue)
|
|
go pingerDispatcher(pingQueue)
|
|
err := http.ListenAndServe("localhost:8123", nil)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
os.Exit(1)
|
|
}
|
|
// TODO: server HTTP
|
|
}
|
|
|
|
type pingback struct {
|
|
PingbackURL crawler.URL
|
|
Response crawler.Response
|
|
}
|
|
|
|
func worker(reqs <-chan crawler.Request, pings chan<- pingback) {
|
|
for {
|
|
req := <-reqs
|
|
|
|
serializedJob, err := json.Marshal(req.Job)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, "Error: encoding job", err)
|
|
continue
|
|
}
|
|
c := exec.Cmd{}
|
|
c.Path = cmd[0]
|
|
c.Args = cmd
|
|
c.Stdin = bytes.NewReader(serializedJob)
|
|
var cmdout bytes.Buffer
|
|
c.Stdout = &cmdout
|
|
fmt.Println("lancio", cmd, string(serializedJob))
|
|
fmt.Println("lancio", c)
|
|
cmderr := c.Run()
|
|
if cmderr != nil {
|
|
fmt.Fprintln(os.Stderr, "Error: command errored for job", req.RequestID)
|
|
fmt.Fprintln(os.Stderr, cmderr)
|
|
} else {
|
|
fmt.Printf("job output '%s'\n", string(cmdout.String()))
|
|
}
|
|
|
|
if req.ResponseRequested {
|
|
pings <- pingback{
|
|
PingbackURL: req.PingbackURL,
|
|
Response: crawler.Response{
|
|
Error: cmderr,
|
|
RequestID: req.RequestID,
|
|
Other: bson.M{"stdout": cmdout.String()},
|
|
}}
|
|
}
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
func pingerDispatcher(pings <-chan pingback) {
|
|
for {
|
|
ping := <-pings
|
|
go pingWorker(ping)
|
|
}
|
|
}
|
|
|
|
func pingWorker(ping pingback) {
|
|
for i := 0; i < pingbackRetries; i++ {
|
|
serialized, err := json.Marshal(ping.Response)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, "Error: could not serialize pingback response")
|
|
fmt.Fprintln(os.Stderr, err)
|
|
return
|
|
}
|
|
resp, err := httpClient.Post(ping.PingbackURL.String(), "application/json",
|
|
bytes.NewBuffer(serialized))
|
|
if err == nil && resp.StatusCode == http.StatusOK {
|
|
return
|
|
}
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: could not pingback response (attempt %d/%d)\n", i+1, pingbackRetries)
|
|
fmt.Fprintln(os.Stderr, err)
|
|
// retry after 10 seconds
|
|
time.Sleep(10 * time.Second)
|
|
continue
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
fmt.Fprintln(os.Stderr, "Error: caller errored on pingback:", resp.Status)
|
|
// TODO: statuscode should be better checked to understand if a retry should be done
|
|
// in that
|
|
return
|
|
}
|
|
}
|
|
}
|