test_celerybeat.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import logging
  4. import sys
  5. from collections import defaultdict
  6. from kombu.tests.utils import redirect_stdouts
  7. from celery import beat
  8. from celery import platforms
  9. from celery.app import app_or_default
  10. from celery.bin import celerybeat as celerybeat_bin
  11. from celery.apps import beat as beatapp
  12. from celery.tests.utils import AppCase
  13. class MockedShelveModule(object):
  14. shelves = defaultdict(lambda: {})
  15. def open(self, filename, *args, **kwargs):
  16. return self.shelves[filename]
  17. mocked_shelve = MockedShelveModule()
  18. class MockService(beat.Service):
  19. started = False
  20. in_sync = False
  21. persistence = mocked_shelve
  22. def start(self):
  23. self.__class__.started = True
  24. def sync(self):
  25. self.__class__.in_sync = True
  26. class MockBeat(beatapp.Beat):
  27. running = False
  28. def run(self):
  29. self.__class__.running = True
  30. class MockBeat2(beatapp.Beat):
  31. Service = MockService
  32. def install_sync_handler(self, b):
  33. pass
  34. class MockBeat3(beatapp.Beat):
  35. Service = MockService
  36. def install_sync_handler(self, b):
  37. raise TypeError("xxx")
  38. class test_Beat(AppCase):
  39. def test_loglevel_string(self):
  40. b = beatapp.Beat(loglevel="DEBUG")
  41. self.assertEqual(b.loglevel, logging.DEBUG)
  42. b2 = beatapp.Beat(loglevel=logging.DEBUG)
  43. self.assertEqual(b2.loglevel, logging.DEBUG)
  44. def test_init_loader(self):
  45. b = beatapp.Beat()
  46. b.init_loader()
  47. def test_process_title(self):
  48. b = beatapp.Beat()
  49. b.set_process_title()
  50. def test_run(self):
  51. b = MockBeat2()
  52. MockService.started = False
  53. b.run()
  54. self.assertTrue(MockService.started)
  55. def psig(self, fun, *args, **kwargs):
  56. handlers = {}
  57. class Signals(platforms.Signals):
  58. def __setitem__(self, sig, handler):
  59. handlers[sig] = handler
  60. p, platforms.signals = platforms.signals, Signals()
  61. try:
  62. fun(*args, **kwargs)
  63. return handlers
  64. finally:
  65. platforms.signals = p
  66. def test_install_sync_handler(self):
  67. b = beatapp.Beat()
  68. clock = MockService()
  69. MockService.in_sync = False
  70. handlers = self.psig(b.install_sync_handler, clock)
  71. with self.assertRaises(SystemExit):
  72. handlers["SIGINT"]("SIGINT", object())
  73. self.assertTrue(MockService.in_sync)
  74. MockService.in_sync = False
  75. def test_setup_logging(self):
  76. try:
  77. # py3k
  78. delattr(sys.stdout, "logger")
  79. except AttributeError:
  80. pass
  81. b = beatapp.Beat()
  82. b.redirect_stdouts = False
  83. b.setup_logging()
  84. with self.assertRaises(AttributeError):
  85. sys.stdout.logger
  86. @redirect_stdouts
  87. def test_logs_errors(self, stdout, stderr):
  88. class MockLogger(object):
  89. _critical = []
  90. def debug(self, *args, **kwargs):
  91. pass
  92. def critical(self, msg, *args, **kwargs):
  93. self._critical.append(msg)
  94. logger = MockLogger()
  95. b = MockBeat3(socket_timeout=None)
  96. b.start_scheduler(logger)
  97. self.assertTrue(logger._critical)
  98. @redirect_stdouts
  99. def test_use_pidfile(self, stdout, stderr):
  100. from celery import platforms
  101. class create_pidlock(object):
  102. instance = [None]
  103. def __init__(self, file):
  104. self.file = file
  105. self.instance[0] = self
  106. def acquire(self):
  107. self.acquired = True
  108. class Object(object):
  109. def release(self):
  110. pass
  111. return Object()
  112. prev, platforms.create_pidlock = platforms.create_pidlock, \
  113. create_pidlock
  114. try:
  115. b = MockBeat2(pidfile="pidfilelockfilepid", socket_timeout=None)
  116. b.start_scheduler()
  117. self.assertTrue(create_pidlock.instance[0].acquired)
  118. finally:
  119. platforms.create_pidlock = prev
  120. class MockDaemonContext(object):
  121. opened = False
  122. closed = False
  123. def __init__(self, *args, **kwargs):
  124. pass
  125. def open(self):
  126. self.__class__.opened = True
  127. return self
  128. __enter__ = open
  129. def close(self, *args):
  130. self.__class__.closed = True
  131. __exit__ = close
  132. class test_div(AppCase):
  133. def setup(self):
  134. self.prev, beatapp.Beat = beatapp.Beat, MockBeat
  135. self.ctx, celerybeat_bin.detached = \
  136. celerybeat_bin.detached, MockDaemonContext
  137. def teardown(self):
  138. beatapp.Beat = self.prev
  139. def test_main(self):
  140. sys.argv = [sys.argv[0], "-s", "foo"]
  141. try:
  142. celerybeat_bin.main()
  143. self.assertTrue(MockBeat.running)
  144. finally:
  145. MockBeat.running = False
  146. def test_detach(self):
  147. cmd = celerybeat_bin.BeatCommand()
  148. cmd.app = app_or_default()
  149. cmd.run(detach=True)
  150. self.assertTrue(MockDaemonContext.opened)
  151. self.assertTrue(MockDaemonContext.closed)
  152. def test_parse_options(self):
  153. cmd = celerybeat_bin.BeatCommand()
  154. cmd.app = app_or_default()
  155. options, args = cmd.parse_options("celerybeat", ["-s", "foo"])
  156. self.assertEqual(options.schedule, "foo")