test_celerybeat.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. import logging
  2. import sys
  3. from collections import defaultdict
  4. from kombu.tests.utils import redirect_stdouts
  5. from celery import beat
  6. from celery import platforms
  7. from celery.app import app_or_default
  8. from celery.bin import celerybeat as celerybeat_bin
  9. from celery.apps import beat as beatapp
  10. from celery.tests.utils import AppCase
  11. class MockedShelveModule(object):
  12. shelves = defaultdict(lambda: {})
  13. def open(self, filename, *args, **kwargs):
  14. return self.shelves[filename]
  15. mocked_shelve = MockedShelveModule()
  16. class MockService(beat.Service):
  17. started = False
  18. in_sync = False
  19. persistence = mocked_shelve
  20. def start(self):
  21. self.__class__.started = True
  22. def sync(self):
  23. self.__class__.in_sync = True
  24. class MockBeat(beatapp.Beat):
  25. running = False
  26. def run(self):
  27. self.__class__.running = True
  28. class MockBeat2(beatapp.Beat):
  29. Service = MockService
  30. def install_sync_handler(self, b):
  31. pass
  32. class MockBeat3(beatapp.Beat):
  33. Service = MockService
  34. def install_sync_handler(self, b):
  35. raise TypeError("xxx")
  36. class test_Beat(AppCase):
  37. def test_loglevel_string(self):
  38. b = beatapp.Beat(loglevel="DEBUG")
  39. self.assertEqual(b.loglevel, logging.DEBUG)
  40. b2 = beatapp.Beat(loglevel=logging.DEBUG)
  41. self.assertEqual(b2.loglevel, logging.DEBUG)
  42. def test_init_loader(self):
  43. b = beatapp.Beat()
  44. b.init_loader()
  45. def test_process_title(self):
  46. b = beatapp.Beat()
  47. b.set_process_title()
  48. def test_run(self):
  49. b = MockBeat2()
  50. MockService.started = False
  51. b.run()
  52. self.assertTrue(MockService.started)
  53. def psig(self, fun, *args, **kwargs):
  54. handlers = {}
  55. def i(sig=None, handler=None, **sigmap):
  56. if sig:
  57. sigmap[sig] = handler
  58. handlers.update(sigmap)
  59. p, platforms.install_signal_handler = \
  60. platforms.install_signal_handler, i
  61. try:
  62. fun(*args, **kwargs)
  63. return handlers
  64. finally:
  65. platforms.install_signal_handler = p
  66. def test_install_sync_handler(self):
  67. b = beatapp.Beat()
  68. clock = MockService()
  69. MockService.in_sync = False
  70. handlers = self.psig(b.install_sync_handler, clock)
  71. self.assertRaises(SystemExit, handlers["SIGINT"],
  72. "SIGINT", object())
  73. self.assertTrue(MockService.in_sync)
  74. MockService.in_sync = False
  75. def test_setup_logging(self):
  76. b = beatapp.Beat()
  77. b.redirect_stdouts = False
  78. b.setup_logging()
  79. self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
  80. @redirect_stdouts
  81. def test_logs_errors(self, stdout, stderr):
  82. class MockLogger(object):
  83. _critical = []
  84. def debug(self, *args, **kwargs):
  85. pass
  86. def critical(self, msg, *args, **kwargs):
  87. self._critical.append(msg)
  88. logger = MockLogger()
  89. b = MockBeat3(socket_timeout=None)
  90. b.start_scheduler(logger)
  91. self.assertTrue(logger._critical)
  92. @redirect_stdouts
  93. def test_use_pidfile(self, stdout, stderr):
  94. from celery import platforms
  95. class create_pidlock(object):
  96. instance = [None]
  97. def __init__(self, file):
  98. self.file = file
  99. self.instance[0] = self
  100. def acquire(self):
  101. self.acquired = True
  102. class Object(object):
  103. def release(self):
  104. pass
  105. return Object()
  106. prev, platforms.create_pidlock = platforms.create_pidlock, \
  107. create_pidlock
  108. try:
  109. b = MockBeat2(pidfile="pidfilelockfilepid", socket_timeout=None)
  110. b.start_scheduler()
  111. self.assertTrue(create_pidlock.instance[0].acquired)
  112. finally:
  113. platforms.create_pidlock = prev
  114. class MockDaemonContext(object):
  115. opened = False
  116. closed = False
  117. def open(self):
  118. self.__class__.opened = True
  119. def close(self):
  120. self.__class__.closed = True
  121. def create_daemon_context(*args, **kwargs):
  122. context = MockDaemonContext()
  123. return context, context.close
  124. class test_div(AppCase):
  125. def setup(self):
  126. self.prev, beatapp.Beat = beatapp.Beat, MockBeat
  127. self.ctx, celerybeat_bin.create_daemon_context = \
  128. celerybeat_bin.create_daemon_context, create_daemon_context
  129. def teardown(self):
  130. beatapp.Beat = self.prev
  131. def test_main(self):
  132. sys.argv = [sys.argv[0], "-s", "foo"]
  133. try:
  134. celerybeat_bin.main()
  135. self.assertTrue(MockBeat.running)
  136. finally:
  137. MockBeat.running = False
  138. def test_detach(self):
  139. cmd = celerybeat_bin.BeatCommand()
  140. cmd.app = app_or_default()
  141. cmd.run(detach=True)
  142. self.assertTrue(MockDaemonContext.opened)
  143. self.assertTrue(MockDaemonContext.closed)
  144. def test_parse_options(self):
  145. cmd = celerybeat_bin.BeatCommand()
  146. cmd.app = app_or_default()
  147. options, args = cmd.parse_options("celerybeat", ["-s", "foo"])
  148. self.assertEqual(options.schedule, "foo")