processqueue.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import multiprocessing
  2. class JobQueue(object):
  3. def __init__(self):
  4. self.pool = multiprocessing.Pool(processes=1)
  5. self.last_job_id = 0
  6. self.jobs = {} # job_id: AsyncResult
  7. def submit(self, function, *args, **kwargs):
  8. self.last_job_id += 1
  9. job_id = self.last_job_id
  10. def clean_jobs(res):
  11. """this callback will remove the job from the queue"""
  12. del self.jobs[job_id]
  13. self.jobs[job_id] = self.pool.apply_async(function, args, kwargs, clean_jobs)
  14. return job_id
  15. def check_job(self, job_id):
  16. """
  17. If the job is running, return the asyncResult.
  18. If it has already completed, returns True.
  19. If no such job_id exists at all, returns False
  20. """
  21. if job_id <= 0:
  22. raise ValueError("non-valid job_id")
  23. if self.last_job_id < job_id:
  24. return False
  25. if job_id in self.jobs:
  26. return self.jobs[job_id]
  27. return True
  28. def join(self):
  29. self.pool.close()
  30. self.pool.join()
  31. self.pool = None
  32. def simulate_long_job(recid=None, starttime=None, endtime=None, name="", filename=None):
  33. from time import sleep
  34. print("evviva " + name)
  35. sleep(2)
  36. print("lavoro su " + name)
  37. sleep(2)
  38. print("done su " + name)
  39. _queue = None
  40. def get_process_queue():
  41. global _queue
  42. if _queue is None:
  43. _queue = JobQueue()
  44. return _queue
  45. if __name__ == "__main__":
  46. from datetime import datetime
  47. n = datetime.now()
  48. def sleep(n):
  49. import time
  50. print("Inizio %d" % n)
  51. time.sleep(n)
  52. print("Finisco %d" % n)
  53. return n
  54. get_process_queue().submit(sleep, 3)
  55. get_process_queue().submit(sleep, 3)
  56. get_process_queue().join()
  57. print(get_process_queue().jobs)
  58. delta = (datetime.now() - n).total_seconds()
  59. print(delta)
  60. assert 5 < delta < 7