From e2939c6169c417e68b0f65746ba959892bf73335 Mon Sep 17 00:00:00 2001 From: boyska Date: Fri, 29 Nov 2013 23:55:19 +0100 Subject: [PATCH] JOB handling + /api/ namespace --- server/processqueue.py | 74 +++++++++++++++ server/server.py | 163 ++++++++++++++++++--------------- server/static/js/reclibrary.js | 130 +++++++++++++------------- server/static/js/tempo.js | 2 +- 4 files changed, 229 insertions(+), 140 deletions(-) create mode 100644 server/processqueue.py diff --git a/server/processqueue.py b/server/processqueue.py new file mode 100644 index 0000000..cf55bd7 --- /dev/null +++ b/server/processqueue.py @@ -0,0 +1,74 @@ +import multiprocessing + + +class JobQueue(object): + def __init__(self): + self.pool = multiprocessing.Pool(processes=1) + self.last_job_id = 0 + self.jobs = {} # job_id: AsyncResult + + def submit(self, function, *args, **kwargs): + self.last_job_id += 1 + job_id = self.last_job_id + + def clean_jobs(res): + '''this callback will remove the job from the queue''' + del self.jobs[job_id] + self.jobs[job_id] = self.pool.apply_async(function, args, kwargs, + clean_jobs) + return job_id + + def check_job(self, job_id): + ''' + If the job is running, return the asyncResult. + If it has already completed, returns True. + If no such job_id exists at all, returns False + ''' + if job_id <= 0: + raise ValueError("non-valid job_id") + if self.last_job_id < job_id: + return False + if job_id in self.jobs: + return self.jobs[job_id] + return True + + def join(self): + self.pool.close() + self.pool.join() + self.pool = None + + +def simulate_long_job(recid=None, starttime=None, endtime=None, name='', filename=None): + from time import sleep + print "evviva " + name + sleep(2) + print "lavoro su " + name + sleep(2) + print "done su " + name +_queue = None + + +def get_process_queue(): + global _queue + if _queue is None: + _queue = JobQueue() + return _queue + +if __name__ == '__main__': + from datetime import datetime + n = datetime.now() + + def sleep(n): + import time + print "Inizio %d" % n + time.sleep(n) + print "Finisco %d" % n + return n + + get_process_queue().submit(sleep, 3) + get_process_queue().submit(sleep, 3) + get_process_queue().join() + print get_process_queue().jobs + delta = (datetime.now() - n).total_seconds() + print delta + assert 5 < delta < 7 diff --git a/server/server.py b/server/server.py index ce9b339..5b4bdfa 100644 --- a/server/server.py +++ b/server/server.py @@ -1,9 +1,11 @@ -import datetime +from datetime import datetime import logging +from functools import partial -from bottle import Bottle, request, static_file, redirect +from bottle import Bottle, request, static_file, redirect, abort from techrec import Rec, RecDB +from processqueue import get_process_queue, simulate_long_job class RecServer: def __init__(self): @@ -17,108 +19,113 @@ class RecServer: ### This is the API part of the app # TODO: move to namespace /api/ # TODO: create a "sub-application" - self._app.route('/help', callback=self.help) - self._app.route('/help/', callback=self.help) + self._app.route('/api/help', callback=self.help) - self._app.route('/create', method="POST", callback=self.create) - # self._app.post('/create', callback=self.create) + self._app.route('/api/create', method="POST", callback=self.create) - self._app.route('/update', method="POST", callback=self.update) - self._app.route('/search', method=["GET", "POST"], callback=self.search) - self._app.route('/delete', method="POST", callback=self.delete) + self._app.route('/api/update', method="POST", callback=self.update) + self._app.route('/api/search', callback=self.search) + self._app.route('/api/delete', method="POST", callback=self.delete) + self._app.route('/api/jobs', callback=self.running_jobs) + self._app.route('/api/jobs/', callback=self.check_job) ## Static part of the site self._app.route('/static/', - callback= lambda filepath: static_file(filepath, root='static/')) + callback=lambda filepath: static_file(filepath, + root='static/')) self._app.route('/', callback=lambda: redirect('/new.html')) - self._app.route('/new.html', callback=lambda: static_file('new.html', - root='pages/')) - self._app.route('/tempo.html', callback=lambda: static_file('tempo.html', - root='pages/')) + self._app.route('/new.html', + callback=partial(static_file, 'new.html', + root='pages/')) + self._app.route('/tempo.html', + callback=partial(static_file, 'tempo.html', + root='pages/')) - def extsearch( self, args ): - print "ARG", args - return self.rec_err("EXT") - - """ - CREATE HANDLER - """ - # @route('/create', method=['OPTIONS','POST']) def create(self): - req = dict( request.POST.allitems() ) + req = dict(request.POST.allitems()) ret = {} print "Server:: Create request %s " % req starttime = "" - if req["starttime-" + req["recid"]] != "": - starttime = datetime.datetime.strptime( req["starttime-"+req["recid"]] , "%Y/%m/%d %H:%M:%S") + if req["starttime"] != "": + starttime = datetime.strptime(req["starttime"], + "%Y/%m/%d %H:%M:%S") - endtime = datetime.datetime.now() - if req["endtime-" + req["recid"]] != "": - endtime = datetime.datetime.strptime( req["endtime-"+req["recid"]] , "%Y/%m/%d %H:%M:%S") - - - print "Name %s RECID %s Starttime %s EndTime %s" %(req["name-"+req["recid"]],req["recid"], starttime,endtime ) - ret = self.db.add( Rec(name=req["name-"+req["recid"]], - recid=req["recid"], - starttime=starttime, - endtime=endtime ) - ) + endtime = datetime.now() + if req["endtime"] != "": + endtime = datetime.strptime(req["endtime-"+req["recid"]], + "%Y/%m/%d %H:%M:%S") + print "Name %s RECID %s Starttime %s EndTime %s" %\ + (req["name"], req["recid"], starttime, endtime) + ret = self.db.add(Rec(name=req["name"], + recid=req["recid"], + starttime=starttime, + endtime=endtime) + ) return self.rec_msg("Nuova registrazione creata! (id:%d)" % ret.id, - id=ret.id) + id=ret.id) - # @route('/active') def getactive(self): print "GetActive" - """ - DELETE HANDLER - """ - # @route('/delete/') # @route('/delete//') - def delete( self, recid = None ): - req = dict( request.POST.allitems() ) - logging.info("Server: request delete %s " % ( req ) ) - if not req.has_key( "recid" ): + def delete(self, recid=None): + req = dict(request.POST.allitems()) + logging.info("Server: request delete %s " % (req)) + if 'recid' not in req: return self.rec_err("No valid ID") - if self.db.delete( req["recid"] ): + if self.db.delete(req["recid"]): return self.rec_msg("DELETE OK") else: return self.rec_err("DELETE error: %s" % (self.db.get_err())) - """ - UPDATE HANDLER - """ - # @route('/delete/') # @route('/delete//') - def update( self ): - req = dict( request.POST.allitems() ) + def update(self): + req = dict(request.POST.allitems()) - ret={} - ret["starttime"] = req ["starttime-"+req["recid"]] - ret["endtime"] = req["endtime-"+req["recid"]] - ret["name"] = req["name-"+req["recid"]] + ret = {} + ret["starttime"] = req["starttime"] + ret["endtime"] = req["endtime"] + ret["name"] = req["name"] - if self.db.update( req["recid"], ret ): - return self.rec_msg("Aggiornamento completato!"); + if not self.db.update(req["recid"], ret): + return self.rec_err("Errore Aggiornamento") + req['filename'] = 'ror-%s-%s' % (req['recid'], req['name']) + + # TODO: real ffmpeg job! + job_id = get_process_queue().submit(simulate_long_job, **req) + print "SUBMITTED: %d" % job_id + return self.rec_msg("Aggiornamento completato!", job_id=job_id) + + def check_job(self, job_id): + try: + job = get_process_queue().check_job(job_id) + except ValueError: + abort(400, 'job_id not valid') + + def ret(status): + return {'job_status': status, 'job_id': job_id} + if job is True: + return ret('DONE') + if job is False: + abort(404, 'No such job has ever been spawned') else: - return self.rec_err("Errore Aggiornamento"); + if job.ready(): + try: + res = job.get() + return res + except Exception as exc: + return ret('FAILED') + return ret('WIP') - """ - JSON' RESPONDER - """ - def rec_msg(self, msg, **kwargs): - d = {"message": msg, "status": True} - d.update(kwargs) - return d - def rec_err(self, msg): - return {"error": msg, "status": False} + def running_jobs(self): + res = {} + res['last_job_id'] = get_process_queue().last_job_id + res['running'] = get_process_queue().jobs.keys() + return res - """ - @route('/search') # @route('/search/') # @route('/search//') - """ - def search( self, args=None): + def search(self, args=None): if request.method == 'GET': req = dict( request.GET.allitems() ) else: @@ -172,6 +179,16 @@ class RecServer:

/update

\

Not implemented.

" + # JSON UTILS + def rec_msg(self, msg, **kwargs): + d = {"message": msg, "status": True} + d.update(kwargs) + return d + + def rec_err(self, msg): + return {"error": msg, "status": False} + + if __name__ == "__main__": c = RecServer() c._app.run(host="localhost", port="8000", debug=True, reloader=True) diff --git a/server/static/js/reclibrary.js b/server/static/js/reclibrary.js index 73a2809..c72071a 100644 --- a/server/static/js/reclibrary.js +++ b/server/static/js/reclibrary.js @@ -3,19 +3,19 @@ console.log("Loading..."); function trx_startbut( code ) { return "startbutton-"+code; } function trx_stopbut( code ) { return "stopbutton-"+code; } function trx_downbut( code ) { return "downloadbutton-"+code; } -function trx_endbut( code ) { return "endbutton-"+code; } +function trx_endbut( code ) { return "endbutton-"+code; } function trx_logarea( code ) { return "logarea-"+code; } function rs_button( code ) { return "button"+code; } function rs_trxarea( code ) { return "recarea-"+code; } -function rs_trxname( code ) { return "name-"+code; } +function rs_trxname( code ) { return "name"; } function rs_buttonarea( code ) { return "butarea-"+code; } -function rs_inputstart( code ) { return "starttime-"+code; } -function rs_inputend( code ) { return "endtime-"+code; } +function rs_inputstart( code ) { return "starttime"; } +function rs_inputend( code ) { return "endtime"; } function rs_formid(code) { return "form-"+code; } -function rs_dellink(code) { return "dellink-"+code;} +function rs_dellink(code) { return "dellink-"+code;} function rs_id(code) { return code; } var txt_start = "Inizia"; @@ -84,12 +84,12 @@ function rec_active( recid ) { request.done( function(data) { $.each(data, function(key, val) { console.log("Key " + key + " > VAL " + val ); - $("#"+trx_logarea( recid )).append( "Key " + key + " > VAL " + val + "
" ); - }); + $("#"+trx_logarea( recid )).append( "Key " + key + " > VAL " + val + "
" ); + }); - console.log("Req OK: "+ data); - // console.log("request"+ req); - ChangeState(recid, trx_downbut(recid) , trx_endbut(recid)); + console.log("Req OK: "+ data); + // console.log("request"+ req); + ChangeState(recid, trx_downbut(recid) , trx_endbut(recid)); }); } @@ -101,7 +101,7 @@ function rec_new( ) { var myDate = new Date() - console.log("New ID "+ myDate.getTime()); + console.log("New ID "+ myDate.getTime()); var recid = "rec-"+ myDate.getTime(); console.log("[rec_new] New Rec " + recid); @@ -127,27 +127,27 @@ function rec_new( ) $("#"+rs_trxarea(recid)).append( "
Nuova trasmissione
" ); // Bind the Delete Links - $("#"+rs_dellink(recid)).click(function(){ - console.log("Remove " + rs_trxarea(recid) + "[ID"+recid+"]"); - // $("#"+rs_trxarea(recid)).remove(); - recDelete (recid,rs_trxarea(recid)); + $("#"+rs_dellink(recid)).click(function(){ + console.log("Remove " + rs_trxarea(recid) + "[ID"+recid+"]"); + // $("#"+rs_trxarea(recid)).remove(); + recDelete (recid,rs_trxarea(recid)); }); // FORM SUBMIT: THE REC IS STOPPEND AND MUST BE PROCESSED $("#"+formid).submit(function(event){ - // Immediately, mark the end time (stop action) - ChangeState(recid, trx_downbut(recid) , trx_endbut(recid)); + // Immediately, mark the end time (stop action) + ChangeState(recid, trx_downbut(recid) , trx_endbut(recid)); // Force a Name - while (true) { - if ( $("#"+rs_trxname(recid)).val() == "" ) - { - var tmpname = prompt("Nessun nome di trasmissione!!!"); - $("#"+rs_trxname(recid)).val(tmpname); - $("#"+trx_logarea(recid)).append("Titolo: "+ tmpname +"
"); - } - else { break; } - } + while (true) { + if ( $("#"+rs_trxname(recid)).val() == "" ) + { + var tmpname = prompt("Nessun nome di trasmissione!!!"); + $("#"+rs_trxname(recid)).val(tmpname); + $("#"+trx_logarea(recid)).append("Titolo: "+ tmpname +"
"); + } + else { break; } + } event.preventDefault(); @@ -171,7 +171,7 @@ function rec_new( ) $("#"+trx_startbut(recid)).click( function(event){ // Immediately, mark the start time (start action) and send it to Server - ChangeState(recid, trx_startbut(recid) , trx_stopbut(recid)); + ChangeState(recid, trx_startbut(recid) , trx_stopbut(recid)); event.preventDefault(); recNew( recid ); @@ -216,17 +216,17 @@ function recNew ( recid ) { var request = RecAjax("create", dataString); request.done( function(data) { - $.each(data, function(key, val) { - console.log("Received (K:V) ("+key+":"+val+")") ; - if (key == "msg") { - $("#"+trx_logarea(recid)).html("Nuova Registrazione
(recid:"+recid+")
"); - $("#"+trx_logarea(recid)).append("Inizio: "+ $("#"+rs_inputstart(recid)).val() +"
"); - } - if (key == "error") { - $("#"+trx_logarea( recid )).html("Errore: impossibile creare una nuova registrazione"+val+" "); - } + $.each(data, function(key, val) { + console.log("Received (K:V) ("+key+":"+val+")") ; + if (key == "msg") { + $("#"+trx_logarea(recid)).html("Nuova Registrazione
(recid:"+recid+")
"); + $("#"+trx_logarea(recid)).append("Inizio: "+ $("#"+rs_inputstart(recid)).val() +"
"); + } + if (key == "error") { + $("#"+trx_logarea( recid )).html("Errore: impossibile creare una nuova registrazione"+val+" "); + } - }); + }); } ); return request; } @@ -240,42 +240,40 @@ function recUpdate( recid ) { //event.preventDefault(); var request = RecAjax("update", dataString ); request.done( function(data) { - $.each(data, function(key, val) { - console.log("recUpdate receive (k:v) ("+key+":"+val+")" ); + $.each(data, function(key, val) { + console.log("recUpdate receive (k:v) ("+key+":"+val+")" ); - if (key == "message") { - var str = ""; - str += "RecID "+ recid + "
" - str += "nome "+ $("#"+rs_trxname(recid)).val() + "
" - str += "Inizio "+ $("#"+rs_inputstart(recid)).val() + "
" - str += "Fine "+ $("#"+rs_inputend(recid)).val() + "
" + if (key == "message") { + var str = ""; + str += "RecID "+ recid + "
" + str += "nome "+ $("#"+rs_trxname(recid)).val() + "
" + str += "Inizio "+ $("#"+rs_inputstart(recid)).val() + "
" + str += "Fine "+ $("#"+rs_inputend(recid)).val() + "
" - $("#"+trx_logarea(recid)).html( str ); + $("#"+trx_logarea(recid)).html( str ); // if all elements have been recorded if ($("#"+rs_trxname(recid)).val() != "") { $("#"+trx_logarea(recid)).append( "In Elaborazione" ); } - } + } - if (key == "error") { - $("#"+trx_logarea( recid )).append( "Error:" + val +"
" ); - } - }); // end of each + if (key == "error") { + $("#"+trx_logarea( recid )).append( "Error:" + val +"
" ); + } + }); // end of each }); // end of request.done } /* * - * AJAX REQUEST + * AJAX REQUEST * */ function RecAjax(apipath, dataString ) { - - var srv = srvaddr + apipath ; - + var srv = srvaddr + "api/" + apipath ; var request = $.ajax({ type: "POST", - cache: false, + cache: false, url: srv, data: dataString, dataType: "json" @@ -284,25 +282,25 @@ function RecAjax(apipath, dataString ) { request.fail(function (jqXHR, textStatus, errorThrown){ console.error("The following error occured: "+ jqXHR.status +"-"+ textStatus + "-" + errorThrown ); if (jqXHR.status == 0 && jqXHR.readyState === 4) - { - alert("Errore di connessione, impossibile inviare i dati al server "+ srv); - } else { - alert("Error: "+jqXHR.status +"\nTextStatus: "+ textStatus + "\n Ready State "+jqXHR.readyState+"\n" + errorThrown ); - } - }); + { + alert("Errore di connessione, impossibile inviare i dati al server "+ srv); + } else { + alert("Error: "+jqXHR.status +"\nTextStatus: "+ textStatus + "\n Ready State "+jqXHR.readyState+"\n" + errorThrown ); + } + }); return request; } /* - * GetNow (data parser) + * GetNow (data parser) */ function getnow() { var myDate = new Date() var displayDate = myDate.getFullYear() + '/' + (myDate.getMonth()+1) + '/' + myDate.getDate(); displayDate = displayDate +' '+ myDate.getHours()+':'+myDate.getMinutes()+':'+myDate.getSeconds(); - return displayDate; + return displayDate; } /* @@ -330,9 +328,9 @@ function ChangeState(recid, from, to) { } if ( from == trx_downbut(recid) ) { - $("input[type=submit]").attr("disabled", "disabled"); + $("input[type=submit]").attr("disabled", "disabled"); console.log("ChangeState: set '"+rs_inputend(recid)+ "' to "+ displayDate ); } } // End function ChangeState - +// vim: set ts=4 sw=4 et: diff --git a/server/static/js/tempo.js b/server/static/js/tempo.js index 9c596b2..19a18d4 100644 --- a/server/static/js/tempo.js +++ b/server/static/js/tempo.js @@ -8,7 +8,7 @@ $(document).ready(function(){ function (event) { event.preventDefault(); dataString = $(this).serialize(); - var request = $.getJSON('/search', dataString); + var request = $.getJSON('/api/search', dataString); $("#searchresult").html(" "); request.done( function(data) {