test_celerybeat.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. import logging
  2. import sys
  3. from kombu.tests.utils import redirect_stdouts
  4. from celery import beat
  5. from celery import platforms
  6. from celery.app import app_or_default
  7. from celery.bin import celerybeat as celerybeat_bin
  8. from celery.apps import beat as beatapp
  9. from celery.utils.compat import defaultdict
  10. from celery.tests.utils import AppCase
  11. class MockedShelveModule(object):
  12. shelves = defaultdict(lambda: {})
  13. def open(self, filename, *args, **kwargs):
  14. return self.shelves[filename]
  15. mocked_shelve = MockedShelveModule()
  16. class MockService(beat.Service):
  17. started = False
  18. in_sync = False
  19. persistence = mocked_shelve
  20. def start(self):
  21. self.__class__.started = True
  22. def sync(self):
  23. self.__class__.in_sync = True
  24. class MockBeat(beatapp.Beat):
  25. running = False
  26. def run(self):
  27. self.__class__.running = True
  28. class MockBeat2(beatapp.Beat):
  29. Service = MockService
  30. def install_sync_handler(self, b):
  31. pass
  32. class MockBeat3(beatapp.Beat):
  33. Service = MockService
  34. def install_sync_handler(self, b):
  35. raise TypeError("xxx")
  36. class test_Beat(AppCase):
  37. def test_loglevel_string(self):
  38. b = beatapp.Beat(loglevel="DEBUG")
  39. self.assertEqual(b.loglevel, logging.DEBUG)
  40. b2 = beatapp.Beat(loglevel=logging.DEBUG)
  41. self.assertEqual(b2.loglevel, logging.DEBUG)
  42. def test_init_loader(self):
  43. b = beatapp.Beat()
  44. b.init_loader()
  45. def test_process_title(self):
  46. b = beatapp.Beat()
  47. b.set_process_title()
  48. def test_run(self):
  49. b = MockBeat2()
  50. MockService.started = False
  51. b.run()
  52. self.assertTrue(MockService.started)
  53. def psig(self, fun, *args, **kwargs):
  54. handlers = {}
  55. def i(sig, handler):
  56. handlers[sig] = handler
  57. p, platforms.install_signal_handler = \
  58. platforms.install_signal_handler, i
  59. try:
  60. fun(*args, **kwargs)
  61. return handlers
  62. finally:
  63. platforms.install_signal_handler = p
  64. def test_install_sync_handler(self):
  65. b = beatapp.Beat()
  66. clock = MockService()
  67. MockService.in_sync = False
  68. handlers = self.psig(b.install_sync_handler, clock)
  69. self.assertRaises(SystemExit, handlers["SIGINT"],
  70. "SIGINT", object())
  71. self.assertTrue(MockService.in_sync)
  72. MockService.in_sync = False
  73. def test_setup_logging(self):
  74. b = beatapp.Beat()
  75. b.redirect_stdouts = False
  76. b.setup_logging()
  77. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  78. @redirect_stdouts
  79. def test_logs_errors(self, stdout, stderr):
  80. class MockLogger(object):
  81. _critical = []
  82. def debug(self, *args, **kwargs):
  83. pass
  84. def critical(self, msg, *args, **kwargs):
  85. self._critical.append(msg)
  86. logger = MockLogger()
  87. b = MockBeat3(socket_timeout=None)
  88. b.start_scheduler(logger)
  89. self.assertTrue(logger._critical)
  90. @redirect_stdouts
  91. def test_use_pidfile(self, stdout, stderr):
  92. from celery import platforms
  93. class create_pidlock(object):
  94. instance = [None]
  95. def __init__(self, file):
  96. self.file = file
  97. self.instance[0] = self
  98. def acquire(self):
  99. self.acquired = True
  100. class Object(object):
  101. def release(self):
  102. pass
  103. return Object()
  104. prev, platforms.create_pidlock = platforms.create_pidlock, \
  105. create_pidlock
  106. try:
  107. b = MockBeat2(pidfile="pidfilelockfilepid", socket_timeout=None)
  108. b.start_scheduler()
  109. self.assertTrue(create_pidlock.instance[0].acquired)
  110. finally:
  111. platforms.create_pidlock = prev
  112. class MockDaemonContext(object):
  113. opened = False
  114. closed = False
  115. def open(self):
  116. self.__class__.opened = True
  117. def close(self):
  118. self.__class__.closed = True
  119. def create_daemon_context(*args, **kwargs):
  120. context = MockDaemonContext()
  121. return context, context.close
  122. class test_div(AppCase):
  123. def setup(self):
  124. self.prev, beatapp.Beat = beatapp.Beat, MockBeat
  125. self.ctx, celerybeat_bin.create_daemon_context = \
  126. celerybeat_bin.create_daemon_context, create_daemon_context
  127. def teardown(self):
  128. beatapp.Beat = self.prev
  129. def test_main(self):
  130. sys.argv = [sys.argv[0], "-s", "foo"]
  131. try:
  132. celerybeat_bin.main()
  133. self.assertTrue(MockBeat.running)
  134. finally:
  135. MockBeat.running = False
  136. def test_detach(self):
  137. cmd = celerybeat_bin.BeatCommand()
  138. cmd.app = app_or_default()
  139. cmd.run(detach=True)
  140. self.assertTrue(MockDaemonContext.opened)
  141. self.assertTrue(MockDaemonContext.closed)
  142. def test_parse_options(self):
  143. cmd = celerybeat_bin.BeatCommand()
  144. cmd.app = app_or_default()
  145. options, args = cmd.parse_options("celerybeat", ["-s", "foo"])
  146. self.assertEqual(options.schedule, "foo")