test_app.py 23 KB


  1. from __future__ import absolute_import
  2. import gc
  3. import os
  4. import itertools
  5. from copy import deepcopy
  6. from pickle import loads, dumps
  7. from amqp import promise
  8. from kombu import Exchange
  9. from celery import shared_task, current_app
  10. from celery import app as _app
  11. from celery import _state
  12. from celery.app import base as _appbase
  13. from celery.app import defaults
  14. from celery.exceptions import ImproperlyConfigured
  15. from celery.five import items
  16. from celery.loaders.base import BaseLoader
  17. from celery.platforms import pyimplementation
  18. from celery.utils.serialization import pickle
  19. from celery.tests.case import (
  20. CELERY_TEST_CONFIG,
  21. AppCase,
  22. Mock,
  23. depends_on_current_app,
  24. mask_modules,
  25. patch,
  26. platform_pyimp,
  27. sys_platform,
  28. pypy_version,
  29. with_environ,
  30. )
  31. from celery.utils import uuid
  32. from celery.utils.mail import ErrorMail
  # Setting read back as ``app.conf.THIS_IS_A_KEY`` in
  # test_App.test_config_from_envvar, which loads 'celery.tests.app.test_app'
  # (presumably this very module -- confirm against the package layout).
  33. THIS_IS_A_KEY = 'this is a value'
  class ObjectConfig(object):
      # Attribute-style configuration source.  The class itself is handed to
      # ``Celery(config_source=...)`` in test_with_config_source.  A comment
      # is used instead of a docstring so ``__doc__`` stays untouched.
      FOO, BAR = 1, 2
  # Instance and plain-dict flavours of the FOO/BAR settings.  Both are loaded
  # by dotted path (``__name__ + '.object_config'`` / ``'.dict_config'``) in
  # test_App.test_config_from_envvar_more.
  object_config = ObjectConfig()
  dict_config = {'FOO': 10, 'BAR': 20}
  class ObjectConfig2(object):
      # Configuration object passed to ``app.config_from_object``; the values
      # below are the ones verified by ``test_App.assert_config2``.
      LEAVE_FOR_WORK, MOMENT_TO_STOP = True, True
      CALL_ME_BACK = 123456789
      WANT_ME_TO = False
      UNDERSTAND_ME = True
  class Object(object):
      # Minimal attribute bag used to build ad-hoc config objects in tests.

      def __init__(self, **kwargs):
          """Expose every keyword argument as an attribute of the instance."""
          for name in kwargs:
              setattr(self, name, kwargs[name])
  def _get_test_config():
      """Return a deep copy of ``CELERY_TEST_CONFIG``."""
      config_copy = deepcopy(CELERY_TEST_CONFIG)
      return config_copy
  # Module-level copy of the test settings; test_App.setup adds it to the
  # defaults of each test's app.
  51. test_config = _get_test_config()
  class test_module(AppCase):
      # Checks for the module-level helpers exposed by ``celery.app``.

      def test_default_app(self):
          # ``celery.app.default_app`` must equal the one held in ``_state``.
          exposed = _app.default_app
          self.assertEqual(exposed, _state.default_app)

      def test_bugreport(self):
          # The module-level bugreport() returns something truthy for our app.
          report = _app.bugreport(app=self.app)
          self.assertTrue(report)
  class test_App(AppCase):
      # Tests for the application object (``self.app``): task registration,
      # configuration loading, pickling, connection pools and broker settings.
      #
      # ``#`` comments are used instead of docstrings, matching the rest of
      # this file (verbose unittest output would otherwise show the docstring
      # in place of the test name).

      def setup(self):
          # Layer the module-level test settings under every test's app.
          self.app.add_defaults(test_config)

      @staticmethod
      def _restore_environ(key, previous):
          # Put ``os.environ[key]`` back to ``previous``; ``None`` means the
          # variable was not set before the test touched it.
          if previous is None:
              os.environ.pop(key, None)
          else:
              os.environ[key] = previous

      def test_task_autofinalize_disabled(self):
          # With autofinalize=False, calling a task raises RuntimeError until
          # the app has been finalized explicitly.
          with self.Celery('xyzibari', autofinalize=False) as app:
              @app.task
              def ttafd():
                  return 42

              with self.assertRaises(RuntimeError):
                  ttafd()

          with self.Celery('xyzibari', autofinalize=False) as app:
              @app.task
              def ttafd2():
                  return 42

              app.finalize()
              self.assertEqual(ttafd2(), 42)

      def test_registry_autofinalize_disabled(self):
          # Registry lookups raise RuntimeError before finalize() and succeed
          # afterwards.
          with self.Celery('xyzibari', autofinalize=False) as app:
              with self.assertRaises(RuntimeError):
                  app.tasks['celery.chain']
              app.finalize()
              self.assertTrue(app.tasks['celery.chain'])

      def test_task(self):
          # A function living in '__main__' is named after ``app.main``.
          with self.Celery('foozibari') as app:

              def fun():
                  pass

              fun.__module__ = '__main__'
              task = app.task(fun)
              self.assertEqual(task.name, app.main + '.fun')

      def test_with_config_source(self):
          # Settings on the config_source object show up in app.conf.
          with self.Celery(config_source=ObjectConfig) as app:
              self.assertEqual(app.conf.FOO, 1)
              self.assertEqual(app.conf.BAR, 2)

      @depends_on_current_app
      def test_task_windows_execv(self):
          # Force the _EXECV flag on while decorating, then restore it.
          prev, _appbase._EXECV = _appbase._EXECV, True
          try:

              @self.app.task(shared=False)
              def foo():
                  pass

              self.assertTrue(foo._get_current_object())  # is proxy

          finally:
              _appbase._EXECV = prev
          assert not _appbase._EXECV

      def test_task_takes_no_args(self):
          # A positional argument to the task decorator raises TypeError.
          with self.assertRaises(TypeError):
              @self.app.task(1)
              def foo():
                  pass

      def test_add_defaults(self):
          # A callable passed to add_defaults stays pending until the
          # configuration is first accessed.
          self.assertFalse(self.app.configured)
          _conf = {'FOO': 300}
          conf = lambda: _conf
          self.app.add_defaults(conf)
          self.assertIn(conf, self.app._pending_defaults)
          self.assertFalse(self.app.configured)
          self.assertEqual(self.app.conf.FOO, 300)
          self.assertTrue(self.app.configured)
          self.assertFalse(self.app._pending_defaults)

          # defaults not pickled
          appr = loads(dumps(self.app))
          with self.assertRaises(AttributeError):
              appr.conf.FOO

          # add more defaults after configured
          conf2 = {'FOO': 'BAR'}
          self.app.add_defaults(conf2)
          self.assertEqual(self.app.conf.FOO, 'BAR')

          self.assertIn(_conf, self.app.conf.defaults)
          self.assertIn(conf2, self.app.conf.defaults)

      def test_connection_or_acquire(self):
          # The pool is marked dirty inside the block=True context and is not
          # dirty inside the pool=False context.
          with self.app.connection_or_acquire(block=True):
              self.assertTrue(self.app.pool._dirty)

          with self.app.connection_or_acquire(pool=False):
              self.assertFalse(self.app.pool._dirty)

      def test_maybe_close_pool(self):
          # _maybe_close_pool force-closes both the connection pool and the
          # producer pool, clears the references, and can be called again.
          cpool = self.app._pool = Mock()
          amqp = self.app.__dict__['amqp'] = Mock()
          ppool = amqp._producer_pool
          self.app._maybe_close_pool()
          cpool.force_close_all.assert_called_with()
          ppool.force_close_all.assert_called_with()
          self.assertIsNone(self.app._pool)
          self.assertIsNone(self.app.__dict__['amqp']._producer_pool)

          self.app._pool = Mock()
          self.app._maybe_close_pool()
          self.app._maybe_close_pool()

      def test_using_v1_reduce(self):
          # The app still round-trips through pickle with _using_v1_reduce.
          self.app._using_v1_reduce = True
          self.assertTrue(loads(dumps(self.app)))

      def test_autodiscover_tasks_force(self):
          # force=True calls the loader immediately; a callable package list
          # reaches the loader as the list it returns.
          self.app.loader.autodiscover_tasks = Mock()
          self.app.autodiscover_tasks(['proj.A', 'proj.B'], force=True)
          self.app.loader.autodiscover_tasks.assert_called_with(
              ['proj.A', 'proj.B'], 'tasks',
          )
          self.app.loader.autodiscover_tasks = Mock()
          self.app.autodiscover_tasks(
              lambda: ['proj.A', 'proj.B'],
              related_name='george',
              force=True,
          )
          self.app.loader.autodiscover_tasks.assert_called_with(
              ['proj.A', 'proj.B'], 'george',
          )

      def test_autodiscover_tasks_lazy(self):
          # Without force, a promise wrapping _autodiscover_tasks is connected
          # to the import_modules signal.
          with patch('celery.signals.import_modules') as import_modules:
              packages = lambda: [1, 2, 3]
              self.app.autodiscover_tasks(packages)
              self.assertTrue(import_modules.connect.called)
              prom = import_modules.connect.call_args[0][0]
              self.assertIsInstance(prom, promise)
              self.assertEqual(prom.fun, self.app._autodiscover_tasks)
              self.assertEqual(prom.args[0](), [1, 2, 3])

      @with_environ('CELERY_BROKER_URL', '')
      def test_with_broker(self):
          # The broker argument ends up in conf.BROKER_URL.
          with self.Celery(broker='foo://baribaz') as app:
              self.assertEqual(app.conf.BROKER_URL, 'foo://baribaz')

      def test_repr(self):
          self.assertTrue(repr(self.app))

      def test_custom_task_registry(self):
          # A registry passed as ``tasks`` is used as-is by the new app.
          with self.Celery(tasks=self.app.tasks) as app2:
              self.assertIs(app2.tasks, self.app.tasks)

      def test_include_argument(self):
          # ``include`` is stored as conf.CELERY_IMPORTS.
          with self.Celery(include=('foo', 'bar.foo')) as app:
              self.assertEqual(app.conf.CELERY_IMPORTS, ('foo', 'bar.foo'))

      def test_set_as_current(self):
          # set_as_current=True installs the app as the thread-local current
          # app; the previous one is restored afterwards.
          current = _state._tls.current_app
          try:
              app = self.Celery(set_as_current=True)
              self.assertIs(_state._tls.current_app, app)
          finally:
              _state._tls.current_app = current

      def test_current_task(self):
          # app.current_task reflects the top of the task stack.
          # ``shared=False`` belongs on the decorator (as in the other tests
          # of this class), not in the signature of the task function.
          @self.app.task(shared=False)
          def foo():
              pass

          _state._task_stack.push(foo)
          try:
              self.assertEqual(self.app.current_task.name, foo.name)
          finally:
              _state._task_stack.pop()

      def test_task_not_shared(self):
          # shared=False must not go through shared_task.
          with patch('celery.app.base.shared_task') as sh:
              @self.app.task(shared=False)
              def foo():
                  pass
              self.assertFalse(sh.called)

      def test_task_compat_with_filter(self):
          # The ``filter`` callback receives the created task
          # (accept_magic_kwargs=True variant).
          with self.Celery(accept_magic_kwargs=True) as app:
              check = Mock()

              # Local name avoids shadowing the ``filter`` builtin.
              def task_filter(task):
                  check(task)
                  return task

              @app.task(filter=task_filter, shared=False)
              def foo():
                  pass
              check.assert_called_with(foo)

      def test_task_with_filter(self):
          # The ``filter`` callback receives the created task
          # (accept_magic_kwargs=False variant).
          with self.Celery(accept_magic_kwargs=False) as app:
              check = Mock()

              # Local name avoids shadowing the ``filter`` builtin.
              def task_filter(task):
                  check(task)
                  return task

              assert not _appbase._EXECV

              @app.task(filter=task_filter, shared=False)
              def foo():
                  pass
              check.assert_called_with(foo)

      def test_task_sets_main_name_MP_MAIN_FILE(self):
          # With MP_MAIN_FILE pointing at this file the task is named after
          # the app's main name.  The previous value is restored afterwards
          # rather than being overwritten with None.
          from celery import utils as _utils
          previous = getattr(_utils, 'MP_MAIN_FILE', None)
          _utils.MP_MAIN_FILE = __file__
          try:
              with self.Celery('xuzzy') as app:

                  @app.task
                  def foo():
                      pass

                  self.assertEqual(foo.name, 'xuzzy.foo')
          finally:
              _utils.MP_MAIN_FILE = previous

      def test_annotate_decorator(self):
          # An '@__call__' annotation wraps the task's __call__ with the
          # given decorator; annotate() can be invoked repeatedly.
          from celery.app.task import Task

          class adX(Task):
              abstract = True

              def run(self, y, z, x):
                  return y, z, x

          check = Mock()

          def deco(fun):

              def _inner(*args, **kwargs):
                  check(*args, **kwargs)
                  return fun(*args, **kwargs)
              return _inner

          self.app.conf.CELERY_ANNOTATIONS = {
              adX.name: {'@__call__': deco}
          }
          adX.bind(self.app)
          self.assertIs(adX.app, self.app)

          i = adX()
          i(2, 4, x=3)
          check.assert_called_with(i, 2, 4, x=3)

          i.annotate()
          i.annotate()

      def test_apply_async_has__self__(self):
          # A task created with __self__ publishes that value as the first
          # positional argument.
          @self.app.task(__self__='hello', shared=False)
          def aawsX():
              pass

          with patch('celery.app.amqp.TaskProducer.publish_task') as dt:
              aawsX.apply_async((4, 5))
              args = dt.call_args[0][1]
              self.assertEqual(args, ('hello', 4, 5))

      def test_apply_async_adds_children(self):
          # apply_async(add_to_parent=True) records the result among the
          # children of the task currently on the stack.
          from celery._state import _task_stack

          @self.app.task(shared=False)
          def a3cX1(self):
              pass

          @self.app.task(shared=False)
          def a3cX2(self):
              pass

          _task_stack.push(a3cX1)
          try:
              a3cX1.push_request(called_directly=False)
              try:
                  res = a3cX2.apply_async(add_to_parent=True)
                  self.assertIn(res, a3cX1.request.children)
              finally:
                  a3cX1.pop_request()
          finally:
              _task_stack.pop()

      def test_pickle_app(self):
          # Explicit conf changes survive pickling and the pickle stays
          # under 2048 bytes.
          changes = dict(THE_FOO_BAR='bars',
                         THE_MII_MAR='jars')
          self.app.conf.update(changes)
          saved = pickle.dumps(self.app)
          self.assertLess(len(saved), 2048)
          restored = pickle.loads(saved)
          self.assertDictContainsSubset(changes, restored.conf)

      def test_worker_main(self):
          # worker_main hands argv to the worker command; the command class
          # is swapped for a stub that echoes argv, then restored.
          from celery.bin import worker as worker_bin

          class worker(worker_bin.worker):

              def execute_from_commandline(self, argv):
                  return argv

          prev, worker_bin.worker = worker_bin.worker, worker
          try:
              ret = self.app.worker_main(argv=['--version'])
              self.assertListEqual(ret, ['--version'])
          finally:
              worker_bin.worker = prev

      def test_config_from_envvar(self):
          # Load 'celery.tests.app.test_app' as configuration through an
          # environment variable.  The variable is restored afterwards so it
          # does not leak into other tests.
          key = 'CELERYTEST_CONFIG_OBJECT'
          previous = os.environ.get(key)
          os.environ[key] = 'celery.tests.app.test_app'
          try:
              self.app.config_from_envvar(key)
              self.assertEqual(self.app.conf.THIS_IS_A_KEY, 'this is a value')
          finally:
              self._restore_environ(key, previous)

      def assert_config2(self):
          # Shared assertions: app.conf carries the ObjectConfig2 values.
          self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
          self.assertTrue(self.app.conf.MOMENT_TO_STOP)
          self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
          self.assertFalse(self.app.conf.WANT_ME_TO)
          self.assertTrue(self.app.conf.UNDERSTAND_ME)

      def test_config_from_object__lazy(self):
          # Without force the loader config is still empty right after
          # config_from_object; the object is only remembered as the source.
          conf = ObjectConfig2()
          self.app.config_from_object(conf)
          self.assertFalse(self.app.loader._conf)
          self.assertIs(self.app._config_source, conf)

          self.assert_config2()

      def test_config_from_object__force(self):
          # force=True populates the loader config immediately.
          self.app.config_from_object(ObjectConfig2(), force=True)
          self.assertTrue(self.app.loader._conf)

          self.assert_config2()

      def test_config_from_cmdline(self):
          # Command-line style settings are parsed, including the (string),
          # (int) and (dict) type prefixes.
          cmdline = ['.always_eager=no',
                     '.result_backend=/dev/null',
                     'celeryd.prefetch_multiplier=368',
                     '.foobarstring=(string)300',
                     '.foobarint=(int)300',
                     '.result_engine_options=(dict){"foo": "bar"}']
          self.app.config_from_cmdline(cmdline, namespace='celery')
          self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
          self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, '/dev/null')
          self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
          self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, '300')
          self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
          self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
                               {'foo': 'bar'})

      def test_compat_setting_CELERY_BACKEND(self):
          # CELERY_BACKEND is readable as CELERY_RESULT_BACKEND.
          self.app.config_from_object(Object(CELERY_BACKEND='set_by_us'))
          self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, 'set_by_us')

      def test_setting_BROKER_TRANSPORT_OPTIONS(self):
          # Defaults to an empty dict and reflects the configured value.
          _args = {'foo': 'bar', 'spam': 'baz'}

          self.app.config_from_object(Object())
          self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})

          self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
          self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)

      def test_Windows_log_color_disabled(self):
          # With IS_WINDOWS set, supports_color(True) reports False.
          self.app.IS_WINDOWS = True
          self.assertFalse(self.app.log.supports_color(True))

      def test_compat_setting_CARROT_BACKEND(self):
          # CARROT_BACKEND is readable as BROKER_TRANSPORT.
          self.app.config_from_object(Object(CARROT_BACKEND='set_by_us'))
          self.assertEqual(self.app.conf.BROKER_TRANSPORT, 'set_by_us')

      def test_WorkController(self):
          x = self.app.WorkController
          self.assertIs(x.app, self.app)

      def test_Worker(self):
          x = self.app.Worker
          self.assertIs(x.app, self.app)

      @depends_on_current_app
      def test_AsyncResult(self):
          x = self.app.AsyncResult('1')
          self.assertIs(x.app, self.app)
          r = loads(dumps(x))
          # not set as current, so ends up as default app after reduce
          self.assertIs(r.app, current_app._get_current_object())

      def test_get_active_apps(self):
          # A new app is listed among the active apps and disappears once
          # it has been closed, deleted and garbage collected.
          self.assertTrue(list(_state._get_active_apps()))
          app1 = self.Celery()
          appid = id(app1)
          self.assertIn(app1, _state._get_active_apps())
          app1.close()
          del app1

          gc.collect()

          # weakref removed from list when app goes out of scope.
          with self.assertRaises(StopIteration):
              next(app for app in _state._get_active_apps() if id(app) == appid)

      def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):
          # Missing variable: falsy result when silent, ImproperlyConfigured
          # otherwise.
          self.assertFalse(
              self.app.config_from_envvar(
                  'HDSAJIHWIQHEWQU', force=True, silent=True),
          )
          with self.assertRaises(ImproperlyConfigured):
              self.app.config_from_envvar(
                  'HDSAJIHWIQHEWQU', force=True, silent=False,
              )
          # ``key`` is restored in the ``finally`` below so the variable does
          # not leak into other tests.
          previous = os.environ.get(key)
          try:
              os.environ[key] = __name__ + '.object_config'
              self.assertTrue(self.app.config_from_envvar(key, force=True))
              self.assertEqual(self.app.conf['FOO'], 1)
              self.assertEqual(self.app.conf['BAR'], 2)

              os.environ[key] = 'unknown_asdwqe.asdwqewqe'
              with self.assertRaises(ImportError):
                  self.app.config_from_envvar(key, silent=False)
              self.assertFalse(
                  self.app.config_from_envvar(key, force=True, silent=True),
              )

              os.environ[key] = __name__ + '.dict_config'
              self.assertTrue(self.app.config_from_envvar(key, force=True))
              self.assertEqual(self.app.conf['FOO'], 10)
              self.assertEqual(self.app.conf['BAR'], 20)
          finally:
              self._restore_environ(key, previous)

      @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
      def test_start(self, execute):
          # app.start() runs the (patched) command-line entry point.
          self.app.start()
          self.assertTrue(execute.called)

      def test_mail_admins(self):
          # mail_admins returns a falsy value when ADMINS is None and the
          # loader's (truthy) result once admins are configured.
          class Loader(BaseLoader):

              def mail_admins(*args, **kwargs):
                  return args, kwargs

          self.app.loader = Loader(app=self.app)
          self.app.conf.ADMINS = None
          self.assertFalse(self.app.mail_admins('Subject', 'Body'))
          self.app.conf.ADMINS = [('George Costanza', 'george@vandelay.com')]
          self.assertTrue(self.app.mail_admins('Subject', 'Body'))

      def test_amqp_get_broker_info(self):
          # connection(...).info() reports the defaults, the port and vhost
          # taken from the URL, and a vhost that starts with a slash.
          self.assertDictContainsSubset(
              {'hostname': 'localhost',
               'userid': 'guest',
               'password': 'guest',
               'virtual_host': '/'},
              self.app.connection('pyamqp://').info(),
          )
          self.app.conf.BROKER_PORT = 1978
          self.app.conf.BROKER_VHOST = 'foo'
          self.assertDictContainsSubset(
              {'port': 1978, 'virtual_host': 'foo'},
              self.app.connection('pyamqp://:1978/foo').info(),
          )
          conn = self.app.connection('pyamqp:////value')
          self.assertDictContainsSubset({'virtual_host': '/value'},
                                        conn.info())

      def test_amqp_failover_strategy_selection(self):
          # Test passing in a string and make sure the string
          # gets there untouched
          self.app.conf.BROKER_FAILOVER_STRATEGY = 'foo-bar'
          self.assertEqual(
              self.app.connection('amqp:////value').failover_strategy,
              'foo-bar',
          )

          # Try passing in None
          self.app.conf.BROKER_FAILOVER_STRATEGY = None
          self.assertEqual(
              self.app.connection('amqp:////value').failover_strategy,
              itertools.cycle,
          )

          # Test passing in a method
          def my_failover_strategy(it):
              yield True

          self.app.conf.BROKER_FAILOVER_STRATEGY = my_failover_strategy
          self.assertEqual(
              self.app.connection('amqp:////value').failover_strategy,
              my_failover_strategy,
          )

      def test_BROKER_BACKEND_alias(self):
          self.assertEqual(self.app.conf.BROKER_BACKEND,
                           self.app.conf.BROKER_TRANSPORT)

      def test_after_fork(self):
          # _after_fork force-closes and clears the pool; a second call with
          # no pool left must also work.
          p = self.app._pool = Mock()
          self.app._after_fork(self.app)
          p.force_close_all.assert_called_with()
          self.assertIsNone(self.app._pool)
          self.app._after_fork(self.app)

      def test_pool_no_multiprocessing(self):
          # The pool property still works with 'multiprocessing.util' masked.
          with mask_modules('multiprocessing.util'):
              pool = self.app.pool
              self.assertIs(pool, self.app._pool)

      def test_bugreport(self):
          self.assertTrue(self.app.bugreport())

      def test_send_task_sent_event(self):
          # With send_sent_event=True, publishing a task sends a 'task-sent'
          # event through the supplied dispatcher.

          class Dispatcher(object):

              def __init__(self):
                  # Per-instance list; a class-level list would be shared by
                  # every Dispatcher.
                  self.sent = []

              def publish(self, type, fields, *args, **kwargs):
                  self.sent.append((type, fields))

          conn = self.app.connection()
          chan = conn.channel()
          try:
              for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
                  chan.exchange_declare(e, 'direct', durable=True)
                  chan.queue_declare(e, durable=True)
                  chan.queue_bind(e, e, e)
          finally:
              chan.close()
          assert conn.transport_cls == 'memory'

          prod = self.app.amqp.TaskProducer(
              conn, exchange=Exchange('foo_exchange'),
              send_sent_event=True,
          )

          dispatcher = Dispatcher()
          self.assertTrue(prod.publish_task('footask', (), {},
                                            exchange='moo_exchange',
                                            routing_key='moo_exchange',
                                            event_dispatcher=dispatcher))
          self.assertTrue(dispatcher.sent)
          self.assertEqual(dispatcher.sent[0][0], 'task-sent')
          self.assertTrue(prod.publish_task('footask', (), {},
                                            event_dispatcher=dispatcher,
                                            exchange='bar_exchange',
                                            routing_key='bar_exchange'))

      def test_error_mail_sender(self):
          # The subject template formats with the expected keys.
          x = ErrorMail.subject % {'name': 'task_name',
                                   'id': uuid(),
                                   'exc': 'FOOBARBAZ',
                                   'hostname': 'lana'}
          self.assertTrue(x)

      def test_error_mail_disabled(self):
          # Nothing is mailed when should_send() returns False.
          task = Mock()
          x = ErrorMail(task)
          x.should_send = Mock()
          x.should_send.return_value = False
          x.send(Mock(), Mock())
          self.assertFalse(task.app.mail_admins.called)
  class test_defaults(AppCase):
      # Checks for helpers in the ``defaults`` module of ``celery.app``.

      def test_str_to_bool(self):
          # Known spellings map to a truth value; anything else raises
          # TypeError.
          falsy_words = ('false', 'no', '0')
          truthy_words = ('true', 'yes', '1')
          for word in falsy_words:
              self.assertFalse(defaults.strtobool(word))
          for word in truthy_words:
              self.assertTrue(defaults.strtobool(word))
          with self.assertRaises(TypeError):
              defaults.strtobool('unsure')
  class test_debugging_utils(AppCase):
      # Checks for the trace switch exposed by ``celery.app``.

      def test_enable_disable_trace(self):
          # enable_trace()/disable_trace() swap ``app_or_default`` between
          # its tracing and plain implementations; tracing is always turned
          # off again on exit.
          try:
              _app.enable_trace()
              traced = _app._app_or_default_trace
              self.assertEqual(_app.app_or_default, traced)
              _app.disable_trace()
              plain = _app._app_or_default
              self.assertEqual(_app.app_or_default, plain)
          finally:
              _app.disable_trace()
  class test_pyimplementation(AppCase):
      # Checks ``pyimplementation()`` under patched platform information.

      def test_platform_python_implementation(self):
          # The value supplied through platform_pyimp is returned as-is.
          with platform_pyimp(lambda: 'Xython'):
              detected = pyimplementation()
              self.assertEqual(detected, 'Xython')

      def test_platform_jython(self):
          # A 'java...' sys.platform is reported as Jython.
          with platform_pyimp(), sys_platform('java 1.6.51'):
              self.assertIn('Jython', pyimplementation())

      def test_platform_pypy(self):
          # Both three- and four-element PyPy version tuples are recognised.
          with platform_pyimp(), sys_platform('darwin'):
              for version in ((1, 4, 3), (1, 4, 3, 'a4')):
                  with pypy_version(version):
                      self.assertIn('PyPy', pyimplementation())

      def test_platform_fallback(self):
          # With no other hint available the answer is 'CPython'.
          with platform_pyimp(), sys_platform('darwin'), pypy_version():
              self.assertEqual('CPython', pyimplementation())
  class test_shared_task(AppCase):
      # Checks that ``shared_task`` proxies follow the current app.

      def test_registers_to_all_apps(self):
          # Shared tasks resolve to whichever app is current, both with the
          # bare decorator and with the called form.
          with self.Celery('xproj', set_as_current=True) as xproj:
              xproj.finalize()

              @shared_task
              def foo():
                  return 42

              @shared_task()
              def bar():
                  return 84

              for proxy in (foo, bar):
                  self.assertIs(proxy.app, xproj)
              self.assertTrue(foo._get_current_object())

              with self.Celery('yproj', set_as_current=True) as yproj:
                  for proxy in (foo, bar):
                      self.assertIs(proxy.app, yproj)

                  @shared_task()
                  def baz():
                      return 168

                  self.assertIs(baz.app, yproj)
  564. def baz():
  565. return 168
  566. self.assertIs(baz.app, yproj)