test_app.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. from __future__ import with_statement
  2. import os
  3. import sys
  4. from mock import Mock
  5. from celery import Celery
  6. from celery import app as _app
  7. from celery.app import defaults
  8. from celery.app.base import BaseApp, pyimplementation
  9. from celery.loaders.base import BaseLoader
  10. from celery.utils.serialization import pickle
  11. from celery.tests import config
  12. from celery.tests.utils import (unittest, mask_modules, platform_pyimp,
  13. sys_platform, pypy_version)
  14. THIS_IS_A_KEY = "this is a value"
  15. class Object(object):
  16. def __init__(self, **kwargs):
  17. for key, value in kwargs.items():
  18. setattr(self, key, value)
  19. def _get_test_config():
  20. return dict((key, getattr(config, key))
  21. for key in dir(config)
  22. if key.isupper() and not key.startswith("_"))
  23. test_config = _get_test_config()
  24. class test_App(unittest.TestCase):
  25. def setUp(self):
  26. self.app = Celery(set_as_current=False)
  27. self.app.conf.update(test_config)
  28. def test_task(self):
  29. app = Celery("foozibari", set_as_current=False)
  30. def fun():
  31. pass
  32. fun.__module__ = "__main__"
  33. task = app.task(fun)
  34. self.assertEqual(task.name, app.main + ".fun")
  35. def test_repr(self):
  36. self.assertTrue(repr(self.app))
  37. def test_TaskSet(self):
  38. ts = self.app.TaskSet()
  39. self.assertListEqual(ts.tasks, [])
  40. self.assertIs(ts.app, self.app)
  41. def test_pickle_app(self):
  42. changes = dict(THE_FOO_BAR="bars",
  43. THE_MII_MAR="jars")
  44. self.app.conf.update(changes)
  45. saved = pickle.dumps(self.app)
  46. self.assertLess(len(saved), 2048)
  47. restored = pickle.loads(saved)
  48. self.assertDictContainsSubset(changes, restored.conf)
  49. def test_worker_main(self):
  50. from celery.bin import celeryd
  51. class WorkerCommand(celeryd.WorkerCommand):
  52. def execute_from_commandline(self, argv):
  53. return argv
  54. prev, celeryd.WorkerCommand = celeryd.WorkerCommand, WorkerCommand
  55. try:
  56. ret = self.app.worker_main(argv=["--version"])
  57. self.assertListEqual(ret, ["--version"])
  58. finally:
  59. celeryd.WorkerCommand = prev
  60. def test_config_from_envvar(self):
  61. os.environ["CELERYTEST_CONFIG_OBJECT"] = \
  62. "celery.tests.test_app.test_app"
  63. self.app.config_from_envvar("CELERYTEST_CONFIG_OBJECT")
  64. self.assertEqual(self.app.conf.THIS_IS_A_KEY, "this is a value")
  65. def test_config_from_object(self):
  66. class Object(object):
  67. LEAVE_FOR_WORK = True
  68. MOMENT_TO_STOP = True
  69. CALL_ME_BACK = 123456789
  70. WANT_ME_TO = False
  71. UNDERSTAND_ME = True
  72. self.app.config_from_object(Object())
  73. self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
  74. self.assertTrue(self.app.conf.MOMENT_TO_STOP)
  75. self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
  76. self.assertFalse(self.app.conf.WANT_ME_TO)
  77. self.assertTrue(self.app.conf.UNDERSTAND_ME)
  78. def test_config_from_cmdline(self):
  79. cmdline = [".always_eager=no",
  80. ".result_backend=/dev/null",
  81. '.task_error_whitelist=(list)["a", "b", "c"]',
  82. "celeryd.prefetch_multiplier=368",
  83. ".foobarstring=(string)300",
  84. ".foobarint=(int)300",
  85. '.result_engine_options=(dict){"foo": "bar"}']
  86. self.app.config_from_cmdline(cmdline, namespace="celery")
  87. self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
  88. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, "/dev/null")
  89. self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
  90. self.assertListEqual(self.app.conf.CELERY_TASK_ERROR_WHITELIST,
  91. ["a", "b", "c"])
  92. self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, "300")
  93. self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
  94. self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
  95. {"foo": "bar"})
  96. def test_compat_setting_CELERY_BACKEND(self):
  97. self.app.config_from_object(Object(CELERY_BACKEND="set_by_us"))
  98. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, "set_by_us")
  99. def test_setting_BROKER_TRANSPORT_OPTIONS(self):
  100. _args = {'foo': 'bar', 'spam': 'baz'}
  101. self.app.config_from_object(Object())
  102. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})
  103. self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
  104. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)
  105. def test_Windows_log_color_disabled(self):
  106. self.app.IS_WINDOWS = True
  107. self.assertFalse(self.app.log.supports_color())
  108. def test_compat_setting_CARROT_BACKEND(self):
  109. self.app.config_from_object(Object(CARROT_BACKEND="set_by_us"))
  110. self.assertEqual(self.app.conf.BROKER_TRANSPORT, "set_by_us")
  111. def test_mail_admins(self):
  112. class Loader(BaseLoader):
  113. def mail_admins(*args, **kwargs):
  114. return args, kwargs
  115. self.app.loader = Loader()
  116. self.app.conf.ADMINS = None
  117. self.assertFalse(self.app.mail_admins("Subject", "Body"))
  118. self.app.conf.ADMINS = [("George Costanza", "george@vandelay.com")]
  119. self.assertTrue(self.app.mail_admins("Subject", "Body"))
  120. def test_amqp_get_broker_info(self):
  121. self.assertDictContainsSubset({"hostname": "localhost",
  122. "userid": "guest",
  123. "password": "guest",
  124. "virtual_host": "/"},
  125. self.app.broker_connection(
  126. transport="amqplib").info())
  127. self.app.conf.BROKER_PORT = 1978
  128. self.app.conf.BROKER_VHOST = "foo"
  129. self.assertDictContainsSubset({"port": 1978,
  130. "virtual_host": "foo"},
  131. self.app.broker_connection(
  132. transport="amqplib").info())
  133. conn = self.app.broker_connection(virtual_host="/value")
  134. self.assertDictContainsSubset({"virtual_host": "/value"},
  135. conn.info())
  136. def test_BROKER_BACKEND_alias(self):
  137. self.assertEqual(self.app.conf.BROKER_BACKEND,
  138. self.app.conf.BROKER_TRANSPORT)
  139. def test_with_default_connection(self):
  140. @self.app.with_default_connection
  141. def handler(connection=None, foo=None):
  142. return connection, foo
  143. connection, foo = handler(foo=42)
  144. self.assertEqual(foo, 42)
  145. self.assertTrue(connection)
  146. def test_after_fork(self):
  147. p = self.app._pool = Mock()
  148. self.app._after_fork(self.app)
  149. p.force_close_all.assert_called_with()
  150. self.assertIsNone(self.app._pool)
  151. self.app._after_fork(self.app)
  152. def test_pool_no_multiprocessing(self):
  153. with mask_modules("multiprocessing.util"):
  154. pool = self.app.pool
  155. self.assertIs(pool, self.app._pool)
  156. def test_bugreport(self):
  157. self.assertTrue(self.app.bugreport())
  158. def test_send_task_sent_event(self):
  159. from celery.app import amqp
  160. class Dispatcher(object):
  161. sent = []
  162. def send(self, type, **fields):
  163. self.sent.append((type, fields))
  164. conn = self.app.broker_connection()
  165. chan = conn.channel()
  166. try:
  167. for e in ("foo_exchange", "moo_exchange", "bar_exchange"):
  168. chan.exchange_declare(e, "direct", durable=True)
  169. chan.queue_declare(e, durable=True)
  170. chan.queue_bind(e, e, e)
  171. finally:
  172. chan.close()
  173. assert conn.transport_cls == "memory"
  174. pub = self.app.amqp.TaskPublisher(conn, exchange="foo_exchange")
  175. self.assertIn("foo_exchange", amqp._exchanges_declared)
  176. dispatcher = Dispatcher()
  177. self.assertTrue(pub.delay_task("footask", (), {},
  178. exchange="moo_exchange",
  179. routing_key="moo_exchange",
  180. event_dispatcher=dispatcher))
  181. self.assertTrue(dispatcher.sent)
  182. self.assertEqual(dispatcher.sent[0][0], "task-sent")
  183. self.assertTrue(pub.delay_task("footask", (), {},
  184. event_dispatcher=dispatcher,
  185. exchange="bar_exchange",
  186. routing_key="bar_exchange"))
  187. self.assertIn("bar_exchange", amqp._exchanges_declared)
  188. class test_BaseApp(unittest.TestCase):
  189. def test_on_init(self):
  190. BaseApp()
  191. class test_defaults(unittest.TestCase):
  192. def test_str_to_bool(self):
  193. for s in ("false", "no", "0"):
  194. self.assertFalse(defaults.str_to_bool(s))
  195. for s in ("true", "yes", "1"):
  196. self.assertTrue(defaults.str_to_bool(s))
  197. self.assertRaises(TypeError, defaults.str_to_bool, "unsure")
  198. class test_debugging_utils(unittest.TestCase):
  199. def test_enable_disable_trace(self):
  200. try:
  201. _app.enable_trace()
  202. self.assertEqual(_app.app_or_default, _app._app_or_default_trace)
  203. _app.disable_trace()
  204. self.assertEqual(_app.app_or_default, _app._app_or_default)
  205. finally:
  206. _app.disable_trace()
  207. class test_compilation(unittest.TestCase):
  208. _clean = ("celery.app.base", )
  209. def setUp(self):
  210. self._prev = dict((k, sys.modules.pop(k, None)) for k in self._clean)
  211. def tearDown(self):
  212. sys.modules.update(self._prev)
  213. def test_kombu_version_check(self):
  214. import kombu
  215. kombu.VERSION = (0, 9, 9)
  216. with self.assertRaises(ImportError):
  217. __import__("celery.app.base")
  218. class test_pyimplementation(unittest.TestCase):
  219. def test_platform_python_implementation(self):
  220. with platform_pyimp(lambda: "Xython"):
  221. self.assertEqual(pyimplementation(), "Xython")
  222. def test_platform_jython(self):
  223. with platform_pyimp():
  224. with sys_platform("java 1.6.51"):
  225. self.assertIn("Jython", pyimplementation())
  226. def test_platform_pypy(self):
  227. with platform_pyimp():
  228. with sys_platform("darwin"):
  229. with pypy_version((1, 4, 3)):
  230. self.assertIn("PyPy", pyimplementation())
  231. with pypy_version((1, 4, 3, "a4")):
  232. self.assertIn("PyPy", pyimplementation())
  233. def test_platform_fallback(self):
  234. with platform_pyimp():
  235. with sys_platform("darwin"):
  236. with pypy_version():
  237. self.assertEqual("CPython", pyimplementation())