test_tasks.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. from __future__ import absolute_import, unicode_literals
  2. import socket
  3. import tempfile
  4. from datetime import datetime, timedelta
  5. import pytest
  6. from case import ANY, ContextMock, MagicMock, Mock, patch
  7. from kombu import Queue
  8. from kombu.exceptions import EncodeError
  9. from celery import Task, group, uuid
  10. from celery.app.task import _reprtask
  11. from celery.exceptions import Ignore, ImproperlyConfigured, Retry
  12. from celery.five import items, range, string_t
  13. from celery.result import EagerResult
  14. from celery.task.base import Task as OldTask
  15. from celery.utils.time import parse_iso8601
  16. try:
  17. from urllib.error import HTTPError
  18. except ImportError: # pragma: no cover
  19. from urllib2 import HTTPError
  20. def return_True(*args, **kwargs):
  21. # Task run functions can't be closures/lambdas, as they're pickled.
  22. return True
  23. class MockApplyTask(Task):
  24. abstract = True
  25. applied = 0
  26. def run(self, x, y):
  27. return x * y
  28. def apply_async(self, *args, **kwargs):
  29. self.applied += 1
  30. class TasksCase:
  31. def setup(self):
  32. self.mytask = self.app.task(shared=False)(return_True)
  33. @self.app.task(bind=True, count=0, shared=False)
  34. def increment_counter(self, increment_by=1):
  35. self.count += increment_by or 1
  36. return self.count
  37. self.increment_counter = increment_counter
  38. @self.app.task(shared=False)
  39. def raising():
  40. raise KeyError('foo')
  41. self.raising = raising
  42. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  43. def retry_task(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
  44. self.iterations += 1
  45. rmax = self.max_retries if max_retries is None else max_retries
  46. assert repr(self.request)
  47. retries = self.request.retries
  48. if care and retries >= rmax:
  49. return arg1
  50. else:
  51. raise self.retry(countdown=0, max_retries=rmax)
  52. self.retry_task = retry_task
  53. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  54. def retry_task_noargs(self, **kwargs):
  55. self.iterations += 1
  56. if self.request.retries >= 3:
  57. return 42
  58. else:
  59. raise self.retry(countdown=0)
  60. self.retry_task_noargs = retry_task_noargs
  61. @self.app.task(bind=True, max_retries=3, iterations=0,
  62. base=MockApplyTask, shared=False)
  63. def retry_task_mockapply(self, arg1, arg2, kwarg=1):
  64. self.iterations += 1
  65. retries = self.request.retries
  66. if retries >= 3:
  67. return arg1
  68. raise self.retry(countdown=0)
  69. self.retry_task_mockapply = retry_task_mockapply
  70. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  71. def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):
  72. self.iterations += 1
  73. retries = self.request.retries
  74. if retries >= 3:
  75. return arg1 + kwarg
  76. else:
  77. try:
  78. raise MyCustomException('Elaine Marie Benes')
  79. except MyCustomException as exc:
  80. kwargs.update(kwarg=kwarg)
  81. raise self.retry(countdown=0, exc=exc)
  82. self.retry_task_customexc = retry_task_customexc
  83. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  84. shared=False)
  85. def autoretry_task_no_kwargs(self, a, b):
  86. self.iterations += 1
  87. return a / b
  88. self.autoretry_task_no_kwargs = autoretry_task_no_kwargs
  89. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  90. retry_kwargs={'max_retries': 5}, shared=False)
  91. def autoretry_task(self, a, b):
  92. self.iterations += 1
  93. return a / b
  94. self.autoretry_task = autoretry_task
  95. @self.app.task(bind=True, autoretry_for=(HTTPError,),
  96. retry_backoff=True, shared=False)
  97. def autoretry_backoff_task(self, url):
  98. self.iterations += 1
  99. if "error" in url:
  100. fp = tempfile.TemporaryFile()
  101. raise HTTPError(url, '500', 'Error', '', fp)
  102. return url
  103. self.autoretry_backoff_task = autoretry_backoff_task
  104. @self.app.task(bind=True, autoretry_for=(HTTPError,),
  105. retry_backoff=True, retry_jitter=True, shared=False)
  106. def autoretry_backoff_jitter_task(self, url):
  107. self.iterations += 1
  108. if "error" in url:
  109. fp = tempfile.TemporaryFile()
  110. raise HTTPError(url, '500', 'Error', '', fp)
  111. return url
  112. self.autoretry_backoff_jitter_task = autoretry_backoff_jitter_task
  113. @self.app.task(bind=True)
  114. def task_check_request_context(self):
  115. assert self.request.hostname == socket.gethostname()
  116. self.task_check_request_context = task_check_request_context
  117. @self.app.task(ignore_result=True)
  118. def task_with_ignored_result():
  119. pass
  120. self.task_with_ignored_result = task_with_ignored_result
  121. # Remove all messages from memory-transport
  122. from kombu.transport.memory import Channel
  123. Channel.queues.clear()
  124. class MyCustomException(Exception):
  125. """Random custom exception."""
  126. class test_task_retries(TasksCase):
  127. def test_retry(self):
  128. self.retry_task.max_retries = 3
  129. self.retry_task.iterations = 0
  130. self.retry_task.apply([0xFF, 0xFFFF])
  131. assert self.retry_task.iterations == 4
  132. self.retry_task.max_retries = 3
  133. self.retry_task.iterations = 0
  134. self.retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
  135. assert self.retry_task.iterations == 11
  136. def test_retry_no_args(self):
  137. self.retry_task_noargs.max_retries = 3
  138. self.retry_task_noargs.iterations = 0
  139. self.retry_task_noargs.apply(propagate=True).get()
  140. assert self.retry_task_noargs.iterations == 4
  141. def test_signature_from_request__passes_headers(self):
  142. self.retry_task.push_request()
  143. self.retry_task.request.headers = {'custom': 10.1}
  144. sig = self.retry_task.signature_from_request()
  145. assert sig.options['headers']['custom'] == 10.1
  146. def test_signature_from_request__delivery_info(self):
  147. self.retry_task.push_request()
  148. self.retry_task.request.delivery_info = {
  149. 'exchange': 'testex',
  150. 'routing_key': 'testrk',
  151. }
  152. sig = self.retry_task.signature_from_request()
  153. assert sig.options['exchange'] == 'testex'
  154. assert sig.options['routing_key'] == 'testrk'
  155. def test_retry_kwargs_can_be_empty(self):
  156. self.retry_task_mockapply.push_request()
  157. try:
  158. with pytest.raises(Retry):
  159. import sys
  160. try:
  161. sys.exc_clear()
  162. except AttributeError:
  163. pass
  164. self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
  165. finally:
  166. self.retry_task_mockapply.pop_request()
  167. def test_retry_not_eager(self):
  168. self.retry_task_mockapply.push_request()
  169. try:
  170. self.retry_task_mockapply.request.called_directly = False
  171. exc = Exception('baz')
  172. try:
  173. self.retry_task_mockapply.retry(
  174. args=[4, 4], kwargs={'task_retries': 0},
  175. exc=exc, throw=False,
  176. )
  177. assert self.retry_task_mockapply.applied
  178. finally:
  179. self.retry_task_mockapply.applied = 0
  180. try:
  181. with pytest.raises(Retry):
  182. self.retry_task_mockapply.retry(
  183. args=[4, 4], kwargs={'task_retries': 0},
  184. exc=exc, throw=True)
  185. assert self.retry_task_mockapply.applied
  186. finally:
  187. self.retry_task_mockapply.applied = 0
  188. finally:
  189. self.retry_task_mockapply.pop_request()
  190. def test_retry_with_kwargs(self):
  191. self.retry_task_customexc.max_retries = 3
  192. self.retry_task_customexc.iterations = 0
  193. self.retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
  194. assert self.retry_task_customexc.iterations == 4
  195. def test_retry_with_custom_exception(self):
  196. self.retry_task_customexc.max_retries = 2
  197. self.retry_task_customexc.iterations = 0
  198. result = self.retry_task_customexc.apply(
  199. [0xFF, 0xFFFF], {'kwarg': 0xF},
  200. )
  201. with pytest.raises(MyCustomException):
  202. result.get()
  203. assert self.retry_task_customexc.iterations == 3
  204. def test_max_retries_exceeded(self):
  205. self.retry_task.max_retries = 2
  206. self.retry_task.iterations = 0
  207. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  208. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  209. result.get()
  210. assert self.retry_task.iterations == 3
  211. self.retry_task.max_retries = 1
  212. self.retry_task.iterations = 0
  213. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  214. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  215. result.get()
  216. assert self.retry_task.iterations == 2
  217. def test_autoretry_no_kwargs(self):
  218. self.autoretry_task_no_kwargs.max_retries = 3
  219. self.autoretry_task_no_kwargs.iterations = 0
  220. self.autoretry_task_no_kwargs.apply((1, 0))
  221. assert self.autoretry_task_no_kwargs.iterations == 4
  222. def test_autoretry(self):
  223. self.autoretry_task.max_retries = 3
  224. self.autoretry_task.iterations = 0
  225. self.autoretry_task.apply((1, 0))
  226. assert self.autoretry_task.iterations == 6
  227. @patch('random.randrange', side_effect=lambda i: i - 1)
  228. def test_autoretry_backoff(self, randrange):
  229. task = self.autoretry_backoff_task
  230. task.max_retries = 3
  231. task.iterations = 0
  232. with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
  233. task.apply(("http://httpbin.org/error",))
  234. assert task.iterations == 4
  235. retry_call_countdowns = [
  236. call[1]['countdown'] for call in fake_retry.call_args_list
  237. ]
  238. assert retry_call_countdowns == [1, 2, 4, 8]
  239. @patch('random.randrange', side_effect=lambda i: i - 2)
  240. def test_autoretry_backoff_jitter(self, randrange):
  241. task = self.autoretry_backoff_jitter_task
  242. task.max_retries = 3
  243. task.iterations = 0
  244. with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
  245. task.apply(("http://httpbin.org/error",))
  246. assert task.iterations == 4
  247. retry_call_countdowns = [
  248. call[1]['countdown'] for call in fake_retry.call_args_list
  249. ]
  250. assert retry_call_countdowns == [0, 1, 3, 7]
  251. def test_retry_wrong_eta_when_not_enable_utc(self):
  252. """Issue #3753"""
  253. self.app.conf.enable_utc = False
  254. self.app.conf.timezone = 'US/Eastern'
  255. self.autoretry_task.iterations = 0
  256. self.autoretry_task.default_retry_delay = 2
  257. self.autoretry_task.apply((1, 0))
  258. assert self.autoretry_task.iterations == 6
  259. class test_canvas_utils(TasksCase):
  260. def test_si(self):
  261. assert self.retry_task.si()
  262. assert self.retry_task.si().immutable
  263. def test_chunks(self):
  264. assert self.retry_task.chunks(range(100), 10)
  265. def test_map(self):
  266. assert self.retry_task.map(range(100))
  267. def test_starmap(self):
  268. assert self.retry_task.starmap(range(100))
  269. def test_on_success(self):
  270. self.retry_task.on_success(1, 1, (), {})
  271. class test_tasks(TasksCase):
  272. def now(self):
  273. return self.app.now()
  274. def test_typing(self):
  275. @self.app.task()
  276. def add(x, y, kw=1):
  277. pass
  278. with pytest.raises(TypeError):
  279. add.delay(1)
  280. with pytest.raises(TypeError):
  281. add.delay(1, kw=2)
  282. with pytest.raises(TypeError):
  283. add.delay(1, 2, foobar=3)
  284. add.delay(2, 2)
  285. def test_shadow_name(self):
  286. def shadow_name(task, args, kwargs, options):
  287. return 'fooxyz'
  288. @self.app.task(shadow_name=shadow_name)
  289. def shadowed():
  290. pass
  291. old_send_task = self.app.send_task
  292. self.app.send_task = Mock()
  293. shadowed.delay()
  294. self.app.send_task.assert_called_once_with(ANY, ANY, ANY,
  295. compression=ANY,
  296. delivery_mode=ANY,
  297. exchange=ANY,
  298. expires=ANY,
  299. immediate=ANY,
  300. link=ANY,
  301. link_error=ANY,
  302. mandatory=ANY,
  303. priority=ANY,
  304. producer=ANY,
  305. queue=ANY,
  306. result_cls=ANY,
  307. routing_key=ANY,
  308. serializer=ANY,
  309. soft_time_limit=ANY,
  310. task_id=ANY,
  311. task_type=ANY,
  312. time_limit=ANY,
  313. shadow='fooxyz',
  314. ignore_result=False)
  315. self.app.send_task = old_send_task
  316. def test_shadow_name_old_task_class(self):
  317. def shadow_name(task, args, kwargs, options):
  318. return 'fooxyz'
  319. @self.app.task(base=OldTask, shadow_name=shadow_name)
  320. def shadowed():
  321. pass
  322. old_send_task = self.app.send_task
  323. self.app.send_task = Mock()
  324. shadowed.delay()
  325. self.app.send_task.assert_called_once_with(ANY, ANY, ANY,
  326. compression=ANY,
  327. delivery_mode=ANY,
  328. exchange=ANY,
  329. expires=ANY,
  330. immediate=ANY,
  331. link=ANY,
  332. link_error=ANY,
  333. mandatory=ANY,
  334. priority=ANY,
  335. producer=ANY,
  336. queue=ANY,
  337. result_cls=ANY,
  338. routing_key=ANY,
  339. serializer=ANY,
  340. soft_time_limit=ANY,
  341. task_id=ANY,
  342. task_type=ANY,
  343. time_limit=ANY,
  344. shadow='fooxyz',
  345. ignore_result=False)
  346. self.app.send_task = old_send_task
  347. def test_typing__disabled(self):
  348. @self.app.task(typing=False)
  349. def add(x, y, kw=1):
  350. pass
  351. add.delay(1)
  352. add.delay(1, kw=2)
  353. add.delay(1, 2, foobar=3)
  354. def test_typing__disabled_by_app(self):
  355. with self.Celery(set_as_current=False, strict_typing=False) as app:
  356. @app.task()
  357. def add(x, y, kw=1):
  358. pass
  359. assert not add.typing
  360. add.delay(1)
  361. add.delay(1, kw=2)
  362. add.delay(1, 2, foobar=3)
  363. @pytest.mark.usefixtures('depends_on_current_app')
  364. def test_unpickle_task(self):
  365. import pickle
  366. @self.app.task(shared=True)
  367. def xxx():
  368. pass
  369. assert pickle.loads(pickle.dumps(xxx)) is xxx.app.tasks[xxx.name]
  370. @patch('celery.app.task.current_app')
  371. @pytest.mark.usefixtures('depends_on_current_app')
  372. def test_bind__no_app(self, current_app):
  373. class XTask(Task):
  374. _app = None
  375. XTask._app = None
  376. XTask.__bound__ = False
  377. XTask.bind = Mock(name='bind')
  378. assert XTask.app is current_app
  379. XTask.bind.assert_called_with(current_app)
  380. def test_reprtask__no_fmt(self):
  381. assert _reprtask(self.mytask)
  382. def test_AsyncResult(self):
  383. task_id = uuid()
  384. result = self.retry_task.AsyncResult(task_id)
  385. assert result.backend == self.retry_task.backend
  386. assert result.id == task_id
  387. def assert_next_task_data_equal(self, consumer, presult, task_name,
  388. test_eta=False, test_expires=False,
  389. properties=None, headers=None, **kwargs):
  390. next_task = consumer.queues[0].get(accept=['pickle', 'json'])
  391. task_properties = next_task.properties
  392. task_headers = next_task.headers
  393. task_body = next_task.decode()
  394. task_args, task_kwargs, embed = task_body
  395. assert task_headers['id'] == presult.id
  396. assert task_headers['task'] == task_name
  397. if test_eta:
  398. assert isinstance(task_headers.get('eta'), string_t)
  399. to_datetime = parse_iso8601(task_headers.get('eta'))
  400. assert isinstance(to_datetime, datetime)
  401. if test_expires:
  402. assert isinstance(task_headers.get('expires'), string_t)
  403. to_datetime = parse_iso8601(task_headers.get('expires'))
  404. assert isinstance(to_datetime, datetime)
  405. properties = properties or {}
  406. for arg_name, arg_value in items(properties):
  407. assert task_properties.get(arg_name) == arg_value
  408. headers = headers or {}
  409. for arg_name, arg_value in items(headers):
  410. assert task_headers.get(arg_name) == arg_value
  411. for arg_name, arg_value in items(kwargs):
  412. assert task_kwargs.get(arg_name) == arg_value
  413. def test_incomplete_task_cls(self):
  414. class IncompleteTask(Task):
  415. app = self.app
  416. name = 'c.unittest.t.itask'
  417. with pytest.raises(NotImplementedError):
  418. IncompleteTask().run()
  419. def test_task_kwargs_must_be_dictionary(self):
  420. with pytest.raises(TypeError):
  421. self.increment_counter.apply_async([], 'str')
  422. def test_task_args_must_be_list(self):
  423. with pytest.raises(TypeError):
  424. self.increment_counter.apply_async('s', {})
  425. def test_regular_task(self):
  426. assert isinstance(self.mytask, Task)
  427. assert self.mytask.run()
  428. assert callable(self.mytask)
  429. assert self.mytask(), 'Task class runs run() when called'
  430. with self.app.connection_or_acquire() as conn:
  431. consumer = self.app.amqp.TaskConsumer(conn)
  432. with pytest.raises(NotImplementedError):
  433. consumer.receive('foo', 'foo')
  434. consumer.purge()
  435. assert consumer.queues[0].get() is None
  436. self.app.amqp.TaskConsumer(conn, queues=[Queue('foo')])
  437. # Without arguments.
  438. presult = self.mytask.delay()
  439. self.assert_next_task_data_equal(
  440. consumer, presult, self.mytask.name)
  441. # With arguments.
  442. presult2 = self.mytask.apply_async(
  443. kwargs={'name': 'George Costanza'},
  444. )
  445. self.assert_next_task_data_equal(
  446. consumer, presult2, self.mytask.name, name='George Costanza',
  447. )
  448. # send_task
  449. sresult = self.app.send_task(self.mytask.name,
  450. kwargs={'name': 'Elaine M. Benes'})
  451. self.assert_next_task_data_equal(
  452. consumer, sresult, self.mytask.name, name='Elaine M. Benes',
  453. )
  454. # With ETA.
  455. presult2 = self.mytask.apply_async(
  456. kwargs={'name': 'George Costanza'},
  457. eta=self.now() + timedelta(days=1),
  458. expires=self.now() + timedelta(days=2),
  459. )
  460. self.assert_next_task_data_equal(
  461. consumer, presult2, self.mytask.name,
  462. name='George Costanza', test_eta=True, test_expires=True,
  463. )
  464. # With countdown.
  465. presult2 = self.mytask.apply_async(
  466. kwargs={'name': 'George Costanza'}, countdown=10, expires=12,
  467. )
  468. self.assert_next_task_data_equal(
  469. consumer, presult2, self.mytask.name,
  470. name='George Costanza', test_eta=True, test_expires=True,
  471. )
  472. # Default argsrepr/kwargsrepr behavior
  473. presult2 = self.mytask.apply_async(
  474. args=('spam',), kwargs={'name': 'Jerry Seinfeld'}
  475. )
  476. self.assert_next_task_data_equal(
  477. consumer, presult2, self.mytask.name,
  478. headers={'argsrepr': "('spam',)",
  479. 'kwargsrepr': "{'name': 'Jerry Seinfeld'}"},
  480. )
  481. # With argsrepr/kwargsrepr
  482. presult2 = self.mytask.apply_async(
  483. args=('secret',), argsrepr="'***'",
  484. kwargs={'password': 'foo'}, kwargsrepr="{'password': '***'}",
  485. )
  486. self.assert_next_task_data_equal(
  487. consumer, presult2, self.mytask.name,
  488. headers={'argsrepr': "'***'",
  489. 'kwargsrepr': "{'password': '***'}"},
  490. )
  491. # Discarding all tasks.
  492. consumer.purge()
  493. self.mytask.apply_async()
  494. assert consumer.purge() == 1
  495. assert consumer.queues[0].get() is None
  496. assert not presult.successful()
  497. self.mytask.backend.mark_as_done(presult.id, result=None)
  498. assert presult.successful()
  499. def test_send_event(self):
  500. mytask = self.mytask._get_current_object()
  501. mytask.app.events = Mock(name='events')
  502. mytask.app.events.attach_mock(ContextMock(), 'default_dispatcher')
  503. mytask.request.id = 'fb'
  504. mytask.send_event('task-foo', id=3122)
  505. mytask.app.events.default_dispatcher().send.assert_called_with(
  506. 'task-foo', uuid='fb', id=3122,
  507. retry=True, retry_policy=self.app.conf.task_publish_retry_policy)
  508. def test_replace(self):
  509. sig1 = Mock(name='sig1')
  510. sig1.options = {}
  511. with pytest.raises(Ignore):
  512. self.mytask.replace(sig1)
  513. def test_replace_with_chord(self):
  514. sig1 = Mock(name='sig1')
  515. sig1.options = {'chord': None}
  516. with pytest.raises(ImproperlyConfigured):
  517. self.mytask.replace(sig1)
  518. @pytest.mark.usefixtures('depends_on_current_app')
  519. def test_replace_callback(self):
  520. c = group([self.mytask.s()], app=self.app)
  521. c.freeze = Mock(name='freeze')
  522. c.delay = Mock(name='delay')
  523. self.mytask.request.id = 'id'
  524. self.mytask.request.group = 'group'
  525. self.mytask.request.root_id = 'root_id'
  526. self.mytask.request.callbacks = 'callbacks'
  527. self.mytask.request.errbacks = 'errbacks'
  528. class JsonMagicMock(MagicMock):
  529. parent = None
  530. def __json__(self):
  531. return 'whatever'
  532. def reprcall(self, *args, **kwargs):
  533. return 'whatever2'
  534. mocked_signature = JsonMagicMock(name='s')
  535. accumulate_mock = JsonMagicMock(name='accumulate', s=mocked_signature)
  536. self.mytask.app.tasks['celery.accumulate'] = accumulate_mock
  537. try:
  538. self.mytask.replace(c)
  539. except Ignore:
  540. mocked_signature.return_value.set.assert_called_with(
  541. link='callbacks',
  542. link_error='errbacks',
  543. )
  544. def test_replace_group(self):
  545. c = group([self.mytask.s()], app=self.app)
  546. c.freeze = Mock(name='freeze')
  547. c.delay = Mock(name='delay')
  548. self.mytask.request.id = 'id'
  549. self.mytask.request.group = 'group'
  550. self.mytask.request.root_id = 'root_id',
  551. with pytest.raises(Ignore):
  552. self.mytask.replace(c)
  553. def test_add_trail__no_trail(self):
  554. mytask = self.increment_counter._get_current_object()
  555. mytask.trail = False
  556. mytask.add_trail('foo')
  557. def test_repr_v2_compat(self):
  558. self.mytask.__v2_compat__ = True
  559. assert 'v2 compatible' in repr(self.mytask)
  560. def test_context_get(self):
  561. self.mytask.push_request()
  562. try:
  563. request = self.mytask.request
  564. request.foo = 32
  565. assert request.get('foo') == 32
  566. assert request.get('bar', 36) == 36
  567. request.clear()
  568. finally:
  569. self.mytask.pop_request()
  570. def test_annotate(self):
  571. with patch('celery.app.task.resolve_all_annotations') as anno:
  572. anno.return_value = [{'FOO': 'BAR'}]
  573. @self.app.task(shared=False)
  574. def task():
  575. pass
  576. task.annotate()
  577. assert task.FOO == 'BAR'
  578. def test_after_return(self):
  579. self.mytask.push_request()
  580. try:
  581. self.mytask.request.chord = self.mytask.s()
  582. self.mytask.after_return('SUCCESS', 1.0, 'foobar', (), {}, None)
  583. self.mytask.request.clear()
  584. finally:
  585. self.mytask.pop_request()
  586. def test_update_state(self):
  587. @self.app.task(shared=False)
  588. def yyy():
  589. pass
  590. yyy.push_request()
  591. try:
  592. tid = uuid()
  593. # update_state should accept arbitrary kwargs, which are passed to
  594. # the backend store_result method
  595. yyy.update_state(tid, 'FROBULATING', {'fooz': 'baaz'},
  596. arbitrary_kwarg=None)
  597. assert yyy.AsyncResult(tid).status == 'FROBULATING'
  598. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  599. yyy.request.id = tid
  600. yyy.update_state(state='FROBUZATING', meta={'fooz': 'baaz'})
  601. assert yyy.AsyncResult(tid).status == 'FROBUZATING'
  602. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  603. finally:
  604. yyy.pop_request()
  605. def test_repr(self):
  606. @self.app.task(shared=False)
  607. def task_test_repr():
  608. pass
  609. assert 'task_test_repr' in repr(task_test_repr)
  610. def test_has___name__(self):
  611. @self.app.task(shared=False)
  612. def yyy2():
  613. pass
  614. assert yyy2.__name__
  615. class test_apply_task(TasksCase):
  616. def test_apply_throw(self):
  617. with pytest.raises(KeyError):
  618. self.raising.apply(throw=True)
  619. def test_apply_with_task_eager_propagates(self):
  620. self.app.conf.task_eager_propagates = True
  621. with pytest.raises(KeyError):
  622. self.raising.apply()
  623. def test_apply_request_context_is_ok(self):
  624. self.app.conf.task_eager_propagates = True
  625. self.task_check_request_context.apply()
  626. def test_apply(self):
  627. self.increment_counter.count = 0
  628. e = self.increment_counter.apply()
  629. assert isinstance(e, EagerResult)
  630. assert e.get() == 1
  631. e = self.increment_counter.apply(args=[1])
  632. assert e.get() == 2
  633. e = self.increment_counter.apply(kwargs={'increment_by': 4})
  634. assert e.get() == 6
  635. assert e.successful()
  636. assert e.ready()
  637. assert repr(e).startswith('<EagerResult:')
  638. f = self.raising.apply()
  639. assert f.ready()
  640. assert not f.successful()
  641. assert f.traceback
  642. with pytest.raises(KeyError):
  643. f.get()
  644. class test_apply_async(TasksCase):
  645. def common_send_task_arguments(self):
  646. return (ANY, ANY, ANY), dict(
  647. compression=ANY,
  648. delivery_mode=ANY,
  649. exchange=ANY,
  650. expires=ANY,
  651. immediate=ANY,
  652. link=ANY,
  653. link_error=ANY,
  654. mandatory=ANY,
  655. priority=ANY,
  656. producer=ANY,
  657. queue=ANY,
  658. result_cls=ANY,
  659. routing_key=ANY,
  660. serializer=ANY,
  661. soft_time_limit=ANY,
  662. task_id=ANY,
  663. task_type=ANY,
  664. time_limit=ANY,
  665. shadow=None,
  666. ignore_result=False
  667. )
  668. def test_eager_serialization_failure(self):
  669. @self.app.task
  670. def task(*args, **kwargs):
  671. pass
  672. with pytest.raises(EncodeError):
  673. task.apply_async((1, 2, 3, 4, {1}))
  674. def test_task_with_ignored_result(self):
  675. with patch.object(self.app, 'send_task') as send_task:
  676. self.task_with_ignored_result.apply_async()
  677. expected_args, expected_kwargs = self.common_send_task_arguments()
  678. expected_kwargs['ignore_result'] = True
  679. send_task.assert_called_once_with(
  680. *expected_args,
  681. **expected_kwargs
  682. )
  683. def test_task_with_result(self):
  684. with patch.object(self.app, 'send_task') as send_task:
  685. self.mytask.apply_async()
  686. expected_args, expected_kwargs = self.common_send_task_arguments()
  687. send_task.assert_called_once_with(
  688. *expected_args,
  689. **expected_kwargs
  690. )
  691. def test_task_with_result_ignoring_on_call(self):
  692. with patch.object(self.app, 'send_task') as send_task:
  693. self.mytask.apply_async(ignore_result=True)
  694. expected_args, expected_kwargs = self.common_send_task_arguments()
  695. expected_kwargs['ignore_result'] = True
  696. send_task.assert_called_once_with(
  697. *expected_args,
  698. **expected_kwargs
  699. )