test_app.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038
  1. from __future__ import absolute_import, unicode_literals
  2. import gc
  3. import itertools
  4. import os
  5. from copy import deepcopy
  6. from datetime import datetime, timedelta
  7. from pickle import dumps, loads
  8. import pytest
  9. from case import ContextMock, Mock, mock, patch
  10. from vine import promise
  11. from celery import Celery, _state
  12. from celery import app as _app
  13. from celery import current_app, shared_task
  14. from celery.app import base as _appbase
  15. from celery.app import defaults
  16. from celery.exceptions import ImproperlyConfigured
  17. from celery.five import items, keys
  18. from celery.loaders.base import unconfigured
  19. from celery.platforms import pyimplementation
  20. from celery.utils.collections import DictAttribute
  21. from celery.utils.objects import Bunch
  22. from celery.utils.serialization import pickle
  23. from celery.utils.time import localize, timezone, to_utc
  24. THIS_IS_A_KEY = 'this is a value'
  25. class ObjectConfig(object):
  26. FOO = 1
  27. BAR = 2
  28. object_config = ObjectConfig()
  29. dict_config = {'FOO': 10, 'BAR': 20}
  30. class ObjectConfig2(object):
  31. LEAVE_FOR_WORK = True
  32. MOMENT_TO_STOP = True
  33. CALL_ME_BACK = 123456789
  34. WANT_ME_TO = False
  35. UNDERSTAND_ME = True
  36. class test_module:
  37. def test_default_app(self):
  38. assert _app.default_app == _state.default_app
  39. def test_bugreport(self, app):
  40. assert _app.bugreport(app=app)
  41. class test_task_join_will_block:
  42. def test_task_join_will_block(self, patching):
  43. patching('celery._state._task_join_will_block', 0)
  44. assert _state._task_join_will_block == 0
  45. _state._set_task_join_will_block(True)
  46. assert _state._task_join_will_block is True
  47. # fixture 'app' sets this, so need to use orig_ function
  48. # set there by that fixture.
  49. res = _state.orig_task_join_will_block()
  50. assert res is True
  51. class test_App:
  52. def setup(self):
  53. self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG))
  54. def test_now(self):
  55. timezone_setting_value = 'US/Eastern'
  56. tz_utc = timezone.get_timezone('UTC')
  57. tz_us_eastern = timezone.get_timezone(timezone_setting_value)
  58. now = to_utc(datetime.utcnow())
  59. app_now = self.app.now()
  60. assert app_now.tzinfo is tz_utc
  61. assert app_now - now <= timedelta(seconds=1)
  62. # Check that timezone conversion is applied from configuration
  63. self.app.conf.enable_utc = False
  64. self.app.conf.timezone = timezone_setting_value
  65. # timezone is a cached property
  66. del self.app.timezone
  67. app_now = self.app.now()
  68. assert app_now.tzinfo.zone == tz_us_eastern.zone
  69. diff = to_utc(datetime.utcnow()) - localize(app_now, tz_utc)
  70. assert diff <= timedelta(seconds=1)
  71. # Verify that timezone setting overrides enable_utc=on setting
  72. self.app.conf.enable_utc = True
  73. del self.app.timezone
  74. app_now = self.app.now()
  75. assert self.app.timezone == tz_us_eastern
  76. assert app_now.tzinfo.zone == tz_us_eastern.zone
  77. @patch('celery.app.base.set_default_app')
  78. def test_set_default(self, set_default_app):
  79. self.app.set_default()
  80. set_default_app.assert_called_with(self.app)
  81. @patch('celery.security.setup_security')
  82. def test_setup_security(self, setup_security):
  83. self.app.setup_security(
  84. {'json'}, 'key', 'cert', 'store', 'digest', 'serializer')
  85. setup_security.assert_called_with(
  86. {'json'}, 'key', 'cert', 'store', 'digest', 'serializer',
  87. app=self.app)
  88. def test_task_autofinalize_disabled(self):
  89. with self.Celery('xyzibari', autofinalize=False) as app:
  90. @app.task
  91. def ttafd():
  92. return 42
  93. with pytest.raises(RuntimeError):
  94. ttafd()
  95. with self.Celery('xyzibari', autofinalize=False) as app:
  96. @app.task
  97. def ttafd2():
  98. return 42
  99. app.finalize()
  100. assert ttafd2() == 42
  101. def test_registry_autofinalize_disabled(self):
  102. with self.Celery('xyzibari', autofinalize=False) as app:
  103. with pytest.raises(RuntimeError):
  104. app.tasks['celery.chain']
  105. app.finalize()
  106. assert app.tasks['celery.chain']
  107. def test_task(self):
  108. with self.Celery('foozibari') as app:
  109. def fun():
  110. pass
  111. fun.__module__ = '__main__'
  112. task = app.task(fun)
  113. assert task.name == app.main + '.fun'
  114. def test_task_too_many_args(self):
  115. with pytest.raises(TypeError):
  116. self.app.task(Mock(name='fun'), True)
  117. with pytest.raises(TypeError):
  118. self.app.task(Mock(name='fun'), True, 1, 2)
  119. def test_with_config_source(self):
  120. with self.Celery(config_source=ObjectConfig) as app:
  121. assert app.conf.FOO == 1
  122. assert app.conf.BAR == 2
  123. @pytest.mark.usefixtures('depends_on_current_app')
  124. def test_task_windows_execv(self):
  125. prev, _appbase.USING_EXECV = _appbase.USING_EXECV, True
  126. try:
  127. @self.app.task(shared=False)
  128. def foo():
  129. pass
  130. assert foo._get_current_object() # is proxy
  131. finally:
  132. _appbase.USING_EXECV = prev
  133. assert not _appbase.USING_EXECV
  134. def test_task_takes_no_args(self):
  135. with pytest.raises(TypeError):
  136. @self.app.task(1)
  137. def foo():
  138. pass
  139. def test_add_defaults(self):
  140. assert not self.app.configured
  141. _conf = {'foo': 300}
  142. def conf():
  143. return _conf
  144. self.app.add_defaults(conf)
  145. assert conf in self.app._pending_defaults
  146. assert not self.app.configured
  147. assert self.app.conf.foo == 300
  148. assert self.app.configured
  149. assert not self.app._pending_defaults
  150. # defaults not pickled
  151. appr = loads(dumps(self.app))
  152. with pytest.raises(AttributeError):
  153. appr.conf.foo
  154. # add more defaults after configured
  155. conf2 = {'foo': 'BAR'}
  156. self.app.add_defaults(conf2)
  157. assert self.app.conf.foo == 'BAR'
  158. assert _conf in self.app.conf.defaults
  159. assert conf2 in self.app.conf.defaults
  160. def test_connection_or_acquire(self):
  161. with self.app.connection_or_acquire(block=True):
  162. assert self.app.pool._dirty
  163. with self.app.connection_or_acquire(pool=False):
  164. assert not self.app.pool._dirty
  165. def test_using_v1_reduce(self):
  166. self.app._using_v1_reduce = True
  167. assert loads(dumps(self.app))
  168. def test_autodiscover_tasks_force(self):
  169. self.app.loader.autodiscover_tasks = Mock()
  170. self.app.autodiscover_tasks(['proj.A', 'proj.B'], force=True)
  171. self.app.loader.autodiscover_tasks.assert_called_with(
  172. ['proj.A', 'proj.B'], 'tasks',
  173. )
  174. self.app.loader.autodiscover_tasks = Mock()
  175. def lazy_list():
  176. return ['proj.A', 'proj.B']
  177. self.app.autodiscover_tasks(
  178. lazy_list,
  179. related_name='george',
  180. force=True,
  181. )
  182. self.app.loader.autodiscover_tasks.assert_called_with(
  183. ['proj.A', 'proj.B'], 'george',
  184. )
  185. def test_autodiscover_tasks_lazy(self):
  186. with patch('celery.signals.import_modules') as import_modules:
  187. def lazy_list():
  188. return [1, 2, 3]
  189. self.app.autodiscover_tasks(lazy_list)
  190. import_modules.connect.assert_called()
  191. prom = import_modules.connect.call_args[0][0]
  192. assert isinstance(prom, promise)
  193. assert prom.fun == self.app._autodiscover_tasks
  194. assert prom.args[0](), [1, 2 == 3]
  195. def test_autodiscover_tasks__no_packages(self):
  196. fixup1 = Mock(name='fixup')
  197. fixup2 = Mock(name='fixup')
  198. self.app._autodiscover_tasks_from_names = Mock(name='auto')
  199. self.app._fixups = [fixup1, fixup2]
  200. fixup1.autodiscover_tasks.return_value = ['A', 'B', 'C']
  201. fixup2.autodiscover_tasks.return_value = ['D', 'E', 'F']
  202. self.app.autodiscover_tasks(force=True)
  203. self.app._autodiscover_tasks_from_names.assert_called_with(
  204. ['A', 'B', 'C', 'D', 'E', 'F'], related_name='tasks',
  205. )
  206. def test_with_broker(self, patching):
  207. patching.setenv('CELERY_BROKER_URL', '')
  208. with self.Celery(broker='foo://baribaz') as app:
  209. assert app.conf.broker_url == 'foo://baribaz'
  210. def test_pending_configuration__setattr(self):
  211. with self.Celery(broker='foo://bar') as app:
  212. app.conf.task_default_delivery_mode = 44
  213. app.conf.worker_agent = 'foo:Bar'
  214. assert not app.configured
  215. assert app.conf.worker_agent == 'foo:Bar'
  216. assert app.conf.broker_url == 'foo://bar'
  217. assert app._preconf['worker_agent'] == 'foo:Bar'
  218. assert app.configured
  219. reapp = pickle.loads(pickle.dumps(app))
  220. assert reapp._preconf['worker_agent'] == 'foo:Bar'
  221. assert not reapp.configured
  222. assert reapp.conf.worker_agent == 'foo:Bar'
  223. assert reapp.configured
  224. assert reapp.conf.broker_url == 'foo://bar'
  225. assert reapp._preconf['worker_agent'] == 'foo:Bar'
  226. def test_pending_configuration__update(self):
  227. with self.Celery(broker='foo://bar') as app:
  228. app.conf.update(
  229. task_default_delivery_mode=44,
  230. worker_agent='foo:Bar',
  231. )
  232. assert not app.configured
  233. assert app.conf.worker_agent == 'foo:Bar'
  234. assert app.conf.broker_url == 'foo://bar'
  235. assert app._preconf['worker_agent'] == 'foo:Bar'
  236. def test_pending_configuration__compat_settings(self):
  237. with self.Celery(broker='foo://bar', backend='foo') as app:
  238. app.conf.update(
  239. CELERY_ALWAYS_EAGER=4,
  240. CELERY_DEFAULT_DELIVERY_MODE=63,
  241. CELERYD_AGENT='foo:Barz',
  242. )
  243. assert app.conf.task_always_eager == 4
  244. assert app.conf.task_default_delivery_mode == 63
  245. assert app.conf.worker_agent == 'foo:Barz'
  246. assert app.conf.broker_url == 'foo://bar'
  247. assert app.conf.result_backend == 'foo'
  248. def test_pending_configuration__compat_settings_mixing(self):
  249. with self.Celery(broker='foo://bar', backend='foo') as app:
  250. app.conf.update(
  251. CELERY_ALWAYS_EAGER=4,
  252. CELERY_DEFAULT_DELIVERY_MODE=63,
  253. CELERYD_AGENT='foo:Barz',
  254. worker_consumer='foo:Fooz',
  255. )
  256. with pytest.raises(ImproperlyConfigured):
  257. assert app.conf.task_always_eager == 4
  258. def test_pending_configuration__django_settings(self):
  259. with self.Celery(broker='foo://bar', backend='foo') as app:
  260. app.config_from_object(DictAttribute(Bunch(
  261. CELERY_TASK_ALWAYS_EAGER=4,
  262. CELERY_TASK_DEFAULT_DELIVERY_MODE=63,
  263. CELERY_WORKER_AGENT='foo:Barz',
  264. CELERY_RESULT_SERIALIZER='pickle',
  265. )), namespace='CELERY')
  266. assert app.conf.result_serializer == 'pickle'
  267. assert app.conf.CELERY_RESULT_SERIALIZER == 'pickle'
  268. assert app.conf.task_always_eager == 4
  269. assert app.conf.task_default_delivery_mode == 63
  270. assert app.conf.worker_agent == 'foo:Barz'
  271. assert app.conf.broker_url == 'foo://bar'
  272. assert app.conf.result_backend == 'foo'
  273. def test_pending_configuration__compat_settings_mixing_new(self):
  274. with self.Celery(broker='foo://bar', backend='foo') as app:
  275. app.conf.update(
  276. task_always_eager=4,
  277. task_default_delivery_mode=63,
  278. worker_agent='foo:Barz',
  279. CELERYD_CONSUMER='foo:Fooz',
  280. CELERYD_AUTOSCALER='foo:Xuzzy',
  281. )
  282. with pytest.raises(ImproperlyConfigured):
  283. assert app.conf.worker_consumer == 'foo:Fooz'
  284. def test_pending_configuration__compat_settings_mixing_alt(self):
  285. with self.Celery(broker='foo://bar', backend='foo') as app:
  286. app.conf.update(
  287. task_always_eager=4,
  288. task_default_delivery_mode=63,
  289. worker_agent='foo:Barz',
  290. CELERYD_CONSUMER='foo:Fooz',
  291. worker_consumer='foo:Fooz',
  292. CELERYD_AUTOSCALER='foo:Xuzzy',
  293. worker_autoscaler='foo:Xuzzy'
  294. )
  295. def test_pending_configuration__setdefault(self):
  296. with self.Celery(broker='foo://bar') as app:
  297. assert not app.configured
  298. app.conf.setdefault('worker_agent', 'foo:Bar')
  299. assert not app.configured
  300. def test_pending_configuration__iter(self):
  301. with self.Celery(broker='foo://bar') as app:
  302. app.conf.worker_agent = 'foo:Bar'
  303. assert not app.configured
  304. assert list(keys(app.conf))
  305. assert app.configured
  306. assert 'worker_agent' in app.conf
  307. assert dict(app.conf)
  308. def test_pending_configuration__raises_ImproperlyConfigured(self):
  309. with self.Celery(set_as_current=False) as app:
  310. app.conf.worker_agent = 'foo://bar'
  311. app.conf.task_default_delivery_mode = 44
  312. app.conf.CELERY_ALWAYS_EAGER = 5
  313. with pytest.raises(ImproperlyConfigured):
  314. app.finalize()
  315. with self.Celery() as app:
  316. assert not self.app.conf.task_always_eager
  317. def test_repr(self):
  318. assert repr(self.app)
  319. def test_custom_task_registry(self):
  320. with self.Celery(tasks=self.app.tasks) as app2:
  321. assert app2.tasks is self.app.tasks
  322. def test_include_argument(self):
  323. with self.Celery(include=('foo', 'bar.foo')) as app:
  324. assert app.conf.include, ('foo' == 'bar.foo')
  325. def test_set_as_current(self):
  326. current = _state._tls.current_app
  327. try:
  328. app = self.Celery(set_as_current=True)
  329. assert _state._tls.current_app is app
  330. finally:
  331. _state._tls.current_app = current
  332. def test_current_task(self):
  333. @self.app.task
  334. def foo(shared=False):
  335. pass
  336. _state._task_stack.push(foo)
  337. try:
  338. assert self.app.current_task.name == foo.name
  339. finally:
  340. _state._task_stack.pop()
  341. def test_task_not_shared(self):
  342. with patch('celery.app.base.connect_on_app_finalize') as sh:
  343. @self.app.task(shared=False)
  344. def foo():
  345. pass
  346. sh.assert_not_called()
  347. def test_task_compat_with_filter(self):
  348. with self.Celery() as app:
  349. check = Mock()
  350. def filter(task):
  351. check(task)
  352. return task
  353. @app.task(filter=filter, shared=False)
  354. def foo():
  355. pass
  356. check.assert_called_with(foo)
  357. def test_task_with_filter(self):
  358. with self.Celery() as app:
  359. check = Mock()
  360. def filter(task):
  361. check(task)
  362. return task
  363. assert not _appbase.USING_EXECV
  364. @app.task(filter=filter, shared=False)
  365. def foo():
  366. pass
  367. check.assert_called_with(foo)
  368. def test_task_sets_main_name_MP_MAIN_FILE(self):
  369. from celery.utils import imports as _imports
  370. _imports.MP_MAIN_FILE = __file__
  371. try:
  372. with self.Celery('xuzzy') as app:
  373. @app.task
  374. def foo():
  375. pass
  376. assert foo.name == 'xuzzy.foo'
  377. finally:
  378. _imports.MP_MAIN_FILE = None
  379. def test_annotate_decorator(self):
  380. from celery.app.task import Task
  381. class adX(Task):
  382. def run(self, y, z, x):
  383. return y, z, x
  384. check = Mock()
  385. def deco(fun):
  386. def _inner(*args, **kwargs):
  387. check(*args, **kwargs)
  388. return fun(*args, **kwargs)
  389. return _inner
  390. self.app.conf.task_annotations = {
  391. adX.name: {'@__call__': deco}
  392. }
  393. adX.bind(self.app)
  394. assert adX.app is self.app
  395. i = adX()
  396. i(2, 4, x=3)
  397. check.assert_called_with(i, 2, 4, x=3)
  398. i.annotate()
  399. i.annotate()
  400. def test_apply_async_adds_children(self):
  401. from celery._state import _task_stack
  402. @self.app.task(bind=True, shared=False)
  403. def a3cX1(self):
  404. pass
  405. @self.app.task(bind=True, shared=False)
  406. def a3cX2(self):
  407. pass
  408. _task_stack.push(a3cX1)
  409. try:
  410. a3cX1.push_request(called_directly=False)
  411. try:
  412. res = a3cX2.apply_async(add_to_parent=True)
  413. assert res in a3cX1.request.children
  414. finally:
  415. a3cX1.pop_request()
  416. finally:
  417. _task_stack.pop()
  418. def test_pickle_app(self):
  419. changes = {'THE_FOO_BAR': 'bars',
  420. 'THE_MII_MAR': 'jars'}
  421. self.app.conf.update(changes)
  422. saved = pickle.dumps(self.app)
  423. assert len(saved) < 2048
  424. restored = pickle.loads(saved)
  425. for key, value in items(changes):
  426. assert restored.conf[key] == value
  427. def test_worker_main(self):
  428. from celery.bin import worker as worker_bin
  429. class worker(worker_bin.worker):
  430. def execute_from_commandline(self, argv):
  431. return argv
  432. prev, worker_bin.worker = worker_bin.worker, worker
  433. try:
  434. ret = self.app.worker_main(argv=['--version'])
  435. assert ret == ['--version']
  436. finally:
  437. worker_bin.worker = prev
  438. def test_config_from_envvar(self):
  439. os.environ['CELERYTEST_CONFIG_OBJECT'] = 't.unit.app.test_app'
  440. self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')
  441. assert self.app.conf.THIS_IS_A_KEY == 'this is a value'
  442. def assert_config2(self):
  443. assert self.app.conf.LEAVE_FOR_WORK
  444. assert self.app.conf.MOMENT_TO_STOP
  445. assert self.app.conf.CALL_ME_BACK == 123456789
  446. assert not self.app.conf.WANT_ME_TO
  447. assert self.app.conf.UNDERSTAND_ME
  448. def test_config_from_object__lazy(self):
  449. conf = ObjectConfig2()
  450. self.app.config_from_object(conf)
  451. assert self.app.loader._conf is unconfigured
  452. assert self.app._config_source is conf
  453. self.assert_config2()
  454. def test_config_from_object__force(self):
  455. self.app.config_from_object(ObjectConfig2(), force=True)
  456. assert self.app.loader._conf
  457. self.assert_config2()
  458. def test_config_from_object__compat(self):
  459. class Config(object):
  460. CELERY_ALWAYS_EAGER = 44
  461. CELERY_DEFAULT_DELIVERY_MODE = 30
  462. CELERY_TASK_PUBLISH_RETRY = False
  463. self.app.config_from_object(Config)
  464. assert self.app.conf.task_always_eager == 44
  465. assert self.app.conf.CELERY_ALWAYS_EAGER == 44
  466. assert not self.app.conf.task_publish_retry
  467. assert self.app.conf.task_default_routing_key == 'testcelery'
  468. def test_config_from_object__supports_old_names(self):
  469. class Config(object):
  470. task_always_eager = 45
  471. task_default_delivery_mode = 301
  472. self.app.config_from_object(Config())
  473. assert self.app.conf.CELERY_ALWAYS_EAGER == 45
  474. assert self.app.conf.task_always_eager == 45
  475. assert self.app.conf.CELERY_DEFAULT_DELIVERY_MODE == 301
  476. assert self.app.conf.task_default_delivery_mode == 301
  477. assert self.app.conf.task_default_routing_key == 'testcelery'
  478. def test_config_from_object__namespace_uppercase(self):
  479. class Config(object):
  480. CELERY_TASK_ALWAYS_EAGER = 44
  481. CELERY_TASK_DEFAULT_DELIVERY_MODE = 301
  482. self.app.config_from_object(Config(), namespace='CELERY')
  483. assert self.app.conf.task_always_eager == 44
  484. def test_config_from_object__namespace_lowercase(self):
  485. class Config(object):
  486. celery_task_always_eager = 44
  487. celery_task_default_delivery_mode = 301
  488. self.app.config_from_object(Config(), namespace='celery')
  489. assert self.app.conf.task_always_eager == 44
  490. def test_config_from_object__mixing_new_and_old(self):
  491. class Config(object):
  492. task_always_eager = 44
  493. worker_agent = 'foo:Agent'
  494. worker_consumer = 'foo:Consumer'
  495. beat_schedule = '/foo/schedule'
  496. CELERY_DEFAULT_DELIVERY_MODE = 301
  497. with pytest.raises(ImproperlyConfigured) as exc:
  498. self.app.config_from_object(Config(), force=True)
  499. assert exc.args[0].startswith('CELERY_DEFAULT_DELIVERY_MODE')
  500. assert 'task_default_delivery_mode' in exc.args[0]
  501. def test_config_from_object__mixing_old_and_new(self):
  502. class Config(object):
  503. CELERY_ALWAYS_EAGER = 46
  504. CELERYD_AGENT = 'foo:Agent'
  505. CELERYD_CONSUMER = 'foo:Consumer'
  506. CELERYBEAT_SCHEDULE = '/foo/schedule'
  507. task_default_delivery_mode = 301
  508. with pytest.raises(ImproperlyConfigured) as exc:
  509. self.app.config_from_object(Config(), force=True)
  510. assert exc.args[0].startswith('task_default_delivery_mode')
  511. assert 'CELERY_DEFAULT_DELIVERY_MODE' in exc.args[0]
  512. def test_config_from_cmdline(self):
  513. cmdline = ['task_always_eager=no',
  514. 'result_backend=/dev/null',
  515. 'worker_prefetch_multiplier=368',
  516. '.foobarstring=(string)300',
  517. '.foobarint=(int)300',
  518. 'database_engine_options=(dict){"foo": "bar"}']
  519. self.app.config_from_cmdline(cmdline, namespace='worker')
  520. assert not self.app.conf.task_always_eager
  521. assert self.app.conf.result_backend == '/dev/null'
  522. assert self.app.conf.worker_prefetch_multiplier == 368
  523. assert self.app.conf.worker_foobarstring == '300'
  524. assert self.app.conf.worker_foobarint == 300
  525. assert self.app.conf.database_engine_options == {'foo': 'bar'}
  526. def test_setting__broker_transport_options(self):
  527. _args = {'foo': 'bar', 'spam': 'baz'}
  528. self.app.config_from_object(Bunch())
  529. assert self.app.conf.broker_transport_options == \
  530. {'polling_interval': 0.1}
  531. self.app.config_from_object(Bunch(broker_transport_options=_args))
  532. assert self.app.conf.broker_transport_options == _args
  533. def test_Windows_log_color_disabled(self):
  534. self.app.IS_WINDOWS = True
  535. assert not self.app.log.supports_color(True)
  536. def test_WorkController(self):
  537. x = self.app.WorkController
  538. assert x.app is self.app
  539. def test_Worker(self):
  540. x = self.app.Worker
  541. assert x.app is self.app
  542. @pytest.mark.usefixtures('depends_on_current_app')
  543. def test_AsyncResult(self):
  544. x = self.app.AsyncResult('1')
  545. assert x.app is self.app
  546. r = loads(dumps(x))
  547. # not set as current, so ends up as default app after reduce
  548. assert r.app is current_app._get_current_object()
  549. def test_get_active_apps(self):
  550. assert list(_state._get_active_apps())
  551. app1 = self.Celery()
  552. appid = id(app1)
  553. assert app1 in _state._get_active_apps()
  554. app1.close()
  555. del(app1)
  556. gc.collect()
  557. # weakref removed from list when app goes out of scope.
  558. with pytest.raises(StopIteration):
  559. next(app for app in _state._get_active_apps() if id(app) == appid)
  560. def test_config_from_envvar_more(self, key='CELERY_HARNESS_CFG1'):
  561. assert not self.app.config_from_envvar(
  562. 'HDSAJIHWIQHEWQU', force=True, silent=True)
  563. with pytest.raises(ImproperlyConfigured):
  564. self.app.config_from_envvar(
  565. 'HDSAJIHWIQHEWQU', force=True, silent=False,
  566. )
  567. os.environ[key] = __name__ + '.object_config'
  568. assert self.app.config_from_envvar(key, force=True)
  569. assert self.app.conf['FOO'] == 1
  570. assert self.app.conf['BAR'] == 2
  571. os.environ[key] = 'unknown_asdwqe.asdwqewqe'
  572. with pytest.raises(ImportError):
  573. self.app.config_from_envvar(key, silent=False)
  574. assert not self.app.config_from_envvar(key, force=True, silent=True)
  575. os.environ[key] = __name__ + '.dict_config'
  576. assert self.app.config_from_envvar(key, force=True)
  577. assert self.app.conf['FOO'] == 10
  578. assert self.app.conf['BAR'] == 20
  579. @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
  580. def test_start(self, execute):
  581. self.app.start()
  582. execute.assert_called()
  583. @pytest.mark.parametrize('url,expected_fields', [
  584. ('pyamqp://', {
  585. 'hostname': 'localhost',
  586. 'userid': 'guest',
  587. 'password': 'guest',
  588. 'virtual_host': '/',
  589. }),
  590. ('pyamqp://:1978/foo', {
  591. 'port': 1978,
  592. 'virtual_host': 'foo',
  593. }),
  594. ('pyamqp:////value', {
  595. 'virtual_host': '/value',
  596. })
  597. ])
  598. def test_amqp_get_broker_info(self, url, expected_fields):
  599. info = self.app.connection(url).info()
  600. for key, expected_value in items(expected_fields):
  601. assert info[key] == expected_value
  602. def test_amqp_failover_strategy_selection(self):
  603. # Test passing in a string and make sure the string
  604. # gets there untouched
  605. self.app.conf.broker_failover_strategy = 'foo-bar'
  606. assert self.app.connection('amqp:////value') \
  607. .failover_strategy == 'foo-bar'
  608. # Try passing in None
  609. self.app.conf.broker_failover_strategy = None
  610. assert self.app.connection('amqp:////value') \
  611. .failover_strategy == itertools.cycle
  612. # Test passing in a method
  613. def my_failover_strategy(it):
  614. yield True
  615. self.app.conf.broker_failover_strategy = my_failover_strategy
  616. assert self.app.connection('amqp:////value') \
  617. .failover_strategy == my_failover_strategy
  618. def test_amqp_heartbeat_settings(self):
  619. # Test default broker_heartbeat value
  620. assert self.app.connection('amqp:////value') \
  621. .heartbeat == 0
  622. # Test passing heartbeat through app configuration
  623. self.app.conf.broker_heartbeat = 60
  624. assert self.app.connection('amqp:////value') \
  625. .heartbeat == 60
  626. # Test passing heartbeat as connection argument
  627. assert self.app.connection('amqp:////value', heartbeat=30) \
  628. .heartbeat == 30
  629. def test_after_fork(self):
  630. self.app._pool = Mock()
  631. self.app.on_after_fork = Mock(name='on_after_fork')
  632. self.app._after_fork()
  633. assert self.app._pool is None
  634. self.app.on_after_fork.send.assert_called_with(sender=self.app)
  635. self.app._after_fork()
  636. def test_global_after_fork(self):
  637. self.app._after_fork = Mock(name='_after_fork')
  638. _appbase._after_fork_cleanup_app(self.app)
  639. self.app._after_fork.assert_called_with()
  640. @patch('celery.app.base.logger')
  641. def test_after_fork_cleanup_app__raises(self, logger):
  642. self.app._after_fork = Mock(name='_after_fork')
  643. exc = self.app._after_fork.side_effect = KeyError()
  644. _appbase._after_fork_cleanup_app(self.app)
  645. logger.info.assert_called_with(
  646. 'after forker raised exception: %r', exc, exc_info=1)
  647. def test_ensure_after_fork__no_multiprocessing(self):
  648. prev, _appbase.register_after_fork = (
  649. _appbase.register_after_fork, None)
  650. try:
  651. self.app._after_fork_registered = False
  652. self.app._ensure_after_fork()
  653. assert self.app._after_fork_registered
  654. finally:
  655. _appbase.register_after_fork = prev
  656. def test_canvas(self):
  657. assert self.app._canvas.Signature
  658. def test_signature(self):
  659. sig = self.app.signature('foo', (1, 2))
  660. assert sig.app is self.app
  661. def test_timezone__none_set(self):
  662. self.app.conf.timezone = None
  663. self.app.conf.enable_utc = True
  664. assert self.app.timezone == timezone.utc
  665. del self.app.timezone
  666. self.app.conf.enable_utc = False
  667. assert self.app.timezone == timezone.local
  668. def test_uses_utc_timezone(self):
  669. self.app.conf.timezone = None
  670. self.app.conf.enable_utc = True
  671. assert self.app.uses_utc_timezone() is True
  672. self.app.conf.enable_utc = False
  673. del self.app.timezone
  674. assert self.app.uses_utc_timezone() is False
  675. self.app.conf.timezone = 'US/Eastern'
  676. del self.app.timezone
  677. assert self.app.uses_utc_timezone() is False
  678. self.app.conf.timezone = 'UTC'
  679. del self.app.timezone
  680. assert self.app.uses_utc_timezone() is True
  681. def test_compat_on_configure(self):
  682. _on_configure = Mock(name='on_configure')
  683. class CompatApp(Celery):
  684. def on_configure(self, *args, **kwargs):
  685. # on pypy3 if named on_configure the class function
  686. # will be called, instead of the mock defined above,
  687. # so we add the underscore.
  688. _on_configure(*args, **kwargs)
  689. with CompatApp(set_as_current=False) as app:
  690. app.loader = Mock()
  691. app.loader.conf = {}
  692. app._load_config()
  693. _on_configure.assert_called_with()
  694. def test_add_periodic_task(self):
  695. @self.app.task
  696. def add(x, y):
  697. pass
  698. assert not self.app.configured
  699. self.app.add_periodic_task(
  700. 10, self.app.signature('add', (2, 2)),
  701. name='add1', expires=3,
  702. )
  703. assert self.app._pending_periodic_tasks
  704. assert not self.app.configured
  705. sig2 = add.s(4, 4)
  706. assert self.app.configured
  707. self.app.add_periodic_task(20, sig2, name='add2', expires=4)
  708. assert 'add1' in self.app.conf.beat_schedule
  709. assert 'add2' in self.app.conf.beat_schedule
  710. def test_pool_no_multiprocessing(self):
  711. with mock.mask_modules('multiprocessing.util'):
  712. pool = self.app.pool
  713. assert pool is self.app._pool
  714. def test_bugreport(self):
  715. assert self.app.bugreport()
  716. def test_send_task__connection_provided(self):
  717. connection = Mock(name='connection')
  718. router = Mock(name='router')
  719. router.route.return_value = {}
  720. self.app.amqp = Mock(name='amqp')
  721. self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value')
  722. self.app.send_task('foo', (1, 2), connection=connection, router=router)
  723. self.app.amqp.Producer.assert_called_with(
  724. connection, auto_declare=False)
  725. self.app.amqp.send_task_message.assert_called_with(
  726. self.app.amqp.Producer(), 'foo',
  727. self.app.amqp.create_task_message())
  728. def test_send_task_sent_event(self):
  729. class Dispatcher(object):
  730. sent = []
  731. def publish(self, type, fields, *args, **kwargs):
  732. self.sent.append((type, fields))
  733. conn = self.app.connection()
  734. chan = conn.channel()
  735. try:
  736. for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
  737. chan.exchange_declare(e, 'direct', durable=True)
  738. chan.queue_declare(e, durable=True)
  739. chan.queue_bind(e, e, e)
  740. finally:
  741. chan.close()
  742. assert conn.transport_cls == 'memory'
  743. message = self.app.amqp.create_task_message(
  744. 'id', 'footask', (), {}, create_sent_event=True,
  745. )
  746. prod = self.app.amqp.Producer(conn)
  747. dispatcher = Dispatcher()
  748. self.app.amqp.send_task_message(
  749. prod, 'footask', message,
  750. exchange='moo_exchange', routing_key='moo_exchange',
  751. event_dispatcher=dispatcher,
  752. )
  753. assert dispatcher.sent
  754. assert dispatcher.sent[0][0] == 'task-sent'
  755. self.app.amqp.send_task_message(
  756. prod, 'footask', message, event_dispatcher=dispatcher,
  757. exchange='bar_exchange', routing_key='bar_exchange',
  758. )
  759. def test_select_queues(self):
  760. self.app.amqp = Mock(name='amqp')
  761. self.app.select_queues({'foo', 'bar'})
  762. self.app.amqp.queues.select.assert_called_with({'foo', 'bar'})
  763. def test_Beat(self):
  764. from celery.apps.beat import Beat
  765. beat = self.app.Beat()
  766. assert isinstance(beat, Beat)
  767. def test_registry_cls(self):
  768. class TaskRegistry(self.app.registry_cls):
  769. pass
  770. class CustomCelery(type(self.app)):
  771. registry_cls = TaskRegistry
  772. app = CustomCelery(set_as_current=False)
  773. assert isinstance(app.tasks, TaskRegistry)
  774. class test_defaults:
  775. def test_strtobool(self):
  776. for s in ('false', 'no', '0'):
  777. assert not defaults.strtobool(s)
  778. for s in ('true', 'yes', '1'):
  779. assert defaults.strtobool(s)
  780. with pytest.raises(TypeError):
  781. defaults.strtobool('unsure')
  782. class test_debugging_utils:
  783. def test_enable_disable_trace(self):
  784. try:
  785. _app.enable_trace()
  786. assert _state.app_or_default == _state._app_or_default_trace
  787. _app.disable_trace()
  788. assert _state.app_or_default == _state._app_or_default
  789. finally:
  790. _app.disable_trace()
  791. class test_pyimplementation:
  792. def test_platform_python_implementation(self):
  793. with mock.platform_pyimp(lambda: 'Xython'):
  794. assert pyimplementation() == 'Xython'
  795. def test_platform_jython(self):
  796. with mock.platform_pyimp():
  797. with mock.sys_platform('java 1.6.51'):
  798. assert 'Jython' in pyimplementation()
  799. def test_platform_pypy(self):
  800. with mock.platform_pyimp():
  801. with mock.sys_platform('darwin'):
  802. with mock.pypy_version((1, 4, 3)):
  803. assert 'PyPy' in pyimplementation()
  804. with mock.pypy_version((1, 4, 3, 'a4')):
  805. assert 'PyPy' in pyimplementation()
  806. def test_platform_fallback(self):
  807. with mock.platform_pyimp():
  808. with mock.sys_platform('darwin'):
  809. with mock.pypy_version():
  810. assert 'CPython' == pyimplementation()
  811. class test_shared_task:
  812. def test_registers_to_all_apps(self):
  813. with self.Celery('xproj', set_as_current=True) as xproj:
  814. xproj.finalize()
  815. @shared_task
  816. def foo():
  817. return 42
  818. @shared_task()
  819. def bar():
  820. return 84
  821. assert foo.app is xproj
  822. assert bar.app is xproj
  823. assert foo._get_current_object()
  824. with self.Celery('yproj', set_as_current=True) as yproj:
  825. assert foo.app is yproj
  826. assert bar.app is yproj
  827. @shared_task()
  828. def baz():
  829. return 168
  830. assert baz.app is yproj