larigira/test_parented.py

83 lines
1.9 KiB
Python

from __future__ import print_function
from gevent import monkey
monkey.patch_all(subprocess=True)
import pytest
import gevent
from gevent.queue import Queue
from mpc import Timer, ParentedLet
# TODO: implement simple children and check that we will receive the expected
# messages on the queue
@pytest.fixture
def queue():
print('fixture q')
return Queue()
@pytest.fixture
def range_parentlet():
class RangeLet(ParentedLet):
def __init__(self, howmany, queue):
ParentedLet.__init__(self, queue)
self.howmany = howmany
def do_business(self):
for i in xrange(self.howmany):
yield('range', i)
return RangeLet
@pytest.fixture
def single_value_parentlet():
class SingleLet(ParentedLet):
def __init__(self, val, queue):
ParentedLet.__init__(self, queue)
self.val = val
def do_business(self):
yield('single', self.val)
return SingleLet
def test_parented_range(queue, range_parentlet):
t = range_parentlet(5, queue)
t.start()
gevent.sleep(0.01)
assert queue.qsize() == 5
while not queue.empty():
msg = queue.get()
assert msg['kind'] == 'range'
def test_parented_single(queue, single_value_parentlet):
t = single_value_parentlet(123, queue)
t.start()
gevent.sleep(0.01)
msg = queue.get_nowait()
assert msg['args'][0] == 123
def test_timer_finally(queue):
'''at somepoint, it will get results'''
period = 10
t = Timer(period, queue)
t.start()
gevent.sleep(period*3/1000.0)
queue.get_nowait()
def test_timer_righttime(queue):
'''not too early, not too late'''
period = 500
t = Timer(period, queue)
t.start()
gevent.sleep(period/(10*1000.0))
assert queue.empty() is True
gevent.sleep(period*(2/1000.0))
assert not queue.empty()
queue.get_nowait()