test_app.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import os
  2. from celery import Celery
  3. from celery.app import defaults
  4. from celery.app.base import BaseApp
  5. from celery.loaders.base import BaseLoader
  6. from celery.utils.serialization import pickle
  7. from celery.tests import config
  8. from celery.tests.utils import unittest
  9. THIS_IS_A_KEY = "this is a value"
  10. class Object(object):
  11. def __init__(self, **kwargs):
  12. for key, value in kwargs.items():
  13. setattr(self, key, value)
  14. def _get_test_config():
  15. return dict((key, getattr(config, key))
  16. for key in dir(config)
  17. if key.isupper() and not key.startswith("_"))
  18. test_config = _get_test_config()
  19. class test_App(unittest.TestCase):
  20. def setUp(self):
  21. self.app = Celery(set_as_current=False)
  22. self.app.conf.update(test_config)
  23. def test_task(self):
  24. app = Celery("foozibari", set_as_current=False)
  25. def fun():
  26. pass
  27. fun.__module__ = "__main__"
  28. task = app.task(fun)
  29. self.assertEqual(task.name, app.main + ".fun")
  30. def test_TaskSet(self):
  31. ts = self.app.TaskSet()
  32. self.assertListEqual(ts.tasks, [])
  33. self.assertIs(ts.app, self.app)
  34. def test_pickle_app(self):
  35. changes = dict(THE_FOO_BAR="bars",
  36. THE_MII_MAR="jars")
  37. self.app.conf.update(changes)
  38. saved = pickle.dumps(self.app)
  39. self.assertLess(len(saved), 2048)
  40. restored = pickle.loads(saved)
  41. self.assertDictContainsSubset(changes, restored.conf)
  42. def test_worker_main(self):
  43. from celery.bin import celeryd
  44. class WorkerCommand(celeryd.WorkerCommand):
  45. def execute_from_commandline(self, argv):
  46. return argv
  47. prev, celeryd.WorkerCommand = celeryd.WorkerCommand, WorkerCommand
  48. try:
  49. ret = self.app.worker_main(argv=["--version"])
  50. self.assertListEqual(ret, ["--version"])
  51. finally:
  52. celeryd.WorkerCommand = prev
  53. def test_config_from_envvar(self):
  54. os.environ["CELERYTEST_CONFIG_OBJECT"] = \
  55. "celery.tests.test_app.test_app"
  56. self.app.config_from_envvar("CELERYTEST_CONFIG_OBJECT")
  57. self.assertEqual(self.app.conf.THIS_IS_A_KEY, "this is a value")
  58. def test_config_from_object(self):
  59. class Object(object):
  60. LEAVE_FOR_WORK = True
  61. MOMENT_TO_STOP = True
  62. CALL_ME_BACK = 123456789
  63. WANT_ME_TO = False
  64. UNDERSTAND_ME = True
  65. self.app.config_from_object(Object())
  66. self.assertTrue(self.app.conf.LEAVE_FOR_WORK)
  67. self.assertTrue(self.app.conf.MOMENT_TO_STOP)
  68. self.assertEqual(self.app.conf.CALL_ME_BACK, 123456789)
  69. self.assertFalse(self.app.conf.WANT_ME_TO)
  70. self.assertTrue(self.app.conf.UNDERSTAND_ME)
  71. def test_config_from_cmdline(self):
  72. cmdline = [".always_eager=no",
  73. ".result_backend=/dev/null",
  74. '.task_error_whitelist=(list)["a", "b", "c"]',
  75. "celeryd.prefetch_multiplier=368",
  76. ".foobarstring=(string)300",
  77. ".foobarint=(int)300",
  78. '.result_engine_options=(dict){"foo": "bar"}']
  79. self.app.config_from_cmdline(cmdline, namespace="celery")
  80. self.assertFalse(self.app.conf.CELERY_ALWAYS_EAGER)
  81. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, "/dev/null")
  82. self.assertEqual(self.app.conf.CELERYD_PREFETCH_MULTIPLIER, 368)
  83. self.assertListEqual(self.app.conf.CELERY_TASK_ERROR_WHITELIST,
  84. ["a", "b", "c"])
  85. self.assertEqual(self.app.conf.CELERY_FOOBARSTRING, "300")
  86. self.assertEqual(self.app.conf.CELERY_FOOBARINT, 300)
  87. self.assertDictEqual(self.app.conf.CELERY_RESULT_ENGINE_OPTIONS,
  88. {"foo": "bar"})
  89. def test_compat_setting_CELERY_BACKEND(self):
  90. self.app.config_from_object(Object(CELERY_BACKEND="set_by_us"))
  91. self.assertEqual(self.app.conf.CELERY_RESULT_BACKEND, "set_by_us")
  92. def test_setting_BROKER_TRANSPORT_OPTIONS(self):
  93. _args = {'foo': 'bar', 'spam': 'baz'}
  94. self.app.config_from_object(Object())
  95. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, {})
  96. self.app.config_from_object(Object(BROKER_TRANSPORT_OPTIONS=_args))
  97. self.assertEqual(self.app.conf.BROKER_TRANSPORT_OPTIONS, _args)
  98. def test_Windows_log_color_disabled(self):
  99. self.app.IS_WINDOWS = True
  100. self.assertFalse(self.app.log.supports_color())
  101. def test_compat_setting_CARROT_BACKEND(self):
  102. self.app.config_from_object(Object(CARROT_BACKEND="set_by_us"))
  103. self.assertEqual(self.app.conf.BROKER_BACKEND, "set_by_us")
  104. def test_mail_admins(self):
  105. class Loader(BaseLoader):
  106. def mail_admins(*args, **kwargs):
  107. return args, kwargs
  108. self.app.loader = Loader()
  109. self.app.conf.ADMINS = None
  110. self.assertFalse(self.app.mail_admins("Subject", "Body"))
  111. self.app.conf.ADMINS = [("George Costanza", "george@vandelay.com")]
  112. self.assertTrue(self.app.mail_admins("Subject", "Body"))
  113. def test_amqp_get_broker_info(self):
  114. self.assertDictContainsSubset({"hostname": "localhost",
  115. "userid": "guest",
  116. "password": "guest",
  117. "virtual_host": "/"},
  118. self.app.broker_connection().info())
  119. self.app.conf.BROKER_PORT = 1978
  120. self.app.conf.BROKER_VHOST = "foo"
  121. self.assertDictContainsSubset({"port": 1978,
  122. "virtual_host": "foo"},
  123. self.app.broker_connection().info())
  124. conn = self.app.broker_connection(virtual_host="/value")
  125. self.assertDictContainsSubset({"virtual_host": "/value"},
  126. conn.info())
  127. def test_send_task_sent_event(self):
  128. from celery.app import amqp
  129. class Dispatcher(object):
  130. sent = []
  131. def send(self, type, **fields):
  132. self.sent.append((type, fields))
  133. conn = self.app.broker_connection()
  134. chan = conn.channel()
  135. try:
  136. for e in ("foo_exchange", "moo_exchange", "bar_exchange"):
  137. chan.exchange_declare(e, "direct", durable=True)
  138. chan.queue_declare(e, durable=True)
  139. chan.queue_bind(e, e, e)
  140. finally:
  141. chan.close()
  142. assert conn.transport_cls == "memory"
  143. pub = self.app.amqp.TaskPublisher(conn, exchange="foo_exchange")
  144. self.assertIn("foo_exchange", amqp._exchanges_declared)
  145. dispatcher = Dispatcher()
  146. self.assertTrue(pub.delay_task("footask", (), {},
  147. exchange="moo_exchange",
  148. routing_key="moo_exchange",
  149. event_dispatcher=dispatcher))
  150. self.assertTrue(dispatcher.sent)
  151. self.assertEqual(dispatcher.sent[0][0], "task-sent")
  152. self.assertTrue(pub.delay_task("footask", (), {},
  153. event_dispatcher=dispatcher,
  154. exchange="bar_exchange",
  155. routing_key="bar_exchange"))
  156. self.assertIn("bar_exchange", amqp._exchanges_declared)
  157. class test_BaseApp(unittest.TestCase):
  158. def test_on_init(self):
  159. BaseApp()
  160. class test_defaults(unittest.TestCase):
  161. def test_str_to_bool(self):
  162. for s in ("false", "no", "0"):
  163. self.assertFalse(defaults.str_to_bool(s))
  164. for s in ("true", "yes", "1"):
  165. self.assertTrue(defaults.str_to_bool(s))
  166. self.assertRaises(TypeError, defaults.str_to_bool, "unsure")