JOB handling + /api/ namespace

This commit is contained in:
boyska 2013-11-29 23:55:19 +01:00
parent 3244793336
commit e2939c6169
4 changed files with 229 additions and 140 deletions

74
server/processqueue.py Normal file
View file

@ -0,0 +1,74 @@
import multiprocessing
class JobQueue(object):
def __init__(self):
self.pool = multiprocessing.Pool(processes=1)
self.last_job_id = 0
self.jobs = {} # job_id: AsyncResult
def submit(self, function, *args, **kwargs):
self.last_job_id += 1
job_id = self.last_job_id
def clean_jobs(res):
'''this callback will remove the job from the queue'''
del self.jobs[job_id]
self.jobs[job_id] = self.pool.apply_async(function, args, kwargs,
clean_jobs)
return job_id
def check_job(self, job_id):
'''
If the job is running, return the asyncResult.
If it has already completed, returns True.
If no such job_id exists at all, returns False
'''
if job_id <= 0:
raise ValueError("non-valid job_id")
if self.last_job_id < job_id:
return False
if job_id in self.jobs:
return self.jobs[job_id]
return True
def join(self):
self.pool.close()
self.pool.join()
self.pool = None
def simulate_long_job(recid=None, starttime=None, endtime=None, name='', filename=None):
from time import sleep
print "evviva " + name
sleep(2)
print "lavoro su " + name
sleep(2)
print "done su " + name
_queue = None
def get_process_queue():
global _queue
if _queue is None:
_queue = JobQueue()
return _queue
if __name__ == '__main__':
from datetime import datetime
n = datetime.now()
def sleep(n):
import time
print "Inizio %d" % n
time.sleep(n)
print "Finisco %d" % n
return n
get_process_queue().submit(sleep, 3)
get_process_queue().submit(sleep, 3)
get_process_queue().join()
print get_process_queue().jobs
delta = (datetime.now() - n).total_seconds()
print delta
assert 5 < delta < 7

View file

@ -1,9 +1,11 @@
import datetime
from datetime import datetime
import logging
from functools import partial
from bottle import Bottle, request, static_file, redirect
from bottle import Bottle, request, static_file, redirect, abort
from techrec import Rec, RecDB
from processqueue import get_process_queue, simulate_long_job
class RecServer:
def __init__(self):
@ -17,108 +19,113 @@ class RecServer:
### This is the API part of the app
# TODO: move to namespace /api/
# TODO: create a "sub-application"
self._app.route('/help', callback=self.help)
self._app.route('/help/', callback=self.help)
self._app.route('/api/help', callback=self.help)
self._app.route('/create', method="POST", callback=self.create)
# self._app.post('/create', callback=self.create)
self._app.route('/api/create', method="POST", callback=self.create)
self._app.route('/update', method="POST", callback=self.update)
self._app.route('/search', method=["GET", "POST"], callback=self.search)
self._app.route('/delete', method="POST", callback=self.delete)
self._app.route('/api/update', method="POST", callback=self.update)
self._app.route('/api/search', callback=self.search)
self._app.route('/api/delete', method="POST", callback=self.delete)
self._app.route('/api/jobs', callback=self.running_jobs)
self._app.route('/api/jobs/<job_id:int>', callback=self.check_job)
## Static part of the site
self._app.route('/static/<filepath:path>',
callback= lambda filepath: static_file(filepath, root='static/'))
callback=lambda filepath: static_file(filepath,
root='static/'))
self._app.route('/', callback=lambda: redirect('/new.html'))
self._app.route('/new.html', callback=lambda: static_file('new.html',
self._app.route('/new.html',
callback=partial(static_file, 'new.html',
root='pages/'))
self._app.route('/tempo.html', callback=lambda: static_file('tempo.html',
self._app.route('/tempo.html',
callback=partial(static_file, 'tempo.html',
root='pages/'))
def extsearch( self, args ):
print "ARG", args
return self.rec_err("EXT")
"""
CREATE HANDLER
"""
# @route('/create', method=['OPTIONS','POST'])
def create(self):
req = dict( request.POST.allitems() )
req = dict(request.POST.allitems())
ret = {}
print "Server:: Create request %s " % req
starttime = ""
if req["starttime-" + req["recid"]] != "":
starttime = datetime.datetime.strptime( req["starttime-"+req["recid"]] , "%Y/%m/%d %H:%M:%S")
if req["starttime"] != "":
starttime = datetime.strptime(req["starttime"],
"%Y/%m/%d %H:%M:%S")
endtime = datetime.datetime.now()
if req["endtime-" + req["recid"]] != "":
endtime = datetime.datetime.strptime( req["endtime-"+req["recid"]] , "%Y/%m/%d %H:%M:%S")
print "Name %s RECID %s Starttime %s EndTime %s" %(req["name-"+req["recid"]],req["recid"], starttime,endtime )
ret = self.db.add( Rec(name=req["name-"+req["recid"]],
endtime = datetime.now()
if req["endtime"] != "":
endtime = datetime.strptime(req["endtime-"+req["recid"]],
"%Y/%m/%d %H:%M:%S")
print "Name %s RECID %s Starttime %s EndTime %s" %\
(req["name"], req["recid"], starttime, endtime)
ret = self.db.add(Rec(name=req["name"],
recid=req["recid"],
starttime=starttime,
endtime=endtime )
endtime=endtime)
)
return self.rec_msg("Nuova registrazione creata! (id:%d)" % ret.id,
id=ret.id)
# @route('/active')
def getactive(self):
print "GetActive"
"""
DELETE HANDLER
"""
# @route('/delete/<recid>') # @route('/delete/<recid>/')
def delete( self, recid = None ):
req = dict( request.POST.allitems() )
logging.info("Server: request delete %s " % ( req ) )
if not req.has_key( "recid" ):
def delete(self, recid=None):
req = dict(request.POST.allitems())
logging.info("Server: request delete %s " % (req))
if 'recid' not in req:
return self.rec_err("No valid ID")
if self.db.delete( req["recid"] ):
if self.db.delete(req["recid"]):
return self.rec_msg("DELETE OK")
else:
return self.rec_err("DELETE error: %s" % (self.db.get_err()))
"""
UPDATE HANDLER
"""
# @route('/delete/<recid>') # @route('/delete/<recid>/')
def update( self ):
req = dict( request.POST.allitems() )
def update(self):
req = dict(request.POST.allitems())
ret={}
ret["starttime"] = req ["starttime-"+req["recid"]]
ret["endtime"] = req["endtime-"+req["recid"]]
ret["name"] = req["name-"+req["recid"]]
ret = {}
ret["starttime"] = req["starttime"]
ret["endtime"] = req["endtime"]
ret["name"] = req["name"]
if self.db.update( req["recid"], ret ):
return self.rec_msg("Aggiornamento completato!");
if not self.db.update(req["recid"], ret):
return self.rec_err("Errore Aggiornamento")
req['filename'] = 'ror-%s-%s' % (req['recid'], req['name'])
# TODO: real ffmpeg job!
job_id = get_process_queue().submit(simulate_long_job, **req)
print "SUBMITTED: %d" % job_id
return self.rec_msg("Aggiornamento completato!", job_id=job_id)
def check_job(self, job_id):
try:
job = get_process_queue().check_job(job_id)
except ValueError:
abort(400, 'job_id not valid')
def ret(status):
return {'job_status': status, 'job_id': job_id}
if job is True:
return ret('DONE')
if job is False:
abort(404, 'No such job has ever been spawned')
else:
return self.rec_err("Errore Aggiornamento");
if job.ready():
try:
res = job.get()
return res
except Exception as exc:
return ret('FAILED')
return ret('WIP')
"""
JSON' RESPONDER
"""
def rec_msg(self, msg, **kwargs):
d = {"message": msg, "status": True}
d.update(kwargs)
return d
def rec_err(self, msg):
return {"error": msg, "status": False}
def running_jobs(self):
res = {}
res['last_job_id'] = get_process_queue().last_job_id
res['running'] = get_process_queue().jobs.keys()
return res
"""
@route('/search') # @route('/search/') # @route('/search/<key>/<value>')
"""
def search( self, args=None):
def search(self, args=None):
if request.method == 'GET':
req = dict( request.GET.allitems() )
else:
@ -172,6 +179,16 @@ class RecServer:
<h2>/update </h2>\
<h3>Not implemented.</h3>"
# JSON UTILS
def rec_msg(self, msg, **kwargs):
d = {"message": msg, "status": True}
d.update(kwargs)
return d
def rec_err(self, msg):
return {"error": msg, "status": False}
if __name__ == "__main__":
c = RecServer()
c._app.run(host="localhost", port="8000", debug=True, reloader=True)

View file

@ -10,10 +10,10 @@ function trx_logarea( code ) { return "logarea-"+code; }
function rs_button( code ) { return "button"+code; }
function rs_trxarea( code ) { return "recarea-"+code; }
function rs_trxname( code ) { return "name-"+code; }
function rs_trxname( code ) { return "name"; }
function rs_buttonarea( code ) { return "butarea-"+code; }
function rs_inputstart( code ) { return "starttime-"+code; }
function rs_inputend( code ) { return "endtime-"+code; }
function rs_inputstart( code ) { return "starttime"; }
function rs_inputend( code ) { return "endtime"; }
function rs_formid(code) { return "form-"+code; }
function rs_dellink(code) { return "dellink-"+code;}
function rs_id(code) { return code; }
@ -270,9 +270,7 @@ function recUpdate( recid ) {
*
*/
function RecAjax(apipath, dataString ) {
var srv = srvaddr + apipath ;
var srv = srvaddr + "api/" + apipath ;
var request = $.ajax({
type: "POST",
cache: false,
@ -335,4 +333,4 @@ function ChangeState(recid, from, to) {
}
} // End function ChangeState
// vim: set ts=4 sw=4 et:

View file

@ -8,7 +8,7 @@ $(document).ready(function(){
function (event) {
event.preventDefault();
dataString = $(this).serialize();
var request = $.getJSON('/search', dataString);
var request = $.getJSON('/api/search', dataString);
$("#searchresult").html(" ");
request.done( function(data) {