test_parented.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from __future__ import print_function
  2. from gevent import monkey
  3. monkey.patch_all(subprocess=True)
  4. import pytest
  5. import gevent
  6. from gevent.queue import Queue
  7. from mpc import Timer, ParentedLet
# TODO: implement simple child greenlets and check that the expected
# messages arrive on the queue.
  10. @pytest.fixture
  11. def queue():
  12. print('fixture q')
  13. return Queue()
  14. @pytest.fixture
  15. def range_parentlet():
  16. class RangeLet(ParentedLet):
  17. def __init__(self, howmany, queue):
  18. ParentedLet.__init__(self, queue)
  19. self.howmany = howmany
  20. def do_business(self):
  21. for i in xrange(self.howmany):
  22. yield('range', i)
  23. return RangeLet
  24. @pytest.fixture
  25. def single_value_parentlet():
  26. class SingleLet(ParentedLet):
  27. def __init__(self, val, queue):
  28. ParentedLet.__init__(self, queue)
  29. self.val = val
  30. def do_business(self):
  31. yield('single', self.val)
  32. return SingleLet
  33. def test_parented_range(queue, range_parentlet):
  34. t = range_parentlet(5, queue)
  35. t.start()
  36. gevent.sleep(0.01)
  37. assert queue.qsize() == 5
  38. while not queue.empty():
  39. msg = queue.get()
  40. assert msg['kind'] == 'range'
  41. def test_parented_single(queue, single_value_parentlet):
  42. t = single_value_parentlet(123, queue)
  43. t.start()
  44. gevent.sleep(0.01)
  45. msg = queue.get_nowait()
  46. assert msg['args'][0] == 123
  47. def test_timer_finally(queue):
  48. '''at somepoint, it will get results'''
  49. period = 10
  50. t = Timer(period, queue)
  51. t.start()
  52. gevent.sleep(period*3/1000.0)
  53. queue.get_nowait()
  54. def test_timer_righttime(queue):
  55. '''not too early, not too late'''
  56. period = 500
  57. t = Timer(period, queue)
  58. t.start()
  59. gevent.sleep(period/(10*1000.0))
  60. assert queue.empty() is True
  61. gevent.sleep(period*(2/1000.0))
  62. assert not queue.empty()
  63. queue.get_nowait()