# test_tasks.py (24 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. import socket
  4. import tempfile
  5. from datetime import datetime, timedelta
  6. try:
  7. from urllib.error import HTTPError
  8. except ImportError: # pragma: no cover
  9. from urllib2 import HTTPError
  10. from case import ContextMock, MagicMock, Mock, patch
  11. from kombu import Queue
  12. from celery import Task, group, uuid
  13. from celery.app.task import _reprtask
  14. from celery.exceptions import Ignore, Retry
  15. from celery.five import items, range, string_t
  16. from celery.result import EagerResult
  17. from celery.utils.time import parse_iso8601
  18. def return_True(*args, **kwargs):
  19. # Task run functions can't be closures/lambdas, as they're pickled.
  20. return True
  21. class MockApplyTask(Task):
  22. abstract = True
  23. applied = 0
  24. def run(self, x, y):
  25. return x * y
  26. def apply_async(self, *args, **kwargs):
  27. self.applied += 1
  28. class TasksCase:
  29. def setup(self):
  30. self.mytask = self.app.task(shared=False)(return_True)
  31. @self.app.task(bind=True, count=0, shared=False)
  32. def increment_counter(self, increment_by=1):
  33. self.count += increment_by or 1
  34. return self.count
  35. self.increment_counter = increment_counter
  36. @self.app.task(shared=False)
  37. def raising():
  38. raise KeyError('foo')
  39. self.raising = raising
  40. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  41. def retry_task(self, arg1, arg2, kwarg=1, max_retries=None, care=True):
  42. self.iterations += 1
  43. rmax = self.max_retries if max_retries is None else max_retries
  44. assert repr(self.request)
  45. retries = self.request.retries
  46. if care and retries >= rmax:
  47. return arg1
  48. else:
  49. raise self.retry(countdown=0, max_retries=rmax)
  50. self.retry_task = retry_task
  51. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  52. def retry_task_noargs(self, **kwargs):
  53. self.iterations += 1
  54. if self.request.retries >= 3:
  55. return 42
  56. else:
  57. raise self.retry(countdown=0)
  58. self.retry_task_noargs = retry_task_noargs
  59. @self.app.task(bind=True, max_retries=3, iterations=0,
  60. base=MockApplyTask, shared=False)
  61. def retry_task_mockapply(self, arg1, arg2, kwarg=1):
  62. self.iterations += 1
  63. retries = self.request.retries
  64. if retries >= 3:
  65. return arg1
  66. raise self.retry(countdown=0)
  67. self.retry_task_mockapply = retry_task_mockapply
  68. @self.app.task(bind=True, max_retries=3, iterations=0, shared=False)
  69. def retry_task_customexc(self, arg1, arg2, kwarg=1, **kwargs):
  70. self.iterations += 1
  71. retries = self.request.retries
  72. if retries >= 3:
  73. return arg1 + kwarg
  74. else:
  75. try:
  76. raise MyCustomException('Elaine Marie Benes')
  77. except MyCustomException as exc:
  78. kwargs.update(kwarg=kwarg)
  79. raise self.retry(countdown=0, exc=exc)
  80. self.retry_task_customexc = retry_task_customexc
  81. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  82. shared=False)
  83. def autoretry_task_no_kwargs(self, a, b):
  84. self.iterations += 1
  85. return a / b
  86. self.autoretry_task_no_kwargs = autoretry_task_no_kwargs
  87. @self.app.task(bind=True, autoretry_for=(ZeroDivisionError,),
  88. retry_kwargs={'max_retries': 5}, shared=False)
  89. def autoretry_task(self, a, b):
  90. self.iterations += 1
  91. return a / b
  92. self.autoretry_task = autoretry_task
  93. @self.app.task(bind=True, autoretry_for=(HTTPError,),
  94. retry_backoff=True, shared=False)
  95. def autoretry_backoff_task(self, url):
  96. self.iterations += 1
  97. if "error" in url:
  98. fp = tempfile.TemporaryFile()
  99. raise HTTPError(url, '500', 'Error', '', fp)
  100. return url
  101. self.autoretry_backoff_task = autoretry_backoff_task
  102. @self.app.task(bind=True, autoretry_for=(HTTPError,),
  103. retry_backoff=True, retry_jitter=True, shared=False)
  104. def autoretry_backoff_jitter_task(self, url):
  105. self.iterations += 1
  106. if "error" in url:
  107. fp = tempfile.TemporaryFile()
  108. raise HTTPError(url, '500', 'Error', '', fp)
  109. return url
  110. self.autoretry_backoff_jitter_task = autoretry_backoff_jitter_task
  111. @self.app.task(bind=True)
  112. def task_check_request_context(self):
  113. assert self.request.hostname == socket.gethostname()
  114. self.task_check_request_context = task_check_request_context
  115. # memove all messages from memory-transport
  116. from kombu.transport.memory import Channel
  117. Channel.queues.clear()
  118. class MyCustomException(Exception):
  119. """Random custom exception."""
  120. class test_task_retries(TasksCase):
  121. def test_retry(self):
  122. self.retry_task.max_retries = 3
  123. self.retry_task.iterations = 0
  124. self.retry_task.apply([0xFF, 0xFFFF])
  125. assert self.retry_task.iterations == 4
  126. self.retry_task.max_retries = 3
  127. self.retry_task.iterations = 0
  128. self.retry_task.apply([0xFF, 0xFFFF], {'max_retries': 10})
  129. assert self.retry_task.iterations == 11
  130. def test_retry_no_args(self):
  131. self.retry_task_noargs.max_retries = 3
  132. self.retry_task_noargs.iterations = 0
  133. self.retry_task_noargs.apply(propagate=True).get()
  134. assert self.retry_task_noargs.iterations == 4
  135. def test_signature_from_request__passes_headers(self):
  136. self.retry_task.push_request()
  137. self.retry_task.request.headers = {'custom': 10.1}
  138. sig = self.retry_task.signature_from_request()
  139. assert sig.options['headers']['custom'] == 10.1
  140. def test_signature_from_request__delivery_info(self):
  141. self.retry_task.push_request()
  142. self.retry_task.request.delivery_info = {
  143. 'exchange': 'testex',
  144. 'routing_key': 'testrk',
  145. }
  146. sig = self.retry_task.signature_from_request()
  147. assert sig.options['exchange'] == 'testex'
  148. assert sig.options['routing_key'] == 'testrk'
  149. def test_retry_kwargs_can_be_empty(self):
  150. self.retry_task_mockapply.push_request()
  151. try:
  152. with pytest.raises(Retry):
  153. import sys
  154. try:
  155. sys.exc_clear()
  156. except AttributeError:
  157. pass
  158. self.retry_task_mockapply.retry(args=[4, 4], kwargs=None)
  159. finally:
  160. self.retry_task_mockapply.pop_request()
  161. def test_retry_not_eager(self):
  162. self.retry_task_mockapply.push_request()
  163. try:
  164. self.retry_task_mockapply.request.called_directly = False
  165. exc = Exception('baz')
  166. try:
  167. self.retry_task_mockapply.retry(
  168. args=[4, 4], kwargs={'task_retries': 0},
  169. exc=exc, throw=False,
  170. )
  171. assert self.retry_task_mockapply.applied
  172. finally:
  173. self.retry_task_mockapply.applied = 0
  174. try:
  175. with pytest.raises(Retry):
  176. self.retry_task_mockapply.retry(
  177. args=[4, 4], kwargs={'task_retries': 0},
  178. exc=exc, throw=True)
  179. assert self.retry_task_mockapply.applied
  180. finally:
  181. self.retry_task_mockapply.applied = 0
  182. finally:
  183. self.retry_task_mockapply.pop_request()
  184. def test_retry_with_kwargs(self):
  185. self.retry_task_customexc.max_retries = 3
  186. self.retry_task_customexc.iterations = 0
  187. self.retry_task_customexc.apply([0xFF, 0xFFFF], {'kwarg': 0xF})
  188. assert self.retry_task_customexc.iterations == 4
  189. def test_retry_with_custom_exception(self):
  190. self.retry_task_customexc.max_retries = 2
  191. self.retry_task_customexc.iterations = 0
  192. result = self.retry_task_customexc.apply(
  193. [0xFF, 0xFFFF], {'kwarg': 0xF},
  194. )
  195. with pytest.raises(MyCustomException):
  196. result.get()
  197. assert self.retry_task_customexc.iterations == 3
  198. def test_max_retries_exceeded(self):
  199. self.retry_task.max_retries = 2
  200. self.retry_task.iterations = 0
  201. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  202. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  203. result.get()
  204. assert self.retry_task.iterations == 3
  205. self.retry_task.max_retries = 1
  206. self.retry_task.iterations = 0
  207. result = self.retry_task.apply([0xFF, 0xFFFF], {'care': False})
  208. with pytest.raises(self.retry_task.MaxRetriesExceededError):
  209. result.get()
  210. assert self.retry_task.iterations == 2
  211. def test_autoretry_no_kwargs(self):
  212. self.autoretry_task_no_kwargs.max_retries = 3
  213. self.autoretry_task_no_kwargs.iterations = 0
  214. self.autoretry_task_no_kwargs.apply((1, 0))
  215. assert self.autoretry_task_no_kwargs.iterations == 4
  216. def test_autoretry(self):
  217. self.autoretry_task.max_retries = 3
  218. self.autoretry_task.iterations = 0
  219. self.autoretry_task.apply((1, 0))
  220. assert self.autoretry_task.iterations == 6
  221. @patch('random.randrange', side_effect=lambda i: i - 1)
  222. def test_autoretry_backoff(self, randrange):
  223. task = self.autoretry_backoff_task
  224. task.max_retries = 3
  225. task.iterations = 0
  226. with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
  227. task.apply(("http://httpbin.org/error",))
  228. assert task.iterations == 4
  229. retry_call_countdowns = [
  230. call[1]['countdown'] for call in fake_retry.call_args_list
  231. ]
  232. assert retry_call_countdowns == [1, 2, 4, 8]
  233. @patch('random.randrange', side_effect=lambda i: i - 2)
  234. def test_autoretry_backoff_jitter(self, randrange):
  235. task = self.autoretry_backoff_jitter_task
  236. task.max_retries = 3
  237. task.iterations = 0
  238. with patch.object(task, 'retry', wraps=task.retry) as fake_retry:
  239. task.apply(("http://httpbin.org/error",))
  240. assert task.iterations == 4
  241. retry_call_countdowns = [
  242. call[1]['countdown'] for call in fake_retry.call_args_list
  243. ]
  244. assert retry_call_countdowns == [0, 1, 3, 7]
  245. def test_retry_wrong_eta_when_not_enable_utc(self):
  246. """Issue #3753"""
  247. self.app.conf.enable_utc = False
  248. self.app.conf.timezone = 'US/Eastern'
  249. self.autoretry_task.iterations = 0
  250. self.autoretry_task.default_retry_delay = 2
  251. self.autoretry_task.apply((1, 0))
  252. assert self.autoretry_task.iterations == 6
  253. class test_canvas_utils(TasksCase):
  254. def test_si(self):
  255. assert self.retry_task.si()
  256. assert self.retry_task.si().immutable
  257. def test_chunks(self):
  258. assert self.retry_task.chunks(range(100), 10)
  259. def test_map(self):
  260. assert self.retry_task.map(range(100))
  261. def test_starmap(self):
  262. assert self.retry_task.starmap(range(100))
  263. def test_on_success(self):
  264. self.retry_task.on_success(1, 1, (), {})
  265. class test_tasks(TasksCase):
  266. def now(self):
  267. return self.app.now()
  268. def test_typing(self):
  269. @self.app.task()
  270. def add(x, y, kw=1):
  271. pass
  272. with pytest.raises(TypeError):
  273. add.delay(1)
  274. with pytest.raises(TypeError):
  275. add.delay(1, kw=2)
  276. with pytest.raises(TypeError):
  277. add.delay(1, 2, foobar=3)
  278. add.delay(2, 2)
  279. def test_typing__disabled(self):
  280. @self.app.task(typing=False)
  281. def add(x, y, kw=1):
  282. pass
  283. add.delay(1)
  284. add.delay(1, kw=2)
  285. add.delay(1, 2, foobar=3)
  286. def test_typing__disabled_by_app(self):
  287. with self.Celery(set_as_current=False, strict_typing=False) as app:
  288. @app.task()
  289. def add(x, y, kw=1):
  290. pass
  291. assert not add.typing
  292. add.delay(1)
  293. add.delay(1, kw=2)
  294. add.delay(1, 2, foobar=3)
  295. @pytest.mark.usefixtures('depends_on_current_app')
  296. def test_unpickle_task(self):
  297. import pickle
  298. @self.app.task(shared=True)
  299. def xxx():
  300. pass
  301. assert pickle.loads(pickle.dumps(xxx)) is xxx.app.tasks[xxx.name]
  302. @patch('celery.app.task.current_app')
  303. @pytest.mark.usefixtures('depends_on_current_app')
  304. def test_bind__no_app(self, current_app):
  305. class XTask(Task):
  306. _app = None
  307. XTask._app = None
  308. XTask.__bound__ = False
  309. XTask.bind = Mock(name='bind')
  310. assert XTask.app is current_app
  311. XTask.bind.assert_called_with(current_app)
  312. def test_reprtask__no_fmt(self):
  313. assert _reprtask(self.mytask)
  314. def test_AsyncResult(self):
  315. task_id = uuid()
  316. result = self.retry_task.AsyncResult(task_id)
  317. assert result.backend == self.retry_task.backend
  318. assert result.id == task_id
  319. def assert_next_task_data_equal(self, consumer, presult, task_name,
  320. test_eta=False, test_expires=False,
  321. properties=None, headers=None, **kwargs):
  322. next_task = consumer.queues[0].get(accept=['pickle', 'json'])
  323. task_properties = next_task.properties
  324. task_headers = next_task.headers
  325. task_body = next_task.decode()
  326. task_args, task_kwargs, embed = task_body
  327. assert task_headers['id'] == presult.id
  328. assert task_headers['task'] == task_name
  329. if test_eta:
  330. assert isinstance(task_headers.get('eta'), string_t)
  331. to_datetime = parse_iso8601(task_headers.get('eta'))
  332. assert isinstance(to_datetime, datetime)
  333. if test_expires:
  334. assert isinstance(task_headers.get('expires'), string_t)
  335. to_datetime = parse_iso8601(task_headers.get('expires'))
  336. assert isinstance(to_datetime, datetime)
  337. properties = properties or {}
  338. for arg_name, arg_value in items(properties):
  339. assert task_properties.get(arg_name) == arg_value
  340. headers = headers or {}
  341. for arg_name, arg_value in items(headers):
  342. assert task_headers.get(arg_name) == arg_value
  343. for arg_name, arg_value in items(kwargs):
  344. assert task_kwargs.get(arg_name) == arg_value
  345. def test_incomplete_task_cls(self):
  346. class IncompleteTask(Task):
  347. app = self.app
  348. name = 'c.unittest.t.itask'
  349. with pytest.raises(NotImplementedError):
  350. IncompleteTask().run()
  351. def test_task_kwargs_must_be_dictionary(self):
  352. with pytest.raises(TypeError):
  353. self.increment_counter.apply_async([], 'str')
  354. def test_task_args_must_be_list(self):
  355. with pytest.raises(TypeError):
  356. self.increment_counter.apply_async('s', {})
  357. def test_regular_task(self):
  358. assert isinstance(self.mytask, Task)
  359. assert self.mytask.run()
  360. assert callable(self.mytask)
  361. assert self.mytask(), 'Task class runs run() when called'
  362. with self.app.connection_or_acquire() as conn:
  363. consumer = self.app.amqp.TaskConsumer(conn)
  364. with pytest.raises(NotImplementedError):
  365. consumer.receive('foo', 'foo')
  366. consumer.purge()
  367. assert consumer.queues[0].get() is None
  368. self.app.amqp.TaskConsumer(conn, queues=[Queue('foo')])
  369. # Without arguments.
  370. presult = self.mytask.delay()
  371. self.assert_next_task_data_equal(
  372. consumer, presult, self.mytask.name)
  373. # With arguments.
  374. presult2 = self.mytask.apply_async(
  375. kwargs={'name': 'George Costanza'},
  376. )
  377. self.assert_next_task_data_equal(
  378. consumer, presult2, self.mytask.name, name='George Costanza',
  379. )
  380. # send_task
  381. sresult = self.app.send_task(self.mytask.name,
  382. kwargs={'name': 'Elaine M. Benes'})
  383. self.assert_next_task_data_equal(
  384. consumer, sresult, self.mytask.name, name='Elaine M. Benes',
  385. )
  386. # With ETA.
  387. presult2 = self.mytask.apply_async(
  388. kwargs={'name': 'George Costanza'},
  389. eta=self.now() + timedelta(days=1),
  390. expires=self.now() + timedelta(days=2),
  391. )
  392. self.assert_next_task_data_equal(
  393. consumer, presult2, self.mytask.name,
  394. name='George Costanza', test_eta=True, test_expires=True,
  395. )
  396. # With countdown.
  397. presult2 = self.mytask.apply_async(
  398. kwargs={'name': 'George Costanza'}, countdown=10, expires=12,
  399. )
  400. self.assert_next_task_data_equal(
  401. consumer, presult2, self.mytask.name,
  402. name='George Costanza', test_eta=True, test_expires=True,
  403. )
  404. # Default argsrepr/kwargsrepr behavior
  405. presult2 = self.mytask.apply_async(
  406. args=('spam',), kwargs={'name': 'Jerry Seinfeld'}
  407. )
  408. self.assert_next_task_data_equal(
  409. consumer, presult2, self.mytask.name,
  410. headers={'argsrepr': "('spam',)",
  411. 'kwargsrepr': "{'name': 'Jerry Seinfeld'}"},
  412. )
  413. # With argsrepr/kwargsrepr
  414. presult2 = self.mytask.apply_async(
  415. args=('secret',), argsrepr="'***'",
  416. kwargs={'password': 'foo'}, kwargsrepr="{'password': '***'}",
  417. )
  418. self.assert_next_task_data_equal(
  419. consumer, presult2, self.mytask.name,
  420. headers={'argsrepr': "'***'",
  421. 'kwargsrepr': "{'password': '***'}"},
  422. )
  423. # Discarding all tasks.
  424. consumer.purge()
  425. self.mytask.apply_async()
  426. assert consumer.purge() == 1
  427. assert consumer.queues[0].get() is None
  428. assert not presult.successful()
  429. self.mytask.backend.mark_as_done(presult.id, result=None)
  430. assert presult.successful()
  431. def test_send_event(self):
  432. mytask = self.mytask._get_current_object()
  433. mytask.app.events = Mock(name='events')
  434. mytask.app.events.attach_mock(ContextMock(), 'default_dispatcher')
  435. mytask.request.id = 'fb'
  436. mytask.send_event('task-foo', id=3122)
  437. mytask.app.events.default_dispatcher().send.assert_called_with(
  438. 'task-foo', uuid='fb', id=3122,
  439. retry=True, retry_policy=self.app.conf.task_publish_retry_policy)
  440. def test_replace(self):
  441. sig1 = Mock(name='sig1')
  442. sig1.options = {}
  443. with pytest.raises(Ignore):
  444. self.mytask.replace(sig1)
  445. @pytest.mark.usefixtures('depends_on_current_app')
  446. def test_replace_callback(self):
  447. c = group([self.mytask.s()], app=self.app)
  448. c.freeze = Mock(name='freeze')
  449. c.delay = Mock(name='delay')
  450. self.mytask.request.id = 'id'
  451. self.mytask.request.group = 'group'
  452. self.mytask.request.root_id = 'root_id'
  453. self.mytask.request.callbacks = 'callbacks'
  454. self.mytask.request.errbacks = 'errbacks'
  455. class JsonMagicMock(MagicMock):
  456. parent = None
  457. def __json__(self):
  458. return 'whatever'
  459. def reprcall(self, *args, **kwargs):
  460. return 'whatever2'
  461. mocked_signature = JsonMagicMock(name='s')
  462. accumulate_mock = JsonMagicMock(name='accumulate', s=mocked_signature)
  463. self.mytask.app.tasks['celery.accumulate'] = accumulate_mock
  464. try:
  465. self.mytask.replace(c)
  466. except Ignore:
  467. mocked_signature.return_value.set.assert_called_with(
  468. chord=None,
  469. link='callbacks',
  470. link_error='errbacks',
  471. )
  472. def test_replace_group(self):
  473. c = group([self.mytask.s()], app=self.app)
  474. c.freeze = Mock(name='freeze')
  475. c.delay = Mock(name='delay')
  476. self.mytask.request.id = 'id'
  477. self.mytask.request.group = 'group'
  478. self.mytask.request.root_id = 'root_id',
  479. with pytest.raises(Ignore):
  480. self.mytask.replace(c)
  481. def test_add_trail__no_trail(self):
  482. mytask = self.increment_counter._get_current_object()
  483. mytask.trail = False
  484. mytask.add_trail('foo')
  485. def test_repr_v2_compat(self):
  486. self.mytask.__v2_compat__ = True
  487. assert 'v2 compatible' in repr(self.mytask)
  488. def test_apply_with_self(self):
  489. @self.app.task(__self__=42, shared=False)
  490. def tawself(self):
  491. return self
  492. assert tawself.apply().get() == 42
  493. assert tawself() == 42
  494. def test_context_get(self):
  495. self.mytask.push_request()
  496. try:
  497. request = self.mytask.request
  498. request.foo = 32
  499. assert request.get('foo') == 32
  500. assert request.get('bar', 36) == 36
  501. request.clear()
  502. finally:
  503. self.mytask.pop_request()
  504. def test_annotate(self):
  505. with patch('celery.app.task.resolve_all_annotations') as anno:
  506. anno.return_value = [{'FOO': 'BAR'}]
  507. @self.app.task(shared=False)
  508. def task():
  509. pass
  510. task.annotate()
  511. assert task.FOO == 'BAR'
  512. def test_after_return(self):
  513. self.mytask.push_request()
  514. try:
  515. self.mytask.request.chord = self.mytask.s()
  516. self.mytask.after_return('SUCCESS', 1.0, 'foobar', (), {}, None)
  517. self.mytask.request.clear()
  518. finally:
  519. self.mytask.pop_request()
  520. def test_update_state(self):
  521. @self.app.task(shared=False)
  522. def yyy():
  523. pass
  524. yyy.push_request()
  525. try:
  526. tid = uuid()
  527. yyy.update_state(tid, 'FROBULATING', {'fooz': 'baaz'})
  528. assert yyy.AsyncResult(tid).status == 'FROBULATING'
  529. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  530. yyy.request.id = tid
  531. yyy.update_state(state='FROBUZATING', meta={'fooz': 'baaz'})
  532. assert yyy.AsyncResult(tid).status == 'FROBUZATING'
  533. assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
  534. finally:
  535. yyy.pop_request()
  536. def test_repr(self):
  537. @self.app.task(shared=False)
  538. def task_test_repr():
  539. pass
  540. assert 'task_test_repr' in repr(task_test_repr)
  541. def test_has___name__(self):
  542. @self.app.task(shared=False)
  543. def yyy2():
  544. pass
  545. assert yyy2.__name__
  546. class test_apply_task(TasksCase):
  547. def test_apply_throw(self):
  548. with pytest.raises(KeyError):
  549. self.raising.apply(throw=True)
  550. def test_apply_with_task_eager_propagates(self):
  551. self.app.conf.task_eager_propagates = True
  552. with pytest.raises(KeyError):
  553. self.raising.apply()
  554. def test_apply_request_context_is_ok(self):
  555. self.app.conf.task_eager_propagates = True
  556. self.task_check_request_context.apply()
  557. def test_apply(self):
  558. self.increment_counter.count = 0
  559. e = self.increment_counter.apply()
  560. assert isinstance(e, EagerResult)
  561. assert e.get() == 1
  562. e = self.increment_counter.apply(args=[1])
  563. assert e.get() == 2
  564. e = self.increment_counter.apply(kwargs={'increment_by': 4})
  565. assert e.get() == 6
  566. assert e.successful()
  567. assert e.ready()
  568. assert repr(e).startswith('<EagerResult:')
  569. f = self.raising.apply()
  570. assert f.ready()
  571. assert not f.successful()
  572. assert f.traceback
  573. with pytest.raises(KeyError):
  574. f.get()