12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- from __future__ import absolute_import, unicode_literals
- import itertools
- import time
- from billiard.einfo import ExceptionInfo
- from case import skip
- def do_something(i):
- return i * i
- def long_something():
- time.sleep(1)
- def raise_something(i):
- try:
- raise KeyError('FOO EXCEPTION')
- except KeyError:
- return ExceptionInfo()
- @skip.unless_module('multiprocessing')
- class test_TaskPool:
- def setup(self):
- from celery.concurrency.prefork import TaskPool
- self.TaskPool = TaskPool
- def test_attrs(self):
- p = self.TaskPool(2)
- assert p.limit == 2
- assert p._pool is None
- def x_apply(self):
- p = self.TaskPool(2)
- p.start()
- scratchpad = {}
- proc_counter = itertools.count()
- def mycallback(ret_value):
- process = next(proc_counter)
- scratchpad[process] = {}
- scratchpad[process]['ret_value'] = ret_value
- myerrback = mycallback
- res = p.apply_async(do_something, args=[10], callback=mycallback)
- res2 = p.apply_async(raise_something, args=[10], errback=myerrback)
- res3 = p.apply_async(do_something, args=[20], callback=mycallback)
- assert res.get() == 100
- time.sleep(0.5)
- assert scratchpad.get(0)['ret_value'] == 100
- assert isinstance(res2.get(), ExceptionInfo)
- assert scratchpad.get(1)
- time.sleep(1)
- assert isinstance(scratchpad[1]['ret_value'], ExceptionInfo)
- assert scratchpad[1]['ret_value'].exception.args == ('FOO EXCEPTION',)
- assert res3.get() == 400
- time.sleep(0.5)
- assert scratchpad.get(2)['ret_value'] == 400
- res3 = p.apply_async(do_something, args=[30], callback=mycallback)
- assert res3.get() == 900
- time.sleep(0.5)
- assert scratchpad.get(3)['ret_value'] == 900
- p.stop()
|