test_beat.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. from __future__ import absolute_import
  2. import logging
  3. import sys
  4. from collections import defaultdict
  5. from kombu.tests.utils import redirect_stdouts
  6. from mock import patch
  7. from celery import beat
  8. from celery import platforms
  9. from celery.app import app_or_default
  10. from celery.bin import beat as beat_bin
  11. from celery.apps import beat as beatapp
  12. from celery.tests.utils import AppCase
  13. class MockedShelveModule(object):
  14. shelves = defaultdict(lambda: {})
  15. def open(self, filename, *args, **kwargs):
  16. return self.shelves[filename]
  17. mocked_shelve = MockedShelveModule()
  18. class MockService(beat.Service):
  19. started = False
  20. in_sync = False
  21. persistence = mocked_shelve
  22. def start(self):
  23. self.__class__.started = True
  24. def sync(self):
  25. self.__class__.in_sync = True
  26. class MockBeat(beatapp.Beat):
  27. running = False
  28. def run(self):
  29. MockBeat.running = True
  30. class MockBeat2(beatapp.Beat):
  31. Service = MockService
  32. def install_sync_handler(self, b):
  33. pass
  34. class MockBeat3(beatapp.Beat):
  35. Service = MockService
  36. def install_sync_handler(self, b):
  37. raise TypeError('xxx')
  38. class test_Beat(AppCase):
  39. def test_loglevel_string(self):
  40. b = beatapp.Beat(loglevel='DEBUG')
  41. self.assertEqual(b.loglevel, logging.DEBUG)
  42. b2 = beatapp.Beat(loglevel=logging.DEBUG)
  43. self.assertEqual(b2.loglevel, logging.DEBUG)
  44. def test_init_loader(self):
  45. b = beatapp.Beat()
  46. b.init_loader()
  47. def test_process_title(self):
  48. b = beatapp.Beat()
  49. b.set_process_title()
  50. def test_run(self):
  51. b = MockBeat2()
  52. MockService.started = False
  53. b.run()
  54. self.assertTrue(MockService.started)
  55. def psig(self, fun, *args, **kwargs):
  56. handlers = {}
  57. class Signals(platforms.Signals):
  58. def __setitem__(self, sig, handler):
  59. handlers[sig] = handler
  60. p, platforms.signals = platforms.signals, Signals()
  61. try:
  62. fun(*args, **kwargs)
  63. return handlers
  64. finally:
  65. platforms.signals = p
  66. def test_install_sync_handler(self):
  67. b = beatapp.Beat()
  68. clock = MockService()
  69. MockService.in_sync = False
  70. handlers = self.psig(b.install_sync_handler, clock)
  71. with self.assertRaises(SystemExit):
  72. handlers['SIGINT']('SIGINT', object())
  73. self.assertTrue(MockService.in_sync)
  74. MockService.in_sync = False
  75. def test_setup_logging(self):
  76. try:
  77. # py3k
  78. delattr(sys.stdout, 'logger')
  79. except AttributeError:
  80. pass
  81. b = beatapp.Beat()
  82. b.redirect_stdouts = False
  83. b.app.log.__class__._setup = False
  84. b.setup_logging()
  85. with self.assertRaises(AttributeError):
  86. sys.stdout.logger
  87. @redirect_stdouts
  88. @patch('celery.apps.beat.logger')
  89. def test_logs_errors(self, logger, stdout, stderr):
  90. b = MockBeat3(socket_timeout=None)
  91. b.start_scheduler()
  92. self.assertTrue(logger.critical.called)
  93. @redirect_stdouts
  94. @patch('celery.platforms.create_pidlock')
  95. def test_use_pidfile(self, create_pidlock, stdout, stderr):
  96. b = MockBeat2(pidfile='pidfilelockfilepid', socket_timeout=None)
  97. b.start_scheduler()
  98. self.assertTrue(create_pidlock.called)
  99. class MockDaemonContext(object):
  100. opened = False
  101. closed = False
  102. def __init__(self, *args, **kwargs):
  103. pass
  104. def open(self):
  105. self.__class__.opened = True
  106. return self
  107. __enter__ = open
  108. def close(self, *args):
  109. self.__class__.closed = True
  110. __exit__ = close
  111. class test_div(AppCase):
  112. def setup(self):
  113. self.prev, beatapp.Beat = beatapp.Beat, MockBeat
  114. self.ctx, beat_bin.detached = \
  115. beat_bin.detached, MockDaemonContext
  116. def teardown(self):
  117. beatapp.Beat = self.prev
  118. def test_main(self):
  119. sys.argv = [sys.argv[0], '-s', 'foo']
  120. try:
  121. beat_bin.main()
  122. self.assertTrue(MockBeat.running)
  123. finally:
  124. MockBeat.running = False
  125. def test_detach(self):
  126. cmd = beat_bin.BeatCommand()
  127. cmd.app = app_or_default()
  128. cmd.run(detach=True)
  129. self.assertTrue(MockDaemonContext.opened)
  130. self.assertTrue(MockDaemonContext.closed)
  131. def test_parse_options(self):
  132. cmd = beat_bin.BeatCommand()
  133. cmd.app = app_or_default()
  134. options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
  135. self.assertEqual(options.schedule, 'foo')