test_app.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import os
  4. from mock import Mock, patch
  5. from pickle import loads, dumps
  6. from kombu import Exchange
  7. from celery import Celery
  8. from celery import app as _app
  9. from celery import _state
  10. from celery.app import defaults
  11. from celery.loaders.base import BaseLoader
  12. from celery.platforms import pyimplementation
  13. from celery.utils.serialization import pickle
  14. from celery.tests import config
  15. from celery.tests.utils import (Case, mask_modules, platform_pyimp,
  16. sys_platform, pypy_version)
  17. from celery.utils import uuid
  18. from celery.utils.mail import ErrorMail
  19. THIS_IS_A_KEY = 'this is a value'
  20. class Object(object):
  21. def __init__(self, **kwargs):
  22. for key, value in kwargs.items():
  23. setattr(self, key, value)
  24. def _get_test_config():
  25. return dict((key, getattr(config, key))
  26. for key in dir(config)
  27. if key.isupper() and not key.startswith('_'))
  28. test_config = _get_test_config()
  29. class test_module(Case):
  30. def test_default_app(self):
  31. self.assertEqual(_app.default_app, _state.default_app)
  32. def test_bugreport(self):
  33. self.assertTrue(_app.bugreport())
  34. class test_App(Case):
  35. def setUp(self):
  36. self.app = Celery(set_as_current=False)
  37. self.app.conf.update(test_config)
  38. def test_task(self):
  39. app = Celery('foozibari', set_as_current=False)
  40. def fun():
  41. pass
  42. fun.__module__ = '__main__'
  43. task = app.task(fun)
  44. self.assertEqual(task.name, app.main + '.fun')
  45. def test_with_broker(self):
  46. prev = os.environ.get('CELERY_BROKER_URL')
  47. os.environ.pop('CELERY_BROKER_URL', None)
  48. try:
  49. app = Celery(set_as_current=False, broker='foo://baribaz')
  50. self.assertEqual(app.conf.BROKER_HOST, 'foo://baribaz')
  51. finally:
  52. os.environ['CELERY_BROKER_URL'] = prev
  53. def test_repr(self):
  54. self.assertTrue(repr(self.app))
  55. def test_custom_task_registry(self):
  56. app1 = Celery(set_as_current=False)
  57. app2 = Celery(set_as_current=False, tasks=app1.tasks)
  58. self.assertIs(app2.tasks, app1.tasks)
  59. def test_include_argument(self):
  60. app = Celery(set_as_current=False, include=('foo', 'bar.foo'))
  61. self.assertEqual(app.conf.CELERY_IMPORTS, ('foo', 'bar.foo'))
  62. def test_set_as_current(self):
  63. current = _state._tls.current_app
  64. try:
  65. app = Celery(set_as_current=True)
  66. self.assertIs(_state._tls.current_app, app)
  67. finally:
  68. _state._tls.current_app = current
  69. def test_current_task(self):
  70. app = Celery(set_as_current=False)
  71. @app.task
  72. def foo():
  73. pass
  74. _state._task_stack.push(foo)
  75. try:
  76. self.assertEqual(app.current_task.name, foo.name)
  77. finally:
  78. _state._task_stack.pop()
  79. def test_task_not_shared(self):
  80. with patch('celery.app.base.shared_task') as shared_task:
  81. app = Celery(set_as_current=False)
  82. @app.task(shared=False)
  83. def foo():
  84. pass
  85. self.assertFalse(shared_task.called)
  86. def test_task_compat_with_filter(self):
  87. app = Celery(set_as_current=False, accept_magic_kwargs=True)
  88. check = Mock()
  89. def filter(task):
  90. check(task)
  91. return task
  92. @app.task(filter=filter)
  93. def foo():
  94. pass
  95. check.assert_called_with(foo)
  96. def test_task_with_filter(self):
  97. app = Celery(set_as_current=False, accept_magic_kwargs=False)
  98. check = Mock()
  99. def filter(task):
  100. check(task)
  101. return task
  102. @app.task(filter=filter)
  103. def foo():
  104. pass
  105. check.assert_called_with(foo)
  106. def test_task_sets_main_name_MP_MAIN_FILE(self):
  107. from celery import utils as _utils
  108. _utils.MP_MAIN_FILE = __file__
  109. try:
  110. app = Celery('xuzzy', set_as_current=False)
  111. @app.task
  112. def foo():
  113. pass
  114. self.assertEqual(foo.name, 'xuzzy.foo')
  115. finally:
  116. _utils.MP_MAIN_FILE = None
  117. def test_base_task_inherits_magic_kwargs_from_app(self):
  118. from celery.task import Task as OldTask
  119. class timkX(OldTask):
  120. abstract = True
  121. app = Celery(set_as_current=False, accept_magic_kwargs=True)
  122. timkX.bind(app)
  123. # see #918
  124. self.assertFalse(timkX.accept_magic_kwargs)
  125. from celery import Task as NewTask
  126. class timkY(NewTask):
  127. abstract = True
  128. timkY.bind(app)
  129. self.assertFalse(timkY.accept_magic_kwargs)
  130. def test_annotate_decorator(self):
  131. from celery.app.task import Task
  132. class adX(Task):
  133. abstract = True
  134. def run(self, y, z, x):
  135. return y, z, x
  136. check = Mock()
  137. def deco(fun):
  138. def _inner(*args, **kwargs):
  139. check(*args, **kwargs)
  140. return fun(*args, **kwargs)
  141. return _inner
  142. app = Celery(set_as_current=False)
  143. app.conf.CELERY_ANNOTATIONS = {
  144. adX.name: {'@__call__': deco}
  145. }
  146. adX.bind(app)
  147. self.assertIs(adX.app, app)
  148. i = adX()
  149. i(2, 4, x=3)
  150. check.assert_called_with(i, 2, 4, x=3)
  151. i.annotate()
  152. i.annotate()
  153. def test_apply_async_has__self__(self):
  154. app = Celery(set_as_current=False)
  155. @app.task(__self__='hello')
  156. def aawsX():
  157. pass
  158. with patch('celery.app.amqp.TaskProducer.publish_task') as dt:
  159. aawsX.apply_async((4, 5))
  160. args = dt.call_args[0][1]
  161. self.assertEqual(args, ('hello', 4, 5))
  162. def test_apply_async__connection_arg(self):
  163. app = Celery(set_as_current=False)
  164. @app.task()
  165. def aacaX():
  166. pass
  167. connection = app.connection('asd://')
  168. with self.assertRaises(KeyError):
  169. aacaX.apply_async(connection=connection)
  170. def test_apply_async_adds_children(self):
  171. from celery._state import _task_stack
  172. app = Celery(set_as_current=False)
  173. @app.task()
  174. def a3cX1(self):
  175. pass
  176. @app.task()
  177. def a3cX2(self):
  178. pass
  179. _task_stack.push(a3cX1)
  180. try:
  181. a3cX1.push_request(called_directly=False)
  182. try:
  183. res = a3cX2.apply_async(add_to_parent=True)
  184. self.assertIn(res, a3cX1.request.children)
  185. finally:
  186. a3cX1.pop_request()
  187. finally:
  188. _task_stack.pop()
  189. def test_TaskSet(self):
  190. ts = self.app.TaskSet()
  191. self.assertListEqual(ts.tasks, [])
  192. self.assertIs(ts.app, self.app)
  193. def test_pickle_app(self):
  194. changes = dict(THE_FOO_BAR='bars',
  195. THE_MII_MAR='jars')
  196. self.app.conf.update(changes)
  197. saved = pickle.dumps(self.app)
  198. self.assertLess(len(saved), 2048)
  199. restored = pickle.loads(saved)
  200. self.assertDictContainsSubset(changes, restored.conf)
  201. def test_worker_main(self):
  202. from celery.bin import celeryd
  203. class WorkerCommand(celeryd.WorkerCommand):
  204. def execute_from_commandline(self, argv):
  205. return argv
  206. prev, celeryd.WorkerCommand = celeryd.WorkerCommand, WorkerCommand
  207. try:
  208. ret = self.app.worker_main(argv=['--version'])
  209. self.assertListEqual(ret, ['--version'])
  210. finally:
  211. celeryd.WorkerCommand = prev
  212. def test_config_from_envvar(self):
  213. os.environ['CELERYTEST_CONFIG_OBJECT'] = 'celery.tests.app.test_app'
  214. self.app.config_from_envvar('CELERYTEST_CONFIG_OBJECT')
  215. self.assertEqual(self.app.conf.THIS_IS_A_KEY, 'this is a value')
  216. def test_config_from_object(self):
  217. class Object(object):
  218. LEAVE_FOR_WORK = True
  219. MOMENT_TO_STOP = True
  220. CALL_ME_BACK = 123456789
  221. WANT_ME_TO = False
  222. UNDERSTAND_ME = True
  223. self.app.config_from_object(Object())
  224. self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
  225. self.assertTrue(self.app.conf.MOMENT_TO_STOP)
  226. self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
  227. self.assertFalse(self.app.conf.WANT_ME_TO)
  228. self.assertTrue(self.app.conf.UNDERSTAND_ME)
  229. def test_config_from_cmdline(self):
  230. cmdline = ['.always_eager=no',
  231. '.result_backend=/dev/null',
  232. '.task_error_whitelist=(list)["a", "b", "c"]',
  233. 'celeryd.prefetch_multiplier=368',
  234. '.foobarstring=(string)300',
  235. '.foobarint=(int)300',
  236. '.result_engine_options=(dict){"foo": "bar"}']
  237. self.app.config_from_cmdline(cmdline, namespace='celery')
  238. self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
  239. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, '/dev/null')
  240. self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
  241. self.assertListEqual(self.app.conf.CELERY_TASK_ERROR_WHITELIST,
  242. ['a', 'b', 'c'])
  243. self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, '300')
  244. self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
  245. self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
  246. {'foo': 'bar'})
  247. def test_compat_setting_CELERY_BACKEND(self):
  248. self.app.config_from_object(Object(CELERY_BACKEND='set_by_us'))
  249. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, 'set_by_us')
  250. def test_setting_BROKER_TRANSPORT_OPTIONS(self):
  251. _args = {'foo': 'bar', 'spam': 'baz'}
  252. self.app.config_from_object(Object())
  253. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})
  254. self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
  255. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)
  256. def test_Windows_log_color_disabled(self):
  257. self.app.IS_WINDOWS = True
  258. self.assertFalse(self.app.log.supports_color(True))
  259. def test_compat_setting_CARROT_BACKEND(self):
  260. self.app.config_from_object(Object(CARROT_BACKEND='set_by_us'))
  261. self.assertEqual(self.app.conf.BROKER_TRANSPORT, 'set_by_us')
  262. def test_WorkController(self):
  263. x = self.app.WorkController
  264. self.assertIs(x.app, self.app)
  265. def test_Worker(self):
  266. x = self.app.Worker
  267. self.assertIs(x.app, self.app)
  268. def test_AsyncResult(self):
  269. x = self.app.AsyncResult('1')
  270. self.assertIs(x.app, self.app)
  271. r = loads(dumps(x))
  272. # not set as current, so ends up as default app after reduce
  273. self.assertIs(r.app, _state.default_app)
  274. @patch('celery.bin.celery.CeleryCommand.execute_from_commandline')
  275. def test_start(self, execute):
  276. self.app.start()
  277. self.assertTrue(execute.called)
  278. def test_mail_admins(self):
  279. class Loader(BaseLoader):
  280. def mail_admins(*args, **kwargs):
  281. return args, kwargs
  282. self.app.loader = Loader()
  283. self.app.conf.ADMINS = None
  284. self.assertFalse(self.app.mail_admins('Subject', 'Body'))
  285. self.app.conf.ADMINS = [('George Costanza', 'george@vandelay.com')]
  286. self.assertTrue(self.app.mail_admins('Subject', 'Body'))
  287. def test_amqp_get_broker_info(self):
  288. self.assertDictContainsSubset(
  289. {'hostname': 'localhost',
  290. 'userid': 'guest',
  291. 'password': 'guest',
  292. 'virtual_host': '/'},
  293. self.app.connection('amqp://').info(),
  294. )
  295. self.app.conf.BROKER_PORT = 1978
  296. self.app.conf.BROKER_VHOST = 'foo'
  297. self.assertDictContainsSubset(
  298. {'port': 1978, 'virtual_host': 'foo'},
  299. self.app.connection('amqp://:1978/foo').info(),
  300. )
  301. conn = self.app.connection('amqp:////value')
  302. self.assertDictContainsSubset({'virtual_host': '/value'},
  303. conn.info())
  304. def test_BROKER_BACKEND_alias(self):
  305. self.assertEqual(self.app.conf.BROKER_BACKEND,
  306. self.app.conf.BROKER_TRANSPORT)
  307. def test_with_default_connection(self):
  308. @self.app.with_default_connection
  309. def handler(connection=None, foo=None):
  310. return connection, foo
  311. connection, foo = handler(foo=42)
  312. self.assertEqual(foo, 42)
  313. self.assertTrue(connection)
  314. def test_after_fork(self):
  315. p = self.app._pool = Mock()
  316. self.app._after_fork(self.app)
  317. p.force_close_all.assert_called_with()
  318. self.assertIsNone(self.app._pool)
  319. self.app._after_fork(self.app)
  320. def test_pool_no_multiprocessing(self):
  321. with mask_modules('multiprocessing.util'):
  322. pool = self.app.pool
  323. self.assertIs(pool, self.app._pool)
  324. def test_bugreport(self):
  325. self.assertTrue(self.app.bugreport())
  326. def test_send_task_sent_event(self):
  327. class Dispatcher(object):
  328. sent = []
  329. def publish(self, type, fields, *args, **kwargs):
  330. self.sent.append((type, fields))
  331. conn = self.app.connection()
  332. chan = conn.channel()
  333. try:
  334. for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):
  335. chan.exchange_declare(e, 'direct', durable=True)
  336. chan.queue_declare(e, durable=True)
  337. chan.queue_bind(e, e, e)
  338. finally:
  339. chan.close()
  340. assert conn.transport_cls == 'memory'
  341. prod = self.app.amqp.TaskProducer(
  342. conn, exchange=Exchange('foo_exchange'),
  343. send_sent_event=True,
  344. )
  345. dispatcher = Dispatcher()
  346. self.assertTrue(prod.publish_task('footask', (), {},
  347. exchange='moo_exchange',
  348. routing_key='moo_exchange',
  349. event_dispatcher=dispatcher))
  350. self.assertTrue(dispatcher.sent)
  351. self.assertEqual(dispatcher.sent[0][0], 'task-sent')
  352. self.assertTrue(prod.publish_task('footask', (), {},
  353. event_dispatcher=dispatcher,
  354. exchange='bar_exchange',
  355. routing_key='bar_exchange'))
  356. def test_error_mail_sender(self):
  357. x = ErrorMail.subject % {'name': 'task_name',
  358. 'id': uuid(),
  359. 'exc': 'FOOBARBAZ',
  360. 'hostname': 'lana'}
  361. self.assertTrue(x)
  362. class test_defaults(Case):
  363. def test_str_to_bool(self):
  364. for s in ('false', 'no', '0'):
  365. self.assertFalse(defaults.strtobool(s))
  366. for s in ('true', 'yes', '1'):
  367. self.assertTrue(defaults.strtobool(s))
  368. with self.assertRaises(TypeError):
  369. defaults.strtobool('unsure')
  370. class test_debugging_utils(Case):
  371. def test_enable_disable_trace(self):
  372. try:
  373. _app.enable_trace()
  374. self.assertEqual(_app.app_or_default, _app._app_or_default_trace)
  375. _app.disable_trace()
  376. self.assertEqual(_app.app_or_default, _app._app_or_default)
  377. finally:
  378. _app.disable_trace()
  379. class test_pyimplementation(Case):
  380. def test_platform_python_implementation(self):
  381. with platform_pyimp(lambda: 'Xython'):
  382. self.assertEqual(pyimplementation(), 'Xython')
  383. def test_platform_jython(self):
  384. with platform_pyimp():
  385. with sys_platform('java 1.6.51'):
  386. self.assertIn('Jython', pyimplementation())
  387. def test_platform_pypy(self):
  388. with platform_pyimp():
  389. with sys_platform('darwin'):
  390. with pypy_version((1, 4, 3)):
  391. self.assertIn('PyPy', pyimplementation())
  392. with pypy_version((1, 4, 3, 'a4')):
  393. self.assertIn('PyPy', pyimplementation())
  394. def test_platform_fallback(self):
  395. with platform_pyimp():
  396. with sys_platform('darwin'):
  397. with pypy_version():
  398. self.assertEqual('CPython', pyimplementation())