tidying things & unit testing!

This commit is contained in:
boyska 2014-10-26 19:42:16 +01:00
parent 8da8c38a8c
commit 441ac2683c
4 changed files with 99 additions and 18 deletions

View file

@ -49,3 +49,19 @@ class CeleryTask(ParentedLet):
**self.apply_async_args)
val = asyncres.get()
yield ('celery', val)
class Timer(ParentedLet):
def __init__(self, milliseconds, queue):
ParentedLet.__init__(self, queue)
self.ms = milliseconds
def parent_msg(self, kind, *args):
msg = ParentedLet.parent_msg(self, kind, *args)
msg['period'] = self.ms
return msg
def do_business(self):
while True:
gevent.sleep(self.ms / 1000.0)
yield ('timer', )

18
mpc.py
View file

@ -12,7 +12,7 @@ import gevent
from gevent.queue import Queue
from eventutils import ParentedLet, CeleryTask
from eventutils import ParentedLet, CeleryTask, Timer
from task import create as create_continous
@ -26,22 +26,6 @@ class MpcWatcher(ParentedLet):
yield ('mpc', status)
class Timer(ParentedLet):
def __init__(self, milliseconds, queue):
ParentedLet.__init__(self, queue)
self.ms = milliseconds
def parent_msg(self, kind, *args):
msg = ParentedLet.parent_msg(self, kind, *args)
msg['period'] = self.ms
return msg
def do_business(self):
while True:
gevent.sleep(self.ms / 1000.0)
yield ('timer', )
class Player(gevent.Greenlet):
def __init__(self):
gevent.Greenlet.__init__(self)

View file

@ -6,7 +6,9 @@ eventlet==0.15.2
gevent==1.0.1
greenlet==0.4.5
kombu==3.0.23
py==1.4.26
pyinotify==0.9.4
pytest==2.6.4
pytz==2014.7
redis==2.10.3
wsgiref==0.1.2

View file

@ -1,4 +1,83 @@
from mpc import ParentedLet
from __future__ import print_function
from gevent import monkey
monkey.patch_all(subprocess=True)
import pytest
import gevent
from gevent.queue import Queue
from mpc import Timer, ParentedLet
# TODO: implement simple children and check that we will receive the expected
# messages on the queue
@pytest.fixture
def queue():
print('fixture q')
return Queue()
@pytest.fixture
def range_parentlet():
class RangeLet(ParentedLet):
def __init__(self, howmany, queue):
ParentedLet.__init__(self, queue)
self.howmany = howmany
def do_business(self):
for i in xrange(self.howmany):
yield('range', i)
return RangeLet
@pytest.fixture
def single_value_parentlet():
class SingleLet(ParentedLet):
def __init__(self, val, queue):
ParentedLet.__init__(self, queue)
self.val = val
def do_business(self):
yield('single', self.val)
return SingleLet
def test_parented_range(queue, range_parentlet):
t = range_parentlet(5, queue)
t.start()
gevent.sleep(0.01)
assert queue.qsize() == 5
while not queue.empty():
msg = queue.get()
assert msg['kind'] == 'range'
def test_parented_single(queue, single_value_parentlet):
t = single_value_parentlet(123, queue)
t.start()
gevent.sleep(0.01)
msg = queue.get_nowait()
assert msg['args'][0] == 123
def test_timer_finally(queue):
'''at somepoint, it will get results'''
period = 10
t = Timer(period, queue)
t.start()
gevent.sleep(period*3/1000.0)
queue.get_nowait()
def test_timer_righttime(queue):
'''not too early, not too late'''
period = 500
t = Timer(period, queue)
t.start()
gevent.sleep(period/(10*1000.0))
assert queue.empty() is True
gevent.sleep(period*(2/1000.0))
assert not queue.empty()
queue.get_nowait()