76 lines
1.8 KiB
Python
76 lines
1.8 KiB
Python
from __future__ import print_function
|
|
from gevent import monkey
|
|
monkey.patch_all(subprocess=True)
|
|
|
|
import pytest
|
|
|
|
import gevent
|
|
|
|
from mpc import Timer, ParentedLet
|
|
|
|
# TODO: implement simple children and check that we will receive the expected
# messages on the queue
|
|
|
|
|
|
@pytest.fixture
def range_parentlet():
    '''Provide a ParentedLet subclass that yields a run of 'range' messages.

    The fixture returns the class itself, not an instance; tests build it
    as ``RangeLet(howmany, queue)``.
    '''
    class RangeLet(ParentedLet):
        '''Child let whose business yields ``('range', i)`` for each i.'''

        def __init__(self, howmany, queue):
            ParentedLet.__init__(self, queue)
            self.howmany = howmany

        def do_business(self):
            # ``range`` rather than ``xrange``: ``xrange`` does not exist on
            # Python 3, while ``range`` iterates the same values on both
            # major versions.
            for i in range(self.howmany):
                yield ('range', i)

    return RangeLet
|
|
|
|
|
|
@pytest.fixture
def single_value_parentlet():
    '''Provide a ParentedLet subclass that yields one 'single' message.

    The fixture returns the class itself; tests instantiate it as
    ``SingleLet(val, queue)``.
    '''
    class SingleLet(ParentedLet):
        '''Child let that yields ``('single', val)`` exactly once.'''

        def __init__(self, val, queue):
            # The base class receives the queue; the payload is kept locally.
            ParentedLet.__init__(self, queue)
            self.val = val

        def do_business(self):
            message = ('single', self.val)
            yield message

    return SingleLet
|
|
|
|
|
|
def test_parented_range(queue, range_parentlet):
    '''a five-step range let leaves five 'range' messages on the queue'''
    expected_count = 5
    let = range_parentlet(expected_count, queue)
    let.start()
    # Give the started let a short window to run before inspecting the queue.
    gevent.sleep(0.01)
    assert queue.qsize() == expected_count
    # Drain the queue, checking the kind of every message on the way.
    while not queue.empty():
        message = queue.get()
        assert message['kind'] == 'range'
|
|
|
|
|
|
def test_parented_single(queue, single_value_parentlet):
    '''the single-value let's message carries its value as first argument'''
    value = 123
    let = single_value_parentlet(value, queue)
    let.start()
    # Give the started let a short window to run before reading the queue.
    gevent.sleep(0.01)
    message = queue.get_nowait()
    assert message['args'][0] == value
|
|
|
|
|
|
def test_timer_finally(queue):
    '''at some point, the timer puts a result on the queue'''
    period = 10
    timer = Timer(period, queue)
    timer.start()
    # Sleep for three periods. The division by 1000.0 suggests the period is
    # expressed in milliseconds -- NOTE(review): confirm against Timer.
    wait_seconds = period * 3 / 1000.0
    gevent.sleep(wait_seconds)
    # No explicit assert: the non-blocking get is expected to raise if the
    # queue is still empty, which fails the test.
    queue.get_nowait()
|
|
|
|
|
|
def test_timer_righttime(queue):
    '''the timer fires neither too early nor too late'''
    period = 500
    timer = Timer(period, queue)
    timer.start()
    # A tenth of the period first, then two further periods. The period looks
    # like milliseconds given the 1000.0 divisor -- NOTE(review): confirm.
    early_wait = period / (10 * 1000.0)
    late_wait = period * (2 / 1000.0)
    gevent.sleep(early_wait)
    # Too early: nothing should have arrived yet.
    assert queue.empty() is True
    gevent.sleep(late_wait)
    # Late enough: at least one message should be waiting now.
    assert not queue.empty()
    queue.get_nowait()
|