# processqueue.py
import multiprocessing
import threading
  2. class JobQueue(object):
  3. def __init__(self):
  4. self.pool = multiprocessing.Pool(processes=1)
  5. self.last_job_id = 0
  6. self.jobs = {} # job_id: AsyncResult
  7. def submit(self, function, *args, **kwargs):
  8. self.last_job_id += 1
  9. job_id = self.last_job_id
  10. def clean_jobs(res):
  11. '''this callback will remove the job from the queue'''
  12. del self.jobs[job_id]
  13. self.jobs[job_id] = self.pool.apply_async(function, args, kwargs,
  14. clean_jobs)
  15. return job_id
  16. def check_job(self, job_id):
  17. '''
  18. If the job is running, return the asyncResult.
  19. If it has already completed, returns True.
  20. If no such job_id exists at all, returns False
  21. '''
  22. if job_id <= 0:
  23. raise ValueError("non-valid job_id")
  24. if self.last_job_id < job_id:
  25. return False
  26. if job_id in self.jobs:
  27. return self.jobs[job_id]
  28. return True
  29. def join(self):
  30. self.pool.close()
  31. self.pool.join()
  32. self.pool = None
  33. def simulate_long_job(recid=None, starttime=None, endtime=None, name='', filename=None):
  34. from time import sleep
  35. print "evviva " + name
  36. sleep(2)
  37. print "lavoro su " + name
  38. sleep(2)
  39. print "done su " + name
  40. _queue = None
  41. def get_process_queue():
  42. global _queue
  43. if _queue is None:
  44. _queue = JobQueue()
  45. return _queue
  46. if __name__ == '__main__':
  47. from datetime import datetime
  48. n = datetime.now()
  49. def sleep(n):
  50. import time
  51. print "Inizio %d" % n
  52. time.sleep(n)
  53. print "Finisco %d" % n
  54. return n
  55. get_process_queue().submit(sleep, 3)
  56. get_process_queue().submit(sleep, 3)
  57. get_process_queue().join()
  58. print get_process_queue().jobs
  59. delta = (datetime.now() - n).total_seconds()
  60. print delta
  61. assert 5 < delta < 7