test_tasks.py 23 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import socket
  4. import tempfile
  5. from datetime import datetime, timedelta
  6. try:
  7. from urllib.error import HTTPError
  8. except ImportError: # pragma: no cover
  9. from urllib2 import HTTPError
  10. from case import ContextMock, MagicMock, Mock, patch
  11. from kombu import Queue
  12. from celery import Task, group, uuid
  13. from celery.app.task import _reprtask
  14. from celery.exceptions import Ignore, Retry
  15. from celery.five import items, range, string_t
  16. from celery.result import EagerResult
  17. from celery.utils.time import parse_iso8601
  18. def return_True(*args, **kwargs):
  19. # Task run functions can't be closures/lambdas, as they're pickled.
  20. return True
  21. class MockApplyTask(Task):
  22. abstract = True
  23. applied = 0
  24. def run(self, x, y):
  25. return x * y
  26. def apply_async(self, *args, **kwargs):
  27. self.applied += 1
  28. class TasksCase:
  29. def setup(self):
  30. self.app.conf.task_protocol = 1 # XXX Still using proto1
  31. self.mytask = self.app.task(shared=False)(return_True)
  32. @self.app.task(bind=True, count=0, shared=False)
  33. def increment_counter(self, increment_by=1):
  34. self.count += increment_by or 1
  35. return self.count
  36. self.increment_counter = increment_counter
  37. @self.app.task(shared=False)
  38. def raising():
  39. raise KeyError('foo')
  40. self.raising = raising
  41. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  42. def retry_task(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
  43. self.iterations += 1
  44. rmax = self.max_retries if max_retries is None else max_retries
  45. assert repr(self.request)
  46. retries = self.request.retries
  47. if care and retries >= rmax:
  48. return arg1
  49. else:
  50. raise self.retry(countdown=0, max_retries=rmax)
  51. self.retry_task = retry_task
  52. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  53. def retry_task_noargs(self, **kwargs):
  54. self.iterations += 1
  55. if self.request.retries >= 3:
  56. return 42
  57. else:
  58. raise self.retry(countdown=0)
  59. self.retry_task_noargs = retry_task_noargs
  60. @self.app.task(bind=True, max_retries=3, iterations=0,
  61. base=MockApplyTask, shared=False)
  62. def retry_task_mockapply(self, arg1, arg2, kwarg=1):
  63. self.iterations += 1
  64. retries = self.request.retries
  65. if retries >= 3:
  66. return arg1
  67. raise self.retry(countdown=0)
  68. self.retry_task_mockapply = retry_task_mockapply
  69. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  70. def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):
  71. self.iterations += 1
  72. retries = self.request.retries
  73. if retries >= 3:
  74. return arg1 + kwarg
  75. else:
  76. try:
  77. raise MyCustomException('Elaine Marie Benes')
  78. except MyCustomException as exc:
  79. kwargs.update(kwarg=kwarg)
  80. raise self.retry(countdown=0, exc=exc)
  81. self.retry_task_customexc = retry_task_customexc
  82. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  83. shared=False)
  84. def autoretry_task_no_kwargs(self, a, b):
  85. self.iterations += 1
  86. return a / b
  87. self.autoretry_task_no_kwargs = autoretry_task_no_kwargs
  88. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  89. retry_kwargs={'max_retries': 5}, shared=False)
  90. def autoretry_task(self, a, b):
  91. self.iterations += 1
  92. return a / b
  93. self.autoretry_task = autoretry_task
  94. @self.app.task(bind=True, autoretry_for=(HTTPError,),
  95. retry_backoff=True, shared=False)
  96. def autoretry_backoff_task(self, url):
  97. self.iterations += 1
  98. if "error" in url:
  99. fp = tempfile.TemporaryFile()
  100. raise HTTPError(url, '500', 'Error', '', fp)
  101. return url
  102. self.autoretry_backoff_task = autoretry_backoff_task
  103. @self.app.task(bind=True, autoretry_for=(HTTPError,),
  104. retry_backoff=True, retry_jitter=True, shared=False)
  105. def autoretry_backoff_jitter_task(self, url):
  106. self.iterations += 1
  107. if "error" in url:
  108. fp = tempfile.TemporaryFile()
  109. raise HTTPError(url, '500', 'Error', '', fp)
  110. return url
  111. self.autoretry_backoff_jitter_task = autoretry_backoff_jitter_task
  112. @self.app.task(bind=True)
  113. def task_check_request_context(self):
  114. assert self.request.hostname == socket.gethostname()
  115. self.task_check_request_context = task_check_request_context
  116. # memove all messages from memory-transport
  117. from kombu.transport.memory import Channel
  118. Channel.queues.clear()
  119. class MyCustomException(Exception):
  120. """Random custom exception."""
  121. class test_task_retries(TasksCase):
  122. def test_retry(self):
  123. self.retry_task.max_retries = 3
  124. self.retry_task.iterations = 0
  125. self.retry_task.apply([0xFF, 0xFFFF])
  126. assert self.retry_task.iterations == 4
  127. self.retry_task.max_retries = 3
  128. self.retry_task.iterations = 0
  129. self.retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
  130. assert self.retry_task.iterations == 11
  131. def test_retry_no_args(self):
  132. self.retry_task_noargs.max_retries = 3
  133. self.retry_task_noargs.iterations = 0
  134. self.retry_task_noargs.apply(propagate=True).get()
  135. assert self.retry_task_noargs.iterations == 4
  136. def test_signature_from_request__passes_headers(self):
  137. self.retry_task.push_request()
  138. self.retry_task.request.headers = {'custom': 10.1}
  139. sig = self.retry_task.signature_from_request()
  140. assert sig.options['headers']['custom'] == 10.1
  141. def test_signature_from_request__delivery_info(self):
  142. self.retry_task.push_request()
  143. self.retry_task.request.delivery_info = {
  144. 'exchange': 'testex',
  145. 'routing_key': 'testrk',
  146. }
  147. sig = self.retry_task.signature_from_request()
  148. assert sig.options['exchange'] == 'testex'
  149. assert sig.options['routing_key'] == 'testrk'
  150. def test_retry_kwargs_can_be_empty(self):
  151. self.retry_task_mockapply.push_request()
  152. try:
  153. with pytest.raises(Retry):
  154. import sys
  155. try:
  156. sys.exc_clear()
  157. except AttributeError:
  158. pass
  159. self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
  160. finally:
  161. self.retry_task_mockapply.pop_request()
  162. def test_retry_not_eager(self):
  163. self.retry_task_mockapply.push_request()
  164. try:
  165. self.retry_task_mockapply.request.called_directly = False
  166. exc = Exception('baz')
  167. try:
  168. self.retry_task_mockapply.retry(
  169. args=[4, 4], kwargs={'task_retries': 0},
  170. exc=exc, throw=False,
  171. )
  172. assert self.retry_task_mockapply.applied
  173. finally:
  174. self.retry_task_mockapply.applied = 0
  175. try:
  176. with pytest.raises(Retry):
  177. self.retry_task_mockapply.retry(
  178. args=[4, 4], kwargs={'task_retries': 0},
  179. exc=exc, throw=True)
  180. assert self.retry_task_mockapply.applied
  181. finally:
  182. self.retry_task_mockapply.applied = 0
  183. finally:
  184. self.retry_task_mockapply.pop_request()
  185. def test_retry_with_kwargs(self):
  186. self.retry_task_customexc.max_retries = 3
  187. self.retry_task_customexc.iterations = 0
  188. self.retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
  189. assert self.retry_task_customexc.iterations == 4
  190. def test_retry_with_custom_exception(self):
  191. self.retry_task_customexc.max_retries = 2
  192. self.retry_task_customexc.iterations = 0
  193. result = self.retry_task_customexc.apply(
  194. [0xFF, 0xFFFF], {'kwarg': 0xF},
  195. )
  196. with pytest.raises(MyCustomException):
  197. result.get()
  198. assert self.retry_task_customexc.iterations == 3
  199. def test_max_retries_exceeded(self):
  200. self.retry_task.max_retries = 2
  201. self.retry_task.iterations = 0
  202. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  203. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  204. result.get()
  205. assert self.retry_task.iterations == 3
  206. self.retry_task.max_retries = 1
  207. self.retry_task.iterations = 0
  208. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  209. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  210. result.get()
  211. assert self.retry_task.iterations == 2
  212. def test_autoretry_no_kwargs(self):
  213. self.autoretry_task_no_kwargs.max_retries = 3
  214. self.autoretry_task_no_kwargs.iterations = 0
  215. self.autoretry_task_no_kwargs.apply((1, 0))
  216. assert self.autoretry_task_no_kwargs.iterations == 4
  217. def test_autoretry(self):
  218. self.autoretry_task.max_retries = 3
  219. self.autoretry_task.iterations = 0
  220. self.autoretry_task.apply((1, 0))
  221. assert self.autoretry_task.iterations == 6
  222. @patch('random.randrange', side_effect=lambda i: i - 1)
  223. def test_autoretry_backoff(self, randrange):
  224. task = self.autoretry_backoff_task
  225. task.max_retries = 3
  226. task.iterations = 0
  227. with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
  228. task.apply(("http://httpbin.org/error",))
  229. assert task.iterations == 4
  230. retry_call_countdowns = [
  231. call[1]['countdown'] for call in fake_retry.call_args_list
  232. ]
  233. assert retry_call_countdowns == [1, 2, 4, 8]
  234. @patch('random.randrange', side_effect=lambda i: i - 2)
  235. def test_autoretry_backoff_jitter(self, randrange):
  236. task = self.autoretry_backoff_jitter_task
  237. task.max_retries = 3
  238. task.iterations = 0
  239. with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
  240. task.apply(("http://httpbin.org/error",))
  241. assert task.iterations == 4
  242. retry_call_countdowns = [
  243. call[1]['countdown'] for call in fake_retry.call_args_list
  244. ]
  245. assert retry_call_countdowns == [0, 1, 3, 7]
  246. def test_retry_wrong_eta_when_not_enable_utc(self):
  247. """Issue #3753"""
  248. self.app.conf.enable_utc = False
  249. self.app.conf.timezone = 'US/Eastern'
  250. self.autoretry_task.iterations = 0
  251. self.autoretry_task.default_retry_delay = 2
  252. self.autoretry_task.apply((1, 0))
  253. assert self.autoretry_task.iterations == 6
  254. class test_canvas_utils(TasksCase):
  255. def test_si(self):
  256. assert self.retry_task.si()
  257. assert self.retry_task.si().immutable
  258. def test_chunks(self):
  259. assert self.retry_task.chunks(range(100), 10)
  260. def test_map(self):
  261. assert self.retry_task.map(range(100))
  262. def test_starmap(self):
  263. assert self.retry_task.starmap(range(100))
  264. def test_on_success(self):
  265. self.retry_task.on_success(1, 1, (), {})
  266. class test_tasks(TasksCase):
  267. def now(self):
  268. return self.app.now()
  269. def test_typing(self):
  270. @self.app.task()
  271. def add(x, y, kw=1):
  272. pass
  273. with pytest.raises(TypeError):
  274. add.delay(1)
  275. with pytest.raises(TypeError):
  276. add.delay(1, kw=2)
  277. with pytest.raises(TypeError):
  278. add.delay(1, 2, foobar=3)
  279. add.delay(2, 2)
  280. def test_typing__disabled(self):
  281. @self.app.task(typing=False)
  282. def add(x, y, kw=1):
  283. pass
  284. add.delay(1)
  285. add.delay(1, kw=2)
  286. add.delay(1, 2, foobar=3)
  287. def test_typing__disabled_by_app(self):
  288. with self.Celery(set_as_current=False, strict_typing=False) as app:
  289. @app.task()
  290. def add(x, y, kw=1):
  291. pass
  292. assert not add.typing
  293. add.delay(1)
  294. add.delay(1, kw=2)
  295. add.delay(1, 2, foobar=3)
  296. @pytest.mark.usefixtures('depends_on_current_app')
  297. def test_unpickle_task(self):
  298. import pickle
  299. @self.app.task(shared=True)
  300. def xxx():
  301. pass
  302. assert pickle.loads(pickle.dumps(xxx)) is xxx.app.tasks[xxx.name]
  303. @patch('celery.app.task.current_app')
  304. @pytest.mark.usefixtures('depends_on_current_app')
  305. def test_bind__no_app(self, current_app):
  306. class XTask(Task):
  307. _app = None
  308. XTask._app = None
  309. XTask.__bound__ = False
  310. XTask.bind = Mock(name='bind')
  311. assert XTask.app is current_app
  312. XTask.bind.assert_called_with(current_app)
  313. def test_reprtask__no_fmt(self):
  314. assert _reprtask(self.mytask)
  315. def test_AsyncResult(self):
  316. task_id = uuid()
  317. result = self.retry_task.AsyncResult(task_id)
  318. assert result.backend == self.retry_task.backend
  319. assert result.id == task_id
  320. def assert_next_task_data_equal(self, consumer, presult, task_name,
  321. test_eta=False, test_expires=False,
  322. **kwargs):
  323. next_task = consumer.queues[0].get(accept=['pickle', 'json'])
  324. task_data = next_task.decode()
  325. assert task_data['id'] == presult.id
  326. assert task_data['task'] == task_name
  327. task_kwargs = task_data.get('kwargs', {})
  328. if test_eta:
  329. assert isinstance(task_data.get('eta'), string_t)
  330. to_datetime = parse_iso8601(task_data.get('eta'))
  331. assert isinstance(to_datetime, datetime)
  332. if test_expires:
  333. assert isinstance(task_data.get('expires'), string_t)
  334. to_datetime = parse_iso8601(task_data.get('expires'))
  335. assert isinstance(to_datetime, datetime)
  336. for arg_name, arg_value in items(kwargs):
  337. assert task_kwargs.get(arg_name) == arg_value
  338. def test_incomplete_task_cls(self):
  339. class IncompleteTask(Task):
  340. app = self.app
  341. name = 'c.unittest.t.itask'
  342. with pytest.raises(NotImplementedError):
  343. IncompleteTask().run()
  344. def test_task_kwargs_must_be_dictionary(self):
  345. with pytest.raises(TypeError):
  346. self.increment_counter.apply_async([], 'str')
  347. def test_task_args_must_be_list(self):
  348. with pytest.raises(TypeError):
  349. self.increment_counter.apply_async('s', {})
  350. def test_regular_task(self):
  351. assert isinstance(self.mytask, Task)
  352. assert self.mytask.run()
  353. assert callable(self.mytask)
  354. assert self.mytask(), 'Task class runs run() when called'
  355. with self.app.connection_or_acquire() as conn:
  356. consumer = self.app.amqp.TaskConsumer(conn)
  357. with pytest.raises(NotImplementedError):
  358. consumer.receive('foo', 'foo')
  359. consumer.purge()
  360. assert consumer.queues[0].get() is None
  361. self.app.amqp.TaskConsumer(conn, queues=[Queue('foo')])
  362. # Without arguments.
  363. presult = self.mytask.delay()
  364. self.assert_next_task_data_equal(
  365. consumer, presult, self.mytask.name)
  366. # With arguments.
  367. presult2 = self.mytask.apply_async(
  368. kwargs={'name': 'George Costanza'},
  369. )
  370. self.assert_next_task_data_equal(
  371. consumer, presult2, self.mytask.name, name='George Costanza',
  372. )
  373. # send_task
  374. sresult = self.app.send_task(self.mytask.name,
  375. kwargs={'name': 'Elaine M. Benes'})
  376. self.assert_next_task_data_equal(
  377. consumer, sresult, self.mytask.name, name='Elaine M. Benes',
  378. )
  379. # With ETA.
  380. presult2 = self.mytask.apply_async(
  381. kwargs={'name': 'George Costanza'},
  382. eta=self.now() + timedelta(days=1),
  383. expires=self.now() + timedelta(days=2),
  384. )
  385. self.assert_next_task_data_equal(
  386. consumer, presult2, self.mytask.name,
  387. name='George Costanza', test_eta=True, test_expires=True,
  388. )
  389. # With countdown.
  390. presult2 = self.mytask.apply_async(
  391. kwargs={'name': 'George Costanza'}, countdown=10, expires=12,
  392. )
  393. self.assert_next_task_data_equal(
  394. consumer, presult2, self.mytask.name,
  395. name='George Costanza', test_eta=True, test_expires=True,
  396. )
  397. # Discarding all tasks.
  398. consumer.purge()
  399. self.mytask.apply_async()
  400. assert consumer.purge() == 1
  401. assert consumer.queues[0].get() is None
  402. assert not presult.successful()
  403. self.mytask.backend.mark_as_done(presult.id, result=None)
  404. assert presult.successful()
  405. def test_send_event(self):
  406. mytask = self.mytask._get_current_object()
  407. mytask.app.events = Mock(name='events')
  408. mytask.app.events.attach_mock(ContextMock(), 'default_dispatcher')
  409. mytask.request.id = 'fb'
  410. mytask.send_event('task-foo', id=3122)
  411. mytask.app.events.default_dispatcher().send.assert_called_with(
  412. 'task-foo', uuid='fb', id=3122,
  413. retry=True, retry_policy=self.app.conf.task_publish_retry_policy)
  414. def test_replace(self):
  415. sig1 = Mock(name='sig1')
  416. sig1.options = {}
  417. with pytest.raises(Ignore):
  418. self.mytask.replace(sig1)
  419. @pytest.mark.usefixtures('depends_on_current_app')
  420. def test_replace_callback(self):
  421. c = group([self.mytask.s()], app=self.app)
  422. c.freeze = Mock(name='freeze')
  423. c.delay = Mock(name='delay')
  424. self.mytask.request.id = 'id'
  425. self.mytask.request.group = 'group'
  426. self.mytask.request.root_id = 'root_id'
  427. self.mytask.request.callbacks = 'callbacks'
  428. self.mytask.request.errbacks = 'errbacks'
  429. class JsonMagicMock(MagicMock):
  430. parent = None
  431. def __json__(self):
  432. return 'whatever'
  433. def reprcall(self, *args, **kwargs):
  434. return 'whatever2'
  435. mocked_signature = JsonMagicMock(name='s')
  436. accumulate_mock = JsonMagicMock(name='accumulate', s=mocked_signature)
  437. self.mytask.app.tasks['celery.accumulate'] = accumulate_mock
  438. try:
  439. self.mytask.replace(c)
  440. except Ignore:
  441. mocked_signature.return_value.set.assert_called_with(
  442. chord=None,
  443. link='callbacks',
  444. link_error='errbacks',
  445. )
  446. def test_replace_group(self):
  447. c = group([self.mytask.s()], app=self.app)
  448. c.freeze = Mock(name='freeze')
  449. c.delay = Mock(name='delay')
  450. self.mytask.request.id = 'id'
  451. self.mytask.request.group = 'group'
  452. self.mytask.request.root_id = 'root_id',
  453. with pytest.raises(Ignore):
  454. self.mytask.replace(c)
  455. def test_add_trail__no_trail(self):
  456. mytask = self.increment_counter._get_current_object()
  457. mytask.trail = False
  458. mytask.add_trail('foo')
  459. def test_repr_v2_compat(self):
  460. self.mytask.__v2_compat__ = True
  461. assert 'v2 compatible' in repr(self.mytask)
  462. def test_apply_with_self(self):
  463. @self.app.task(__self__=42, shared=False)
  464. def tawself(self):
  465. return self
  466. assert tawself.apply().get() == 42
  467. assert tawself() == 42
  468. def test_context_get(self):
  469. self.mytask.push_request()
  470. try:
  471. request = self.mytask.request
  472. request.foo = 32
  473. assert request.get('foo') == 32
  474. assert request.get('bar', 36) == 36
  475. request.clear()
  476. finally:
  477. self.mytask.pop_request()
  478. def test_annotate(self):
  479. with patch('celery.app.task.resolve_all_annotations') as anno:
  480. anno.return_value = [{'FOO': 'BAR'}]
  481. @self.app.task(shared=False)
  482. def task():
  483. pass
  484. task.annotate()
  485. assert task.FOO == 'BAR'
  486. def test_after_return(self):
  487. self.mytask.push_request()
  488. try:
  489. self.mytask.request.chord = self.mytask.s()
  490. self.mytask.after_return('SUCCESS', 1.0, 'foobar', (), {}, None)
  491. self.mytask.request.clear()
  492. finally:
  493. self.mytask.pop_request()
  494. def test_update_state(self):
  495. @self.app.task(shared=False)
  496. def yyy():
  497. pass
  498. yyy.push_request()
  499. try:
  500. tid = uuid()
  501. yyy.update_state(tid, 'FROBULATING', {'fooz': 'baaz'})
  502. assert yyy.AsyncResult(tid).status == 'FROBULATING'
  503. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  504. yyy.request.id = tid
  505. yyy.update_state(state='FROBUZATING', meta={'fooz': 'baaz'})
  506. assert yyy.AsyncResult(tid).status == 'FROBUZATING'
  507. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  508. finally:
  509. yyy.pop_request()
  510. def test_repr(self):
  511. @self.app.task(shared=False)
  512. def task_test_repr():
  513. pass
  514. assert 'task_test_repr' in repr(task_test_repr)
  515. def test_has___name__(self):
  516. @self.app.task(shared=False)
  517. def yyy2():
  518. pass
  519. assert yyy2.__name__
  520. class test_apply_task(TasksCase):
  521. def test_apply_throw(self):
  522. with pytest.raises(KeyError):
  523. self.raising.apply(throw=True)
  524. def test_apply_with_task_eager_propagates(self):
  525. self.app.conf.task_eager_propagates = True
  526. with pytest.raises(KeyError):
  527. self.raising.apply()
  528. def test_apply_request_context_is_ok(self):
  529. self.app.conf.task_eager_propagates = True
  530. self.task_check_request_context.apply()
  531. def test_apply(self):
  532. self.increment_counter.count = 0
  533. e = self.increment_counter.apply()
  534. assert isinstance(e, EagerResult)
  535. assert e.get() == 1
  536. e = self.increment_counter.apply(args=[1])
  537. assert e.get() == 2
  538. e = self.increment_counter.apply(kwargs={'increment_by': 4})
  539. assert e.get() == 6
  540. assert e.successful()
  541. assert e.ready()
  542. assert repr(e).startswith('<EagerResult:')
  543. f = self.raising.apply()
  544. assert f.ready()
  545. assert not f.successful()
  546. assert f.traceback
  547. with pytest.raises(KeyError):
  548. f.get()