test_app.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996
  1. from __future__ import absolute_import
  2. import gc
  3. import os
  4. import itertools
  5. from copy import deepcopy
  6. from pickle import loads, dumps
  7. from amqp import promise
  8. from celery import Celery
  9. from celery import shared_task, current_app
  10. from celery import app as _app
  11. from celery import _state
  12. from celery.app import base as _appbase
  13. from celery.app import defaults
  14. from celery.exceptions import ImproperlyConfigured
  15. from celery.five import items, keys
  16. from celery.loaders.base import BaseLoader, unconfigured
  17. from celery.platforms import pyimplementation
  18. from celery.utils.serialization import pickle
  19. from celery.utils.timeutils import timezone
  20. from celery.tests.case import (
  21. CELERY_TEST_CONFIG,
  22. AppCase,
  23. Mock,
  24. Case,
  25. ContextMock,
  26. depends_on_current_app,
  27. mask_modules,
  28. patch,
  29. platform_pyimp,
  30. sys_platform,
  31. pypy_version,
  32. with_environ,
  33. )
  34. from celery.utils import uuid
  35. from celery.utils.mail import ErrorMail
  36. THIS_IS_A_KEY = 'this is a value'
  37. class ObjectConfig(object):
  38. FOO = 1
  39. BAR = 2
  40. object_config = ObjectConfig()
  41. dict_config = dict(FOO=10, BAR=20)
  42. class ObjectConfig2(object):
  43. LEAVE_FOR_WORK = True
  44. MOMENT_TO_STOP = True
  45. CALL_ME_BACK = 123456789
  46. WANT_ME_TO = False
  47. UNDERSTAND_ME = True
  48. class Object(object):
  49. def __init__(self, **kwargs):
  50. for key, value in items(kwargs):
  51. setattr(self, key, value)
  52. def _get_test_config():
  53. return deepcopy(CELERY_TEST_CONFIG)
  54. test_config = _get_test_config()
  55. class test_module(AppCase):
  56. def test_default_app(self):
  57. self.assertEqual(_app.default_app, _state.default_app)
  58. def test_bugreport(self):
  59. self.assertTrue(_app.bugreport(app=self.app))
  60. class test_task_join_will_block(Case):
  61. def test_task_join_will_block(self):
  62. prev, _state._task_join_will_block = _state._task_join_will_block, 0
  63. try:
  64. self.assertEqual(_state._task_join_will_block, 0)
  65. _state._set_task_join_will_block(True)
  66. print(_state.task_join_will_block)
  67. self.assertTrue(_state.task_join_will_block())
  68. finally:
  69. _state._task_join_will_block = prev
  70. class test_App(AppCase):
  71. def setup(self):
  72. self.app.add_defaults(test_config)
  73. def test_task_autofinalize_disabled(self):
  74. with self.Celery('xyzibari', autofinalize=False) as app:
  75. @app.task
  76. def ttafd():
  77. return 42
  78. with self.assertRaises(RuntimeError):
  79. ttafd()
  80. with self.Celery('xyzibari', autofinalize=False) as app:
  81. @app.task
  82. def ttafd2():
  83. return 42
  84. app.finalize()
  85. self.assertEqual(ttafd2(), 42)
  86. def test_registry_autofinalize_disabled(self):
  87. with self.Celery('xyzibari', autofinalize=False) as app:
  88. with self.assertRaises(RuntimeError):
  89. app.tasks['celery.chain']
  90. app.finalize()
  91. self.assertTrue(app.tasks['celery.chain'])
  92. def test_task(self):
  93. with self.Celery('foozibari') as app:
  94. def fun():
  95. pass
  96. fun.__module__ = '__main__'
  97. task = app.task(fun)
  98. self.assertEqual(task.name, app.main + '.fun')
  99. def test_task_too_many_args(self):
  100. with self.assertRaises(TypeError):
  101. self.app.task(Mock(name='fun'), True)
  102. with self.assertRaises(TypeError):
  103. self.app.task(Mock(name='fun'), True, 1, 2)
  104. def test_with_config_source(self):
  105. with self.Celery(config_source=ObjectConfig) as app:
  106. self.assertEqual(app.conf.FOO, 1)
  107. self.assertEqual(app.conf.BAR, 2)
  108. @depends_on_current_app
  109. def test_task_windows_execv(self):
  110. prev, _appbase._EXECV = _appbase._EXECV, True
  111. try:
  112. @self.app.task(shared=False)
  113. def foo():
  114. pass
  115. self.assertTrue(foo._get_current_object()) # is proxy
  116. finally:
  117. _appbase._EXECV = prev
  118. assert not _appbase._EXECV
  119. def test_task_takes_no_args(self):
  120. with self.assertRaises(TypeError):
  121. @self.app.task(1)
  122. def foo():
  123. pass
  124. def test_add_defaults(self):
  125. self.assertFalse(self.app.configured)
  126. _conf = {'FOO': 300}
  127. def conf():
  128. return _conf
  129. self.app.add_defaults(conf)
  130. self.assertIn(conf, self.app._pending_defaults)
  131. self.assertFalse(self.app.configured)
  132. self.assertEqual(self.app.conf.FOO, 300)
  133. self.assertTrue(self.app.configured)
  134. self.assertFalse(self.app._pending_defaults)
  135. # defaults not pickled
  136. appr = loads(dumps(self.app))
  137. with self.assertRaises(AttributeError):
  138. appr.conf.FOO
  139. # add more defaults after configured
  140. conf2 = {'FOO': 'BAR'}
  141. self.app.add_defaults(conf2)
  142. self.assertEqual(self.app.conf.FOO, 'BAR')
  143. self.assertIn(_conf, self.app.conf.defaults)
  144. self.assertIn(conf2, self.app.conf.defaults)
  145. def test_connection_or_acquire(self):
  146. with self.app.connection_or_acquire(block=True):
  147. self.assertTrue(self.app.pool._dirty)
  148. with self.app.connection_or_acquire(pool=False):
  149. self.assertFalse(self.app.pool._dirty)
  150. def test_maybe_close_pool(self):
  151. cpool = self.app._pool = Mock()
  152. amqp = self.app.__dict__['amqp'] = Mock()
  153. ppool = amqp._producer_pool
  154. self.app._maybe_close_pool()
  155. cpool.force_close_all.assert_called_with()
  156. ppool.force_close_all.assert_called_with()
  157. self.assertIsNone(self.app._pool)
  158. self.assertIsNone(self.app.__dict__['amqp']._producer_pool)
  159. self.app._pool = Mock()
  160. self.app._maybe_close_pool()
  161. self.app._maybe_close_pool()
  162. def test_using_v1_reduce(self):
  163. self.app._using_v1_reduce = True
  164. self.assertTrue(loads(dumps(self.app)))
  165. def test_autodiscover_tasks_force(self):
  166. self.app.loader.autodiscover_tasks = Mock()
  167. self.app.autodiscover_tasks(['proj.A', 'proj.B'], force=True)
  168. self.app.loader.autodiscover_tasks.assert_called_with(
  169. ['proj.A', 'proj.B'], 'tasks',
  170. )
  171. self.app.loader.autodiscover_tasks = Mock()
  172. def lazy_list():
  173. return ['proj.A', 'proj.B']
  174. self.app.autodiscover_tasks(
  175. lazy_list,
  176. related_name='george',
  177. force=True,
  178. )
  179. self.app.loader.autodiscover_tasks.assert_called_with(
  180. ['proj.A', 'proj.B'], 'george',
  181. )
  182. def test_autodiscover_tasks_lazy(self):
  183. with patch('celery.signals.import_modules') as import_modules:
  184. def lazy_list():
  185. return [1, 2, 3]
  186. self.app.autodiscover_tasks(lazy_list)
  187. self.assertTrue(import_modules.connect.called)
  188. prom = import_modules.connect.call_args[0][0]
  189. self.assertIsInstance(prom, promise)
  190. self.assertEqual(prom.fun, self.app._autodiscover_tasks)
  191. self.assertEqual(prom.args[0](), [1, 2, 3])
  192. def test_autodiscover_tasks__no_packages(self):
  193. fixup1 = Mock(name='fixup')
  194. fixup2 = Mock(name='fixup')
  195. self.app._autodiscover_tasks_from_names = Mock(name='auto')
  196. self.app._fixups = [fixup1, fixup2]
  197. fixup1.autodiscover_tasks.return_value = ['A', 'B', 'C']
  198. fixup2.autodiscover_tasks.return_value = ['D', 'E', 'F']
  199. self.app.autodiscover_tasks(force=True)
  200. self.app._autodiscover_tasks_from_names.assert_called_with(
  201. ['A', 'B', 'C', 'D', 'E', 'F'], related_name='tasks',
  202. )
  203. @with_environ('CELERY_BROKER_URL', '')
  204. def test_with_broker(self):
  205. with self.Celery(broker='foo://baribaz') as app:
  206. self.assertEqual(app.conf.broker_url, 'foo://baribaz')
  207. def test_pending_configuration__setattr(self):
  208. with self.Celery(broker='foo://bar') as app:
  209. app.conf.task_default_delivery_mode = 44
  210. app.conf.worker_agent = 'foo:Bar'
  211. self.assertFalse(app.configured)
  212. self.assertEqual(app.conf.worker_agent, 'foo:Bar')
  213. self.assertEqual(app.conf.broker_url, 'foo://bar')
  214. self.assertEqual(app._preconf['worker_agent'], 'foo:Bar')
  215. self.assertTrue(app.configured)
  216. reapp = pickle.loads(pickle.dumps(app))
  217. self.assertEqual(reapp._preconf['worker_agent'], 'foo:Bar')
  218. self.assertFalse(reapp.configured)
  219. self.assertEqual(reapp.conf.worker_agent, 'foo:Bar')
  220. self.assertTrue(reapp.configured)
  221. self.assertEqual(reapp.conf.broker_url, 'foo://bar')
  222. self.assertEqual(reapp._preconf['worker_agent'], 'foo:Bar')
  223. def test_pending_configuration__update(self):
  224. with self.Celery(broker='foo://bar') as app:
  225. app.conf.update(
  226. task_default_delivery_mode=44,
  227. worker_agent='foo:Bar',
  228. )
  229. self.assertFalse(app.configured)
  230. self.assertEqual(app.conf.worker_agent, 'foo:Bar')
  231. self.assertEqual(app.conf.broker_url, 'foo://bar')
  232. self.assertEqual(app._preconf['worker_agent'], 'foo:Bar')
  233. def test_pending_configuration__compat_settings(self):
  234. with self.Celery(broker='foo://bar', backend='foo') as app:
  235. app.conf.update(
  236. CELERY_ALWAYS_EAGER=4,
  237. CELERY_DEFAULT_DELIVERY_MODE=63,
  238. CELERYD_AGENT='foo:Barz',
  239. )
  240. self.assertEqual(app.conf.task_always_eager, 4)
  241. self.assertEqual(app.conf.task_default_delivery_mode, 63)
  242. self.assertEqual(app.conf.worker_agent, 'foo:Barz')
  243. self.assertEqual(app.conf.broker_url, 'foo://bar')
  244. self.assertEqual(app.conf.result_backend, 'foo')
  245. def test_pending_configuration__setdefault(self):
  246. with self.Celery(broker='foo://bar') as app:
  247. app.conf.setdefault('worker_agent', 'foo:Bar')
  248. self.assertFalse(app.configured)
  249. def test_pending_configuration__iter(self):
  250. with self.Celery(broker='foo://bar') as app:
  251. app.conf.worker_agent = 'foo:Bar'
  252. self.assertFalse(app.configured)
  253. self.assertTrue(list(keys(app.conf)))
  254. self.assertFalse(app.configured)
  255. self.assertIn('worker_agent', app.conf)
  256. self.assertFalse(app.configured)
  257. self.assertTrue(dict(app.conf))
  258. self.assertTrue(app.configured)
  259. def test_pending_configuration__raises_ImproperlyConfigured(self):
  260. with self.Celery(set_as_current=False) as app:
  261. app.conf.worker_agent = 'foo://bar'
  262. app.conf.task_default_delivery_mode = 44
  263. app.conf.CELERY_ALWAYS_EAGER = 5
  264. with self.assertRaises(ImproperlyConfigured):
  265. app.finalize()
  266. with self.Celery() as app:
  267. self.assertFalse(self.app.conf.task_always_eager)
  268. def test_repr(self):
  269. self.assertTrue(repr(self.app))
  270. def test_custom_task_registry(self):
  271. with self.Celery(tasks=self.app.tasks) as app2:
  272. self.assertIs(app2.tasks, self.app.tasks)
  273. def test_include_argument(self):
  274. with self.Celery(include=('foo', 'bar.foo')) as app:
  275. self.assertEqual(app.conf.include, ('foo', 'bar.foo'))
  276. def test_set_as_current(self):
  277. current = _state._tls.current_app
  278. try:
  279. app = self.Celery(set_as_current=True)
  280. self.assertIs(_state._tls.current_app, app)
  281. finally:
  282. _state._tls.current_app = current
  283. def test_current_task(self):
  284. @self.app.task
  285. def foo(shared=False):
  286. pass
  287. _state._task_stack.push(foo)
  288. try:
  289. self.assertEqual(self.app.current_task.name, foo.name)
  290. finally:
  291. _state._task_stack.pop()
  292. def test_task_not_shared(self):
  293. with patch('celery.app.base.connect_on_app_finalize') as sh:
  294. @self.app.task(shared=False)
  295. def foo():
  296. pass
  297. self.assertFalse(sh.called)
  298. def test_task_compat_with_filter(self):
  299. with self.Celery() as app:
  300. check = Mock()
  301. def filter(task):
  302. check(task)
  303. return task
  304. @app.task(filter=filter, shared=False)
  305. def foo():
  306. pass
  307. check.assert_called_with(foo)
  308. def test_task_with_filter(self):
  309. with self.Celery() as app:
  310. check = Mock()
  311. def filter(task):
  312. check(task)
  313. return task
  314. assert not _appbase._EXECV
  315. @app.task(filter=filter, shared=False)
  316. def foo():
  317. pass
  318. check.assert_called_with(foo)
  319. def test_task_sets_main_name_MP_MAIN_FILE(self):
  320. from celery import utils as _utils
  321. _utils.MP_MAIN_FILE = __file__
  322. try:
  323. with self.Celery('xuzzy') as app:
  324. @app.task
  325. def foo():
  326. pass
  327. self.assertEqual(foo.name, 'xuzzy.foo')
  328. finally:
  329. _utils.MP_MAIN_FILE = None
  330. def test_annotate_decorator(self):
  331. from celery.app.task import Task
  332. class adX(Task):
  333. abstract = True
  334. def run(self, y, z, x):
  335. return y, z, x
  336. check = Mock()
  337. def deco(fun):
  338. def _inner(*args, **kwargs):
  339. check(*args, **kwargs)
  340. return fun(*args, **kwargs)
  341. return _inner
  342. self.app.conf.task_annotations = {
  343. adX.name: {'@__call__': deco}
  344. }
  345. adX.bind(self.app)
  346. self.assertIs(adX.app, self.app)
  347. i = adX()
  348. i(2, 4, x=3)
  349. check.assert_called_with(i, 2, 4, x=3)
  350. i.annotate()
  351. i.annotate()
  352. def test_apply_async_has__self__(self):
  353. @self.app.task(__self__='hello', shared=False)
  354. def aawsX(x, y):
  355. pass
  356. with self.assertRaises(TypeError):
  357. aawsX.apply_async(())
  358. with self.assertRaises(TypeError):
  359. aawsX.apply_async((2,))
  360. with patch('celery.app.amqp.AMQP.create_task_message') as create:
  361. with patch('celery.app.amqp.AMQP.send_task_message') as send:
  362. create.return_value = Mock(), Mock(), Mock(), Mock()
  363. aawsX.apply_async((4, 5))
  364. args = create.call_args[0][2]
  365. self.assertEqual(args, ('hello', 4, 5))
  366. self.assertTrue(send.called)
  367. def test_apply_async_adds_children(self):
  368. from celery._state import _task_stack
  369. @self.app.task(bind=True, shared=False)
  370. def a3cX1(self):
  371. pass
  372. @self.app.task(bind=True, shared=False)
  373. def a3cX2(self):
  374. pass
  375. _task_stack.push(a3cX1)
  376. try:
  377. a3cX1.push_request(called_directly=False)
  378. try:
  379. res = a3cX2.apply_async(add_to_parent=True)
  380. self.assertIn(res, a3cX1.request.children)
  381. finally:
  382. a3cX1.pop_request()
  383. finally:
  384. _task_stack.pop()
  385. def test_pickle_app(self):
  386. changes = dict(THE_FOO_BAR='bars',
  387. THE_MII_MAR='jars')
  388. self.app.conf.update(changes)
  389. saved = pickle.dumps(self.app)
  390. self.assertLess(len(saved), 2048)
  391. restored = pickle.loads(saved)
  392. self.assertDictContainsSubset(changes, restored.conf)
  393. def test_worker_main(self):
  394. from celery.bin import worker as worker_bin
  395. class worker(worker_bin.worker):
  396. def execute_from_commandline(self, argv):
  397. return argv
  398. prev, worker_bin.worker = worker_bin.worker, worker
  399. try:
  400. ret = self.app.worker_main(argv=['--version'])
  401. self.assertListEqual(ret, ['--version'])
  402. finally:
  403. worker_bin.worker = prev
  404. def test_config_from_envvar(self):
  405. os.environ['CELERYTEST_CONFIG_OBJECT'] = 'celery.tests.app.test_app'
  406. self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')
  407. self.assertEqual(self.app.conf.THIS_IS_A_KEY, 'this is a value')
  408. def assert_config2(self):
  409. self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
  410. self.assertTrue(self.app.conf.MOMENT_TO_STOP)
  411. self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
  412. self.assertFalse(self.app.conf.WANT_ME_TO)
  413. self.assertTrue(self.app.conf.UNDERSTAND_ME)
  414. def test_config_from_object__lazy(self):
  415. conf = ObjectConfig2()
  416. self.app.config_from_object(conf)
  417. self.assertIs(self.app.loader._conf, unconfigured)
  418. self.assertIs(self.app._config_source, conf)
  419. self.assert_config2()
  420. def test_config_from_object__force(self):
  421. self.app.config_from_object(ObjectConfig2(), force=True)
  422. self.assertTrue(self.app.loader._conf)
  423. self.assert_config2()
  424. def test_config_from_object__compat(self):
  425. class Config(object):
  426. CELERY_ALWAYS_EAGER = 44
  427. CELERY_DEFAULT_DELIVERY_MODE = 30
  428. CELERY_TASK_PUBLISH_RETRY = False
  429. self.app.config_from_object(Config)
  430. self.assertEqual(self.app.conf.task_always_eager, 44)
  431. self.assertEqual(self.app.conf.CELERY_ALWAYS_EAGER, 44)
  432. self.assertFalse(self.app.conf.task_publish_retry)
  433. self.assertEqual(self.app.conf.task_default_routing_key, 'celery')
  434. def test_config_from_object__supports_old_names(self):
  435. class Config(object):
  436. task_always_eager = 45
  437. task_default_delivery_mode = 301
  438. self.app.config_from_object(Config())
  439. self.assertEqual(self.app.conf.CELERY_ALWAYS_EAGER, 45)
  440. self.assertEqual(self.app.conf.task_always_eager, 45)
  441. self.assertEqual(self.app.conf.CELERY_DEFAULT_DELIVERY_MODE, 301)
  442. self.assertEqual(self.app.conf.task_default_delivery_mode, 301)
  443. self.assertEqual(self.app.conf.task_default_routing_key, 'testcelery')
  444. def test_config_from_object__namespace_uppercase(self):
  445. class Config(object):
  446. CELERY_TASK_ALWAYS_EAGER = 44
  447. CELERY_TASK_DEFAULT_DELIVERY_MODE = 301
  448. self.app.config_from_object(Config(), namespace='CELERY_')
  449. self.assertEqual(self.app.conf.task_always_eager, 44)
  450. def test_config_from_object__namespace_lowercase(self):
  451. class Config(object):
  452. celery_task_always_eager = 44
  453. celery_task_default_delivery_mode = 301
  454. self.app.config_from_object(Config(), namespace='celery_')
  455. self.assertEqual(self.app.conf.task_always_eager, 44)
  456. def test_config_from_object__mixing_new_and_old(self):
  457. class Config(object):
  458. task_always_eager = 44
  459. worker_agent = 'foo:Agent'
  460. worker_consumer = 'foo:Consumer'
  461. beat_schedule = '/foo/schedule'
  462. CELERY_DEFAULT_DELIVERY_MODE = 301
  463. with self.assertRaises(ImproperlyConfigured) as exc:
  464. self.app.config_from_object(Config(), force=True)
  465. self.assertTrue(
  466. exc.args[0].startswith('CELERY_DEFAULT_DELIVERY_MODE'))
  467. self.assertIn('task_default_delivery_mode', exc.args[0])
  468. def test_config_from_object__mixing_old_and_new(self):
  469. class Config(object):
  470. CELERY_ALWAYS_EAGER = 46
  471. CELERYD_AGENT = 'foo:Agent'
  472. CELERYD_CONSUMER = 'foo:Consumer'
  473. CELERYBEAT_SCHEDULE = '/foo/schedule'
  474. task_default_delivery_mode = 301
  475. with self.assertRaises(ImproperlyConfigured) as exc:
  476. self.app.config_from_object(Config(), force=True)
  477. self.assertTrue(
  478. exc.args[0].startswith('task_default_delivery_mode'))
  479. self.assertIn('CELERY_DEFAULT_DELIVERY_MODE', exc.args[0])
  480. def test_config_from_cmdline(self):
  481. cmdline = ['task_always_eager=no',
  482. 'result_backend=/dev/null',
  483. 'worker_prefetch_multiplier=368',
  484. '.foobarstring=(string)300',
  485. '.foobarint=(int)300',
  486. 'sqlalchemy_engine_options=(dict){"foo": "bar"}']
  487. self.app.config_from_cmdline(cmdline, namespace='worker')
  488. self.assertFalse(self.app.conf.task_always_eager)
  489. self.assertEqual(self.app.conf.result_backend, '/dev/null')
  490. self.assertEqual(self.app.conf.worker_prefetch_multiplier, 368)
  491. self.assertEqual(self.app.conf.worker_foobarstring, '300')
  492. self.assertEqual(self.app.conf.worker_foobarint, 300)
  493. self.assertDictEqual(self.app.conf.sqlalchemy_engine_options,
  494. {'foo': 'bar'})
  495. def test_setting__broker_transport_options(self):
  496. _args = {'foo': 'bar', 'spam': 'baz'}
  497. self.app.config_from_object(Object())
  498. self.assertEqual(self.app.conf.broker_transport_options, {})
  499. self.app.config_from_object(Object(broker_transport_options=_args))
  500. self.assertEqual(self.app.conf.broker_transport_options, _args)
  501. def test_Windows_log_color_disabled(self):
  502. self.app.IS_WINDOWS = True
  503. self.assertFalse(self.app.log.supports_color(True))
  504. def test_WorkController(self):
  505. x = self.app.WorkController
  506. self.assertIs(x.app, self.app)
  507. def test_Worker(self):
  508. x = self.app.Worker
  509. self.assertIs(x.app, self.app)
  510. @depends_on_current_app
  511. def test_AsyncResult(self):
  512. x = self.app.AsyncResult('1')
  513. self.assertIs(x.app, self.app)
  514. r = loads(dumps(x))
  515. # not set as current, so ends up as default app after reduce
  516. self.assertIs(r.app, current_app._get_current_object())
  517. def test_get_active_apps(self):
  518. self.assertTrue(list(_state._get_active_apps()))
  519. app1 = self.Celery()
  520. appid = id(app1)
  521. self.assertIn(app1, _state._get_active_apps())
  522. app1.close()
  523. del(app1)
  524. gc.collect()
  525. # weakref removed from list when app goes out of scope.
  526. with self.assertRaises(StopIteration):
  527. next(app for app in _state._get_active_apps() if id(app) == appid)
  528. def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):
  529. self.assertFalse(
  530. self.app.config_from_envvar(
  531. 'HDSAJIHWIQHEWQU', force=True, silent=True),
  532. )
  533. with self.assertRaises(ImproperlyConfigured):
  534. self.app.config_from_envvar(
  535. 'HDSAJIHWIQHEWQU', force=True, silent=False,
  536. )
  537. os.environ[key] = __name__ + '.object_config'
  538. self.assertTrue(self.app.config_from_envvar(key, force=True))
  539. self.assertEqual(self.app.conf['FOO'], 1)
  540. self.assertEqual(self.app.conf['BAR'], 2)
  541. os.environ[key] = 'unknown_asdwqe.asdwqewqe'
  542. with self.assertRaises(ImportError):
  543. self.app.config_from_envvar(key, silent=False)
  544. self.assertFalse(
  545. self.app.config_from_envvar(key, force=True, silent=True),
  546. )
  547. os.environ[key] = __name__ + '.dict_config'
  548. self.assertTrue(self.app.config_from_envvar(key, force=True))
  549. self.assertEqual(self.app.conf['FOO'], 10)
  550. self.assertEqual(self.app.conf['BAR'], 20)
  551. @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
  552. def test_start(self, execute):
  553. self.app.start()
  554. self.assertTrue(execute.called)
  555. def test_mail_admins(self):
  556. class Loader(BaseLoader):
  557. def mail_admins(*args, **kwargs):
  558. return args, kwargs
  559. self.app.loader = Loader(app=self.app)
  560. self.app.conf.admins = None
  561. self.assertFalse(self.app.mail_admins('Subject', 'Body'))
  562. self.app.conf.admins = [('George Costanza', 'george@vandelay.com')]
  563. self.assertTrue(self.app.mail_admins('Subject', 'Body'))
  564. def test_amqp_get_broker_info(self):
  565. self.assertDictContainsSubset(
  566. {'hostname': 'localhost',
  567. 'userid': 'guest',
  568. 'password': 'guest',
  569. 'virtual_host': '/'},
  570. self.app.connection('pyamqp://').info(),
  571. )
  572. self.app.conf.broker_port = 1978
  573. self.app.conf.broker_vhost = 'foo'
  574. self.assertDictContainsSubset(
  575. {'port': 1978, 'virtual_host': 'foo'},
  576. self.app.connection('pyamqp://:1978/foo').info(),
  577. )
  578. conn = self.app.connection('pyamqp:////value')
  579. self.assertDictContainsSubset({'virtual_host': '/value'},
  580. conn.info())
  581. def test_amqp_failover_strategy_selection(self):
  582. # Test passing in a string and make sure the string
  583. # gets there untouched
  584. self.app.conf.broker_failover_strategy = 'foo-bar'
  585. self.assertEqual(
  586. self.app.connection('amqp:////value').failover_strategy,
  587. 'foo-bar',
  588. )
  589. # Try passing in None
  590. self.app.conf.broker_failover_strategy = None
  591. self.assertEqual(
  592. self.app.connection('amqp:////value').failover_strategy,
  593. itertools.cycle,
  594. )
  595. # Test passing in a method
  596. def my_failover_strategy(it):
  597. yield True
  598. self.app.conf.broker_failover_strategy = my_failover_strategy
  599. self.assertEqual(
  600. self.app.connection('amqp:////value').failover_strategy,
  601. my_failover_strategy,
  602. )
  603. def test_after_fork(self):
  604. p = self.app._pool = Mock()
  605. self.app._after_fork(self.app)
  606. p.force_close_all.assert_called_with()
  607. self.assertIsNone(self.app._pool)
  608. self.app._after_fork(self.app)
  609. def test_global_after_fork(self):
  610. app = Mock(name='app')
  611. prev, _state._apps = _state._apps, [app]
  612. try:
  613. obj = Mock(name='obj')
  614. _appbase._global_after_fork(obj)
  615. app._after_fork.assert_called_with(obj)
  616. finally:
  617. _state._apps = prev
  618. @patch('multiprocessing.util', create=True)
  619. def test_global_after_fork__raises(self, util):
  620. app = Mock(name='app')
  621. prev, _state._apps = _state._apps, [app]
  622. try:
  623. obj = Mock(name='obj')
  624. exc = app._after_fork.side_effect = KeyError()
  625. _appbase._global_after_fork(obj)
  626. util._logger.info.assert_called_with(
  627. 'after forker raised exception: %r', exc, exc_info=1)
  628. util._logger = None
  629. _appbase._global_after_fork(obj)
  630. finally:
  631. _state._apps = prev
  632. def test_ensure_after_fork__no_multiprocessing(self):
  633. prev, _appbase.register_after_fork = (
  634. _appbase.register_after_fork, None)
  635. try:
  636. _appbase._after_fork_registered = False
  637. _appbase._ensure_after_fork()
  638. self.assertTrue(_appbase._after_fork_registered)
  639. finally:
  640. _appbase.register_after_fork = prev
  641. def test_canvas(self):
  642. self.assertTrue(self.app.canvas.Signature)
  643. def test_signature(self):
  644. sig = self.app.signature('foo', (1, 2))
  645. self.assertIs(sig.app, self.app)
  646. def test_timezone__none_set(self):
  647. self.app.conf.timezone = None
  648. tz = self.app.timezone
  649. self.assertEqual(tz, timezone.get_timezone('UTC'))
  650. def test_compat_on_configure(self):
  651. on_configure = Mock(name='on_configure')
  652. class CompatApp(Celery):
  653. def on_configure(self, *args, **kwargs):
  654. on_configure(*args, **kwargs)
  655. with CompatApp(set_as_current=False) as app:
  656. app.loader = Mock()
  657. app.loader.conf = {}
  658. app._load_config()
  659. on_configure.assert_called_with()
  660. def test_add_periodic_task(self):
  661. @self.app.task
  662. def add(x, y):
  663. pass
  664. assert not self.app.configured
  665. self.app.add_periodic_task(
  666. 10, self.app.signature('add', (2, 2)),
  667. name='add1', expires=3,
  668. )
  669. self.assertTrue(self.app._pending_periodic_tasks)
  670. assert not self.app.configured
  671. sig2 = add.s(4, 4)
  672. self.assertTrue(self.app.configured)
  673. self.app.add_periodic_task(20, sig2, name='add2', expires=4)
  674. self.assertIn('add1', self.app.conf.beat_schedule)
  675. self.assertIn('add2', self.app.conf.beat_schedule)
  676. def test_pool_no_multiprocessing(self):
  677. with mask_modules('multiprocessing.util'):
  678. pool = self.app.pool
  679. self.assertIs(pool, self.app._pool)
  680. def test_bugreport(self):
  681. self.assertTrue(self.app.bugreport())
  682. def test_send_task__connection_provided(self):
  683. connection = Mock(name='connection')
  684. router = Mock(name='router')
  685. router.route.return_value = {}
  686. self.app.amqp = Mock(name='amqp')
  687. self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value')
  688. self.app.send_task('foo', (1, 2), connection=connection, router=router)
  689. self.app.amqp.Producer.assert_called_with(connection)
  690. self.app.amqp.send_task_message.assert_called_with(
  691. self.app.amqp.Producer(), 'foo',
  692. self.app.amqp.create_task_message())
  693. def test_send_task_sent_event(self):
  694. class Dispatcher(object):
  695. sent = []
  696. def publish(self, type, fields, *args, **kwargs):
  697. self.sent.append((type, fields))
  698. conn = self.app.connection()
  699. chan = conn.channel()
  700. try:
  701. for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
  702. chan.exchange_declare(e, 'direct', durable=True)
  703. chan.queue_declare(e, durable=True)
  704. chan.queue_bind(e, e, e)
  705. finally:
  706. chan.close()
  707. assert conn.transport_cls == 'memory'
  708. message = self.app.amqp.create_task_message(
  709. 'id', 'footask', (), {}, create_sent_event=True,
  710. )
  711. prod = self.app.amqp.Producer(conn)
  712. dispatcher = Dispatcher()
  713. self.app.amqp.send_task_message(
  714. prod, 'footask', message,
  715. exchange='moo_exchange', routing_key='moo_exchange',
  716. event_dispatcher=dispatcher,
  717. )
  718. self.assertTrue(dispatcher.sent)
  719. self.assertEqual(dispatcher.sent[0][0], 'task-sent')
  720. self.app.amqp.send_task_message(
  721. prod, 'footask', message, event_dispatcher=dispatcher,
  722. exchange='bar_exchange', routing_key='bar_exchange',
  723. )
  724. def test_error_mail_sender(self):
  725. x = ErrorMail.subject % {'name': 'task_name',
  726. 'id': uuid(),
  727. 'exc': 'FOOBARBAZ',
  728. 'hostname': 'lana'}
  729. self.assertTrue(x)
  730. def test_error_mail_disabled(self):
  731. task = Mock()
  732. x = ErrorMail(task)
  733. x.should_send = Mock()
  734. x.should_send.return_value = False
  735. x.send(Mock(), Mock())
  736. self.assertFalse(task.app.mail_admins.called)
  737. def test_select_queues(self):
  738. self.app.amqp = Mock(name='amqp')
  739. self.app.select_queues({'foo', 'bar'})
  740. self.app.amqp.queues.select.assert_called_with({'foo', 'bar'})
  741. class test_defaults(AppCase):
  742. def test_strtobool(self):
  743. for s in ('false', 'no', '0'):
  744. self.assertFalse(defaults.strtobool(s))
  745. for s in ('true', 'yes', '1'):
  746. self.assertTrue(defaults.strtobool(s))
  747. with self.assertRaises(TypeError):
  748. defaults.strtobool('unsure')
  749. class test_debugging_utils(AppCase):
  750. def test_enable_disable_trace(self):
  751. try:
  752. _app.enable_trace()
  753. self.assertEqual(_app.app_or_default, _app._app_or_default_trace)
  754. _app.disable_trace()
  755. self.assertEqual(_app.app_or_default, _app._app_or_default)
  756. finally:
  757. _app.disable_trace()
  758. class test_pyimplementation(AppCase):
  759. def test_platform_python_implementation(self):
  760. with platform_pyimp(lambda: 'Xython'):
  761. self.assertEqual(pyimplementation(), 'Xython')
  762. def test_platform_jython(self):
  763. with platform_pyimp():
  764. with sys_platform('java 1.6.51'):
  765. self.assertIn('Jython', pyimplementation())
  766. def test_platform_pypy(self):
  767. with platform_pyimp():
  768. with sys_platform('darwin'):
  769. with pypy_version((1, 4, 3)):
  770. self.assertIn('PyPy', pyimplementation())
  771. with pypy_version((1, 4, 3, 'a4')):
  772. self.assertIn('PyPy', pyimplementation())
  773. def test_platform_fallback(self):
  774. with platform_pyimp():
  775. with sys_platform('darwin'):
  776. with pypy_version():
  777. self.assertEqual('CPython', pyimplementation())
  778. class test_shared_task(AppCase):
  779. def test_registers_to_all_apps(self):
  780. with self.Celery('xproj', set_as_current=True) as xproj:
  781. xproj.finalize()
  782. @shared_task
  783. def foo():
  784. return 42
  785. @shared_task()
  786. def bar():
  787. return 84
  788. self.assertIs(foo.app, xproj)
  789. self.assertIs(bar.app, xproj)
  790. self.assertTrue(foo._get_current_object())
  791. with self.Celery('yproj', set_as_current=True) as yproj:
  792. self.assertIs(foo.app, yproj)
  793. self.assertIs(bar.app, yproj)
  794. @shared_task()
  795. def baz():
  796. return 168
  797. self.assertIs(baz.app, yproj)