test_tasks.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. import pytest
  2. from datetime import datetime, timedelta
  3. from case import ContextMock, MagicMock, Mock, patch
  4. from kombu import Queue
  5. from celery import Task, group, uuid
  6. from celery.app.task import _reprtask
  7. from celery.exceptions import Ignore, Retry
  8. from celery.result import EagerResult
  9. from celery.utils.time import parse_iso8601
  10. def return_True(*args, **kwargs):
  11. # Task run functions can't be closures/lambdas, as they're pickled.
  12. return True
  13. class MockApplyTask(Task):
  14. abstract = True
  15. applied = 0
  16. def run(self, x, y):
  17. return x * y
  18. def apply_async(self, *args, **kwargs):
  19. self.applied += 1
  20. class TasksCase:
  21. def setup(self):
  22. self.app.conf.task_protocol = 1 # XXX Still using proto1
  23. self.mytask = self.app.task(shared=False)(return_True)
  24. @self.app.task(bind=True, count=0, shared=False)
  25. def increment_counter(self, increment_by=1):
  26. self.count += increment_by or 1
  27. return self.count
  28. self.increment_counter = increment_counter
  29. @self.app.task(shared=False)
  30. def raising():
  31. raise KeyError('foo')
  32. self.raising = raising
  33. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  34. def retry_task(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
  35. self.iterations += 1
  36. rmax = self.max_retries if max_retries is None else max_retries
  37. assert repr(self.request)
  38. retries = self.request.retries
  39. if care and retries >= rmax:
  40. return arg1
  41. else:
  42. raise self.retry(countdown=0, max_retries=rmax)
  43. self.retry_task = retry_task
  44. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  45. def retry_task_noargs(self, **kwargs):
  46. self.iterations += 1
  47. if self.request.retries >= 3:
  48. return 42
  49. else:
  50. raise self.retry(countdown=0)
  51. self.retry_task_noargs = retry_task_noargs
  52. @self.app.task(bind=True, max_retries=3, iterations=0,
  53. base=MockApplyTask, shared=False)
  54. def retry_task_mockapply(self, arg1, arg2, kwarg=1):
  55. self.iterations += 1
  56. retries = self.request.retries
  57. if retries >= 3:
  58. return arg1
  59. raise self.retry(countdown=0)
  60. self.retry_task_mockapply = retry_task_mockapply
  61. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  62. def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):
  63. self.iterations += 1
  64. retries = self.request.retries
  65. if retries >= 3:
  66. return arg1 + kwarg
  67. else:
  68. try:
  69. raise MyCustomException('Elaine Marie Benes')
  70. except MyCustomException as exc:
  71. kwargs.update(kwarg=kwarg)
  72. raise self.retry(countdown=0, exc=exc)
  73. self.retry_task_customexc = retry_task_customexc
  74. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  75. shared=False)
  76. def autoretry_task_no_kwargs(self, a, b):
  77. self.iterations += 1
  78. return a / b
  79. self.autoretry_task_no_kwargs = autoretry_task_no_kwargs
  80. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  81. retry_kwargs={'max_retries': 5}, shared=False)
  82. def autoretry_task(self, a, b):
  83. self.iterations += 1
  84. return a / b
  85. self.autoretry_task = autoretry_task
  86. # memove all messages from memory-transport
  87. from kombu.transport.memory import Channel
  88. Channel.queues.clear()
  89. class MyCustomException(Exception):
  90. """Random custom exception."""
  91. class test_task_retries(TasksCase):
  92. def test_retry(self):
  93. self.retry_task.max_retries = 3
  94. self.retry_task.iterations = 0
  95. self.retry_task.apply([0xFF, 0xFFFF])
  96. assert self.retry_task.iterations == 4
  97. self.retry_task.max_retries = 3
  98. self.retry_task.iterations = 0
  99. self.retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
  100. assert self.retry_task.iterations == 11
  101. def test_retry_no_args(self):
  102. self.retry_task_noargs.max_retries = 3
  103. self.retry_task_noargs.iterations = 0
  104. self.retry_task_noargs.apply(propagate=True).get()
  105. assert self.retry_task_noargs.iterations == 4
  106. def test_signature_from_request__passes_headers(self):
  107. self.retry_task.push_request()
  108. self.retry_task.request.headers = {'custom': 10.1}
  109. sig = self.retry_task.signature_from_request()
  110. assert sig.options['headers']['custom'] == 10.1
  111. def test_signature_from_request__delivery_info(self):
  112. self.retry_task.push_request()
  113. self.retry_task.request.delivery_info = {
  114. 'exchange': 'testex',
  115. 'routing_key': 'testrk',
  116. }
  117. sig = self.retry_task.signature_from_request()
  118. assert sig.options['exchange'] == 'testex'
  119. assert sig.options['routing_key'] == 'testrk'
  120. def test_retry_kwargs_can_be_empty(self):
  121. self.retry_task_mockapply.push_request()
  122. try:
  123. with pytest.raises(Retry):
  124. import sys
  125. try:
  126. sys.exc_clear()
  127. except AttributeError:
  128. pass
  129. self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
  130. finally:
  131. self.retry_task_mockapply.pop_request()
  132. def test_retry_not_eager(self):
  133. self.retry_task_mockapply.push_request()
  134. try:
  135. self.retry_task_mockapply.request.called_directly = False
  136. exc = Exception('baz')
  137. try:
  138. self.retry_task_mockapply.retry(
  139. args=[4, 4], kwargs={'task_retries': 0},
  140. exc=exc, throw=False,
  141. )
  142. assert self.retry_task_mockapply.applied
  143. finally:
  144. self.retry_task_mockapply.applied = 0
  145. try:
  146. with pytest.raises(Retry):
  147. self.retry_task_mockapply.retry(
  148. args=[4, 4], kwargs={'task_retries': 0},
  149. exc=exc, throw=True)
  150. assert self.retry_task_mockapply.applied
  151. finally:
  152. self.retry_task_mockapply.applied = 0
  153. finally:
  154. self.retry_task_mockapply.pop_request()
  155. def test_retry_with_kwargs(self):
  156. self.retry_task_customexc.max_retries = 3
  157. self.retry_task_customexc.iterations = 0
  158. self.retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
  159. assert self.retry_task_customexc.iterations == 4
  160. def test_retry_with_custom_exception(self):
  161. self.retry_task_customexc.max_retries = 2
  162. self.retry_task_customexc.iterations = 0
  163. result = self.retry_task_customexc.apply(
  164. [0xFF, 0xFFFF], {'kwarg': 0xF},
  165. )
  166. with pytest.raises(MyCustomException):
  167. result.get()
  168. assert self.retry_task_customexc.iterations == 3
  169. def test_max_retries_exceeded(self):
  170. self.retry_task.max_retries = 2
  171. self.retry_task.iterations = 0
  172. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  173. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  174. result.get()
  175. assert self.retry_task.iterations == 3
  176. self.retry_task.max_retries = 1
  177. self.retry_task.iterations = 0
  178. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  179. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  180. result.get()
  181. assert self.retry_task.iterations == 2
  182. def test_autoretry_no_kwargs(self):
  183. self.autoretry_task_no_kwargs.max_retries = 3
  184. self.autoretry_task_no_kwargs.iterations = 0
  185. self.autoretry_task_no_kwargs.apply((1, 0))
  186. assert self.autoretry_task_no_kwargs.iterations == 4
  187. def test_autoretry(self):
  188. self.autoretry_task.max_retries = 3
  189. self.autoretry_task.iterations = 0
  190. self.autoretry_task.apply((1, 0))
  191. assert self.autoretry_task.iterations == 6
  192. class test_canvas_utils(TasksCase):
  193. def test_si(self):
  194. assert self.retry_task.si()
  195. assert self.retry_task.si().immutable
  196. def test_chunks(self):
  197. assert self.retry_task.chunks(range(100), 10)
  198. def test_map(self):
  199. assert self.retry_task.map(range(100))
  200. def test_starmap(self):
  201. assert self.retry_task.starmap(range(100))
  202. def test_on_success(self):
  203. self.retry_task.on_success(1, 1, (), {})
  204. class test_tasks(TasksCase):
  205. def now(self):
  206. return self.app.now()
  207. def test_typing(self):
  208. @self.app.task()
  209. def add(x, y, kw=1):
  210. pass
  211. with pytest.raises(TypeError):
  212. add.delay(1)
  213. with pytest.raises(TypeError):
  214. add.delay(1, kw=2)
  215. with pytest.raises(TypeError):
  216. add.delay(1, 2, foobar=3)
  217. add.delay(2, 2)
  218. def test_typing__disabled(self):
  219. @self.app.task(typing=False)
  220. def add(x, y, kw=1):
  221. pass
  222. add.delay(1)
  223. add.delay(1, kw=2)
  224. add.delay(1, 2, foobar=3)
  225. def test_typing__disabled_by_app(self):
  226. with self.Celery(set_as_current=False, strict_typing=False) as app:
  227. @app.task()
  228. def add(x, y, kw=1):
  229. pass
  230. assert not add.typing
  231. add.delay(1)
  232. add.delay(1, kw=2)
  233. add.delay(1, 2, foobar=3)
  234. @pytest.mark.usefixtures('depends_on_current_app')
  235. def test_unpickle_task(self):
  236. import pickle
  237. @self.app.task(shared=True)
  238. def xxx():
  239. pass
  240. assert pickle.loads(pickle.dumps(xxx)) is xxx.app.tasks[xxx.name]
  241. @patch('celery.app.task.current_app')
  242. @pytest.mark.usefixtures('depends_on_current_app')
  243. def test_bind__no_app(self, current_app):
  244. class XTask(Task):
  245. _app = None
  246. XTask._app = None
  247. XTask.__bound__ = False
  248. XTask.bind = Mock(name='bind')
  249. assert XTask.app is current_app
  250. XTask.bind.assert_called_with(current_app)
  251. def test_reprtask__no_fmt(self):
  252. assert _reprtask(self.mytask)
  253. def test_AsyncResult(self):
  254. task_id = uuid()
  255. result = self.retry_task.AsyncResult(task_id)
  256. assert result.backend == self.retry_task.backend
  257. assert result.id == task_id
  258. def assert_next_task_data_equal(self, consumer, presult, task_name,
  259. test_eta=False, test_expires=False,
  260. **kwargs):
  261. next_task = consumer.queues[0].get(accept=['pickle', 'json'])
  262. task_data = next_task.decode()
  263. assert task_data['id'] == presult.id
  264. assert task_data['task'] == task_name
  265. task_kwargs = task_data.get('kwargs', {})
  266. if test_eta:
  267. assert isinstance(task_data.get('eta'), str)
  268. to_datetime = parse_iso8601(task_data.get('eta'))
  269. assert isinstance(to_datetime, datetime)
  270. if test_expires:
  271. assert isinstance(task_data.get('expires'), str)
  272. to_datetime = parse_iso8601(task_data.get('expires'))
  273. assert isinstance(to_datetime, datetime)
  274. for arg_name, arg_value in kwargs.items():
  275. assert task_kwargs.get(arg_name) == arg_value
  276. def test_incomplete_task_cls(self):
  277. class IncompleteTask(Task):
  278. app = self.app
  279. name = 'c.unittest.t.itask'
  280. with pytest.raises(NotImplementedError):
  281. IncompleteTask().run()
  282. def test_task_kwargs_must_be_dictionary(self):
  283. with pytest.raises(TypeError):
  284. self.increment_counter.apply_async([], 'str')
  285. def test_task_args_must_be_list(self):
  286. with pytest.raises(TypeError):
  287. self.increment_counter.apply_async('s', {})
  288. def test_regular_task(self):
  289. assert isinstance(self.mytask, Task)
  290. assert self.mytask.run()
  291. assert callable(self.mytask)
  292. assert self.mytask(), 'Task class runs run() when called'
  293. with self.app.connection_or_acquire() as conn:
  294. consumer = self.app.amqp.TaskConsumer(conn)
  295. with pytest.raises(NotImplementedError):
  296. consumer.receive('foo', 'foo')
  297. consumer.purge()
  298. assert consumer.queues[0].get() is None
  299. self.app.amqp.TaskConsumer(conn, queues=[Queue('foo')])
  300. # Without arguments.
  301. presult = self.mytask.delay()
  302. self.assert_next_task_data_equal(
  303. consumer, presult, self.mytask.name)
  304. # With arguments.
  305. presult2 = self.mytask.apply_async(
  306. kwargs=dict(name='George Costanza'),
  307. )
  308. self.assert_next_task_data_equal(
  309. consumer, presult2, self.mytask.name, name='George Costanza',
  310. )
  311. # send_task
  312. sresult = self.app.send_task(self.mytask.name,
  313. kwargs=dict(name='Elaine M. Benes'))
  314. self.assert_next_task_data_equal(
  315. consumer, sresult, self.mytask.name, name='Elaine M. Benes',
  316. )
  317. # With ETA.
  318. presult2 = self.mytask.apply_async(
  319. kwargs=dict(name='George Costanza'),
  320. eta=self.now() + timedelta(days=1),
  321. expires=self.now() + timedelta(days=2),
  322. )
  323. self.assert_next_task_data_equal(
  324. consumer, presult2, self.mytask.name,
  325. name='George Costanza', test_eta=True, test_expires=True,
  326. )
  327. # With countdown.
  328. presult2 = self.mytask.apply_async(
  329. kwargs=dict(name='George Costanza'), countdown=10, expires=12,
  330. )
  331. self.assert_next_task_data_equal(
  332. consumer, presult2, self.mytask.name,
  333. name='George Costanza', test_eta=True, test_expires=True,
  334. )
  335. # Discarding all tasks.
  336. consumer.purge()
  337. self.mytask.apply_async()
  338. assert consumer.purge() == 1
  339. assert consumer.queues[0].get() is None
  340. assert not presult.successful()
  341. self.mytask.backend.mark_as_done(presult.id, result=None)
  342. assert presult.successful()
  343. def test_send_event(self):
  344. mytask = self.mytask._get_current_object()
  345. mytask.app.events = Mock(name='events')
  346. mytask.app.events.attach_mock(ContextMock(), 'default_dispatcher')
  347. mytask.request.id = 'fb'
  348. mytask.send_event('task-foo', id=3122)
  349. mytask.app.events.default_dispatcher().send.assert_called_with(
  350. 'task-foo', uuid='fb', id=3122,
  351. retry=True, retry_policy=self.app.conf.task_publish_retry_policy)
  352. def test_replace(self):
  353. sig1 = Mock(name='sig1')
  354. sig1.options = {}
  355. with pytest.raises(Ignore):
  356. self.mytask.replace(sig1)
  357. @pytest.mark.usefixtures('depends_on_current_app')
  358. def test_replace_callback(self):
  359. c = group([self.mytask.s()], app=self.app)
  360. c.freeze = Mock(name='freeze')
  361. c.delay = Mock(name='delay')
  362. self.mytask.request.id = 'id'
  363. self.mytask.request.group = 'group'
  364. self.mytask.request.root_id = 'root_id'
  365. self.mytask.request.callbacks = 'callbacks'
  366. self.mytask.request.errbacks = 'errbacks'
  367. class JsonMagicMock(MagicMock):
  368. parent = None
  369. def __json__(self):
  370. return 'whatever'
  371. def reprcall(self, *args, **kwargs):
  372. return 'whatever2'
  373. mocked_signature = JsonMagicMock(name='s')
  374. accumulate_mock = JsonMagicMock(name='accumulate', s=mocked_signature)
  375. self.mytask.app.tasks['celery.accumulate'] = accumulate_mock
  376. try:
  377. self.mytask.replace(c)
  378. except Ignore:
  379. mocked_signature.return_value.set.assert_called_with(
  380. chord=None,
  381. link='callbacks',
  382. link_error='errbacks',
  383. )
  384. def test_replace_group(self):
  385. c = group([self.mytask.s()], app=self.app)
  386. c.freeze = Mock(name='freeze')
  387. c.delay = Mock(name='delay')
  388. self.mytask.request.id = 'id'
  389. self.mytask.request.group = 'group'
  390. self.mytask.request.root_id = 'root_id',
  391. with pytest.raises(Ignore):
  392. self.mytask.replace(c)
  393. def test_add_trail__no_trail(self):
  394. mytask = self.increment_counter._get_current_object()
  395. mytask.trail = False
  396. mytask.add_trail('foo')
  397. def test_apply_with_self(self):
  398. @self.app.task(__self__=42, shared=False)
  399. def tawself(self):
  400. return self
  401. assert tawself.apply().get() == 42
  402. assert tawself() == 42
  403. def test_context_get(self):
  404. self.mytask.push_request()
  405. try:
  406. request = self.mytask.request
  407. request.foo = 32
  408. assert request.get('foo') == 32
  409. assert request.get('bar', 36) == 36
  410. request.clear()
  411. finally:
  412. self.mytask.pop_request()
  413. def test_annotate(self):
  414. with patch('celery.app.task.resolve_all_annotations') as anno:
  415. anno.return_value = [{'FOO': 'BAR'}]
  416. @self.app.task(shared=False)
  417. def task():
  418. pass
  419. task.annotate()
  420. assert task.FOO == 'BAR'
  421. def test_after_return(self):
  422. self.mytask.push_request()
  423. try:
  424. self.mytask.request.chord = self.mytask.s()
  425. self.mytask.after_return('SUCCESS', 1.0, 'foobar', (), {}, None)
  426. self.mytask.request.clear()
  427. finally:
  428. self.mytask.pop_request()
  429. def test_update_state(self):
  430. @self.app.task(shared=False)
  431. def yyy():
  432. pass
  433. yyy.push_request()
  434. try:
  435. tid = uuid()
  436. yyy.update_state(tid, 'FROBULATING', {'fooz': 'baaz'})
  437. assert yyy.AsyncResult(tid).state == 'FROBULATING'
  438. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  439. yyy.request.id = tid
  440. yyy.update_state(state='FROBUZATING', meta={'fooz': 'baaz'})
  441. assert yyy.AsyncResult(tid).state == 'FROBUZATING'
  442. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  443. finally:
  444. yyy.pop_request()
  445. def test_repr(self):
  446. @self.app.task(shared=False)
  447. def task_test_repr():
  448. pass
  449. assert 'task_test_repr' in repr(task_test_repr)
  450. def test_has___name__(self):
  451. @self.app.task(shared=False)
  452. def yyy2():
  453. pass
  454. assert yyy2.__name__
  455. class test_apply_task(TasksCase):
  456. def test_apply_throw(self):
  457. with pytest.raises(KeyError):
  458. self.raising.apply(throw=True)
  459. def test_apply_with_task_eager_propagates(self):
  460. self.app.conf.task_eager_propagates = True
  461. with pytest.raises(KeyError):
  462. self.raising.apply()
  463. def test_apply(self):
  464. self.increment_counter.count = 0
  465. e = self.increment_counter.apply()
  466. assert isinstance(e, EagerResult)
  467. assert e.get() == 1
  468. e = self.increment_counter.apply(args=[1])
  469. assert e.get() == 2
  470. e = self.increment_counter.apply(kwargs={'increment_by': 4})
  471. assert e.get() == 6
  472. assert e.successful()
  473. assert e.ready()
  474. assert repr(e).startswith('<EagerResult:')
  475. f = self.raising.apply()
  476. assert f.ready()
  477. assert not f.successful()
  478. assert f.traceback
  479. with pytest.raises(KeyError):
  480. f.get()