test_app.py 11 KB


  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import os
  4. from mock import Mock, patch
  5. from pickle import loads, dumps
  6. from kombu import Exchange
  7. from celery import Celery
  8. from celery import app as _app
  9. from celery.app import defaults
  10. from celery.app import state
  11. from celery.loaders.base import BaseLoader
  12. from celery.platforms import pyimplementation
  13. from celery.utils.serialization import pickle
  14. from celery.tests import config
  15. from celery.tests.utils import (Case, mask_modules, platform_pyimp,
  16. sys_platform, pypy_version)
  17. from celery.utils import uuid
  18. from celery.utils.mail import ErrorMail
  # Read back by test_App.test_config_from_envvar, which loads configuration
  # from the module "celery.tests.app.test_app" (presumably this file) and
  # expects to find this exact value.
  THIS_IS_A_KEY = "this is a value"


  class Object(object):
      """Simple attribute holder: each keyword argument becomes an attribute."""

      def __init__(self, **kwargs):
          # Copy every keyword onto the instance under the same name.
          for name in kwargs:
              setattr(self, name, kwargs[name])
  def _get_test_config():
      """Return a dict of the settings found on the ``config`` module.

      Only attributes whose names are upper-case and do not start with an
      underscore are included; each is mapped to its current value.
      """
      settings = {}
      for name in dir(config):
          if name.startswith("_") or not name.isupper():
              continue
          settings[name] = getattr(config, name)
      return settings


  # Computed once at import time; test_App.setUp applies it to every app.
  test_config = _get_test_config()
  class test_module(Case):
      """Checks on the module-level helpers of ``celery.app``."""

      def test_default_app(self):
          # The default app exposed by celery.app must equal the one held
          # in celery.app.state.
          exposed = _app.default_app
          self.assertEqual(exposed, state.default_app)

      def test_bugreport(self):
          # Only checks that some truthy report is produced.
          report = _app.bugreport()
          self.assertTrue(report)
  class test_App(Case):
      """Tests for the ``Celery`` application object.

      ``setUp`` gives every test a fresh app that is not set as the current
      app and whose configuration is updated with the module-level
      ``test_config``.
      """

      def setUp(self):
          self.app = Celery(set_as_current=False)
          self.app.conf.update(test_config)

      def test_task(self):
          app = Celery("foozibari", set_as_current=False)

          def fun():
              pass

          # A task whose module is "__main__" is expected to be named after
          # the app's main name instead.
          fun.__module__ = "__main__"
          task = app.task(fun)
          self.assertEqual(task.name, app.main + ".fun")

      def test_with_broker(self):
          # The ``broker`` argument is expected to surface as BROKER_HOST.
          app = Celery(set_as_current=False, broker="foo://baribaz")
          self.assertEqual(app.conf.BROKER_HOST, "foo://baribaz")

      def test_repr(self):
          self.assertTrue(repr(self.app))

      def test_TaskSet(self):
          ts = self.app.TaskSet()
          self.assertListEqual(ts.tasks, [])
          self.assertIs(ts.app, self.app)

      def test_pickle_app(self):
          changes = dict(THE_FOO_BAR="bars",
                         THE_MII_MAR="jars")
          self.app.conf.update(changes)
          saved = pickle.dumps(self.app)
          # The pickled app must stay small (under 2 KiB) ...
          self.assertLess(len(saved), 2048)
          restored = pickle.loads(saved)
          # ... and still carry the configuration changes made above.
          self.assertDictContainsSubset(changes, restored.conf)

      def test_worker_main(self):
          from celery.bin import celeryd

          # Stub command that echoes argv back instead of starting a worker.
          class WorkerCommand(celeryd.WorkerCommand):

              def execute_from_commandline(self, argv):
                  return argv

          prev, celeryd.WorkerCommand = celeryd.WorkerCommand, WorkerCommand
          try:
              ret = self.app.worker_main(argv=["--version"])
              self.assertListEqual(ret, ["--version"])
          finally:
              # Always put the real command class back.
              celeryd.WorkerCommand = prev

      def test_config_from_envvar(self):
          envvar = "CELERYTEST_CONFIG_OBJECT"
          previous = os.environ.get(envvar)
          os.environ[envvar] = "celery.tests.app.test_app"
          try:
              self.app.config_from_envvar(envvar)
              self.assertEqual(self.app.conf.THIS_IS_A_KEY, "this is a value")
          finally:
              # Restore the process environment so the variable does not
              # leak into tests that run after this one.
              if previous is None:
                  os.environ.pop(envvar, None)
              else:
                  os.environ[envvar] = previous

      def test_config_from_object(self):

          class Object(object):
              LEAVE_FOR_WORK = True
              MOMENT_TO_STOP = True
              CALL_ME_BACK = 123456789
              WANT_ME_TO = False
              UNDERSTAND_ME = True

          self.app.config_from_object(Object())
          self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
          self.assertTrue(self.app.conf.MOMENT_TO_STOP)
          self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
          self.assertFalse(self.app.conf.WANT_ME_TO)
          self.assertTrue(self.app.conf.UNDERSTAND_ME)

      def test_config_from_cmdline(self):
          # Entries starting with "." are expected to land under the given
          # namespace ("celery"); "(type)" prefixes select the value type.
          cmdline = [".always_eager=no",
                     ".result_backend=/dev/null",
                     '.task_error_whitelist=(list)["a", "b", "c"]',
                     "celeryd.prefetch_multiplier=368",
                     ".foobarstring=(string)300",
                     ".foobarint=(int)300",
                     '.result_engine_options=(dict){"foo": "bar"}']
          self.app.config_from_cmdline(cmdline, namespace="celery")
          self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
          self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, "/dev/null")
          self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
          self.assertListEqual(self.app.conf.CELERY_TASK_ERROR_WHITELIST,
                               ["a", "b", "c"])
          self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, "300")
          self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
          self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
                               {"foo": "bar"})

      def test_compat_setting_CELERY_BACKEND(self):
          # Legacy CELERY_BACKEND must show up as CELERY_RESULT_BACKEND.
          self.app.config_from_object(Object(CELERY_BACKEND="set_by_us"))
          self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, "set_by_us")

      def test_setting_BROKER_TRANSPORT_OPTIONS(self):
          _args = {'foo': 'bar', 'spam': 'baz'}

          # Unset: the option reads back as an empty dict.
          self.app.config_from_object(Object())
          self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})

          # Set: the configured mapping is returned.
          self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
          self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)

      def test_Windows_log_color_disabled(self):
          self.app.IS_WINDOWS = True
          self.assertFalse(self.app.log.supports_color())

      def test_compat_setting_CARROT_BACKEND(self):
          # Legacy CARROT_BACKEND must show up as BROKER_TRANSPORT.
          self.app.config_from_object(Object(CARROT_BACKEND="set_by_us"))
          self.assertEqual(self.app.conf.BROKER_TRANSPORT, "set_by_us")

      def test_WorkController(self):
          x = self.app.WorkController
          self.assertIs(x.app, self.app)

      def test_Worker(self):
          x = self.app.Worker
          self.assertIs(x.app, self.app)

      def test_AsyncResult(self):
          x = self.app.AsyncResult("1")
          self.assertIs(x.app, self.app)
          r = loads(dumps(x))
          # not set as current, so ends up as default app after reduce
          self.assertIs(r.app, state.default_app)

      @patch("celery.bin.celery.CeleryCommand.execute_from_commandline")
      def test_start(self, execute):
          # ``execute`` is the mock injected by @patch above.
          self.app.start()
          self.assertTrue(execute.called)

      def test_mail_admins(self):

          # Loader stub whose mail_admins just returns what it was given.
          class Loader(BaseLoader):

              def mail_admins(*args, **kwargs):
                  return args, kwargs

          self.app.loader = Loader()
          # With no admins configured the call must return something falsy.
          self.app.conf.ADMINS = None
          self.assertFalse(self.app.mail_admins("Subject", "Body"))
          self.app.conf.ADMINS = [("George Costanza", "george@vandelay.com")]
          self.assertTrue(self.app.mail_admins("Subject", "Body"))

      def test_amqp_get_broker_info(self):
          self.assertDictContainsSubset({"hostname": "localhost",
                                         "userid": "guest",
                                         "password": "guest",
                                         "virtual_host": "/"},
                                        self.app.broker_connection(
                                            transport="amqplib").info())
          # Configuration overrides must be reflected in new connections.
          self.app.conf.BROKER_PORT = 1978
          self.app.conf.BROKER_VHOST = "foo"
          self.assertDictContainsSubset({"port": 1978,
                                         "virtual_host": "foo"},
                                        self.app.broker_connection(
                                            transport="amqplib").info())
          # An explicit keyword argument is reported as given.
          conn = self.app.broker_connection(virtual_host="/value")
          self.assertDictContainsSubset({"virtual_host": "/value"},
                                        conn.info())

      def test_BROKER_BACKEND_alias(self):
          self.assertEqual(self.app.conf.BROKER_BACKEND,
                           self.app.conf.BROKER_TRANSPORT)

      def test_with_default_connection(self):

          @self.app.with_default_connection
          def handler(connection=None, foo=None):
              return connection, foo

          # Called without a connection, the handler must still receive a
          # truthy one from the decorator.
          connection, foo = handler(foo=42)
          self.assertEqual(foo, 42)
          self.assertTrue(connection)

      def test_after_fork(self):
          p = self.app._pool = Mock()
          self.app._after_fork(self.app)
          p.force_close_all.assert_called_with()
          self.assertIsNone(self.app._pool)
          # Second call runs with _pool already None and must not raise.
          self.app._after_fork(self.app)

      def test_pool_no_multiprocessing(self):
          with mask_modules("multiprocessing.util"):
              pool = self.app.pool
              self.assertIs(pool, self.app._pool)

      def test_bugreport(self):
          self.assertTrue(self.app.bugreport())

      def test_send_task_sent_event(self):

          # Minimal event dispatcher that records what it is asked to send.
          class Dispatcher(object):

              def __init__(self):
                  # Per-instance list of (type, fields) pairs; kept on the
                  # instance so no state is shared through the class.
                  self.sent = []

              def send(self, type, **fields):
                  self.sent.append((type, fields))

          conn = self.app.broker_connection()
          chan = conn.channel()
          try:
              for e in ("foo_exchange", "moo_exchange", "bar_exchange"):
                  chan.exchange_declare(e, "direct", durable=True)
                  chan.queue_declare(e, durable=True)
                  chan.queue_bind(e, e, e)
          finally:
              chan.close()
          # Use a unittest assertion rather than a bare ``assert`` so the
          # check is not stripped when Python runs with -O.
          self.assertEqual(conn.transport_cls, "memory")

          pub = self.app.amqp.TaskPublisher(conn,
                                            exchange=Exchange("foo_exchange"))

          dispatcher = Dispatcher()
          self.assertTrue(pub.delay_task("footask", (), {},
                                         exchange="moo_exchange",
                                         routing_key="moo_exchange",
                                         event_dispatcher=dispatcher))
          self.assertTrue(dispatcher.sent)
          self.assertEqual(dispatcher.sent[0][0], "task-sent")
          self.assertTrue(pub.delay_task("footask", (), {},
                                         event_dispatcher=dispatcher,
                                         exchange="bar_exchange",
                                         routing_key="bar_exchange"))

      def test_error_mail_sender(self):
          # The subject template must format with these four keys and
          # produce a non-empty string.
          x = ErrorMail.subject % {"name": "task_name",
                                   "id": uuid(),
                                   "exc": "FOOBARBAZ",
                                   "hostname": "lana"}
          self.assertTrue(x)
  class test_defaults(Case):
      """Tests for helpers in ``celery.app.defaults``."""

      def test_str_to_bool(self):
          to_bool = defaults.str_to_bool
          falsy = ("false", "no", "0")
          truthy = ("true", "yes", "1")

          for text in falsy:
              self.assertFalse(to_bool(text))
          for text in truthy:
              self.assertTrue(to_bool(text))

          # A string outside the spellings above must raise TypeError.
          with self.assertRaises(TypeError):
              to_bool("unsure")
  class test_debugging_utils(Case):
      """Tests for the trace toggles exposed by ``celery.app``."""

      def test_enable_disable_trace(self):
          try:
              # Enabling must swap in the tracing variant ...
              _app.enable_trace()
              active = _app.app_or_default
              self.assertEqual(active, _app._app_or_default_trace)

              # ... and disabling must restore the plain one.
              _app.disable_trace()
              active = _app.app_or_default
              self.assertEqual(active, _app._app_or_default)
          finally:
              # Leave tracing off even if an assertion above failed.
              _app.disable_trace()
  class test_pyimplementation(Case):
      """Tests for ``celery.platforms.pyimplementation``.

      The ``platform_pyimp``, ``sys_platform`` and ``pypy_version`` context
      managers come from ``celery.tests.utils``; presumably they temporarily
      override the corresponding interpreter details (confirm there).
      """

      def test_platform_python_implementation(self):

          def fake_impl():
              return "Xython"

          with platform_pyimp(fake_impl):
              self.assertEqual(pyimplementation(), "Xython")

      def test_platform_jython(self):
          with platform_pyimp():
              with sys_platform("java 1.6.51"):
                  detected = pyimplementation()
                  self.assertIn("Jython", detected)

      def test_platform_pypy(self):
          # Same check for a three-part and a four-part version tuple.
          versions = ((1, 4, 3), (1, 4, 3, "a4"))
          with platform_pyimp():
              with sys_platform("darwin"):
                  for version in versions:
                      with pypy_version(version):
                          self.assertIn("PyPy", pyimplementation())

      def test_platform_fallback(self):
          with platform_pyimp():
              with sys_platform("darwin"):
                  with pypy_version():
                      detected = pyimplementation()
                      self.assertEqual("CPython", detected)
  254. with sys_platform("darwin"):
  255. with pypy_version():
  256. self.assertEqual("CPython", pyimplementation())