Browse Source

Refactored celery.bin.celerybeat into ~ + celery.apps.beat

Ask Solem 14 years ago
parent
commit
44f004c2a1
5 changed files with 149 additions and 129 deletions
  1. 4 2
      bin/celerybeat
  2. 104 0
      celery/apps/beat.py
  3. 3 1
      celery/bin/base.py
  4. 38 125
      celery/bin/celerybeat.py
  5. 0 1
      celery/bin/celeryd.py

+ 4 - 2
bin/celerybeat

@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 from celery.bin import celerybeat
 
+def main():
+    celerybeat.main()
+
 if __name__ == "__main__":
-    options = celerybeat.parse_options(sys.argv[1:])
-    celerybeat.run_celerybeat(**vars(options))
+    main()

+ 104 - 0
celery/apps/beat.py

@@ -0,0 +1,104 @@
+import logging
+import sys
+import traceback
+
+from celery import __version__
+from celery import beat
+from celery import platform
+from celery.log import emergency_error
+from celery.utils import info, LOG_LEVELS
+
+STARTUP_INFO_FMT = """
+Configuration ->
+    . broker -> %(conninfo)s
+    . schedule -> %(schedule)s
+    . logfile -> %(logfile)s@%(loglevel)s
+""".strip()
+
+
+class Beat(object):
+    Service = beat.Service
+
+    def __init__(self, loglevel=None, logfile=None, schedule=None,
+            max_interval=None, scheduler_cls=None, defaults=None, **kwargs):
+        """Starts the celerybeat task scheduler."""
+
+        self.defaults = defaults
+        if self.defaults is None:
+            from celery import conf
+            self.defaults = conf
+
+        self.loglevel = loglevel or defaults.CELERYBEAT_LOG_LEVEL
+        self.logfile = logfile or defaults.CELERYBEAT_LOG_FILE
+        self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
+        self.scheduler_cls = scheduler_cls
+        self.max_interval = max_interval
+
+        if not isinstance(self.loglevel, int):
+            self.loglevel = LOG_LEVELS[self.loglevel.upper()]
+
+    def run(self):
+        logger = self.setup_logging()
+        print("celerybeat v%s is starting." % __version__)
+        self.init_loader()
+        print(self.startup_info())
+        self.set_process_title()
+        print("celerybeat has started.")
+        self.start_scheduler(logger)
+
+    def setup_logging(self):
+        from celery import log
+        handled = log.setup_logging_subsystem(loglevel=self.loglevel,
+                                              logfile=self.logfile)
+        if not handled:
+            logger = log.get_default_logger(name="celery.beat")
+            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+        return logger
+
+    def start_scheduler(self, logger=None):
+        beat = self.Service(logger=logger,
+                            max_interval=self.max_interval,
+                            scheduler_cls=self.scheduler_cls,
+                            schedule_filename=self.schedule)
+
+        try:
+            self.install_sync_handler(beat)
+            beat.start()
+        except Exception, exc:
+            emergency_error(self.logfile,
+                    "celerybeat raised exception %s: %s\n%s" % (
+                            exc.__class__, exc, traceback.format_exc()))
+
+    def init_loader(self):
+        # Run the worker init handler.
+        # (Usually imports task modules and such.)
+        from celery.loaders import current_loader
+        current_loader().init_worker()
+
+    def startup_info(self):
+        return STARTUP_INFO_FMT % {
+            "conninfo": info.format_broker_info(),
+            "logfile": self.logfile or "@stderr",
+            "loglevel": LOG_LEVELS[self.loglevel],
+            "schedule": self.schedule,
+        }
+
+    def set_process_title(self):
+        arg_start = "manage" in sys.argv[0] and 2 or 1
+        platform.set_process_title("celerybeat",
+                               info=" ".join(sys.argv[arg_start:]))
+
+    def install_sync_handler(self, beat):
+        """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
+        the celerybeat schedule."""
+
+        def _sync(signum, frame):
+            beat.sync()
+            raise SystemExit()
+
+        platform.install_signal_handler("SIGTERM", _sync)
+        platform.install_signal_handler("SIGINT", _sync)
+
+
+def run_celerybeat(*args, **kwargs):
+    return Beat(*args, **kwargs).run()

+ 3 - 1
celery/bin/base.py

@@ -3,10 +3,12 @@ import sys
 
 from optparse import OptionParser, make_option as Option
 
+from celery import __version__
+
 
 class Command(object):
     args = ''
-    version = ''
+    version = __version__
     option_list = ()
 
     Parser = OptionParser

+ 38 - 125
celery/bin/celerybeat.py

@@ -22,134 +22,47 @@
     ``ERROR``, ``CRITICAL``, or ``FATAL``.
 
 """
-import sys
-import traceback
-
-from optparse import OptionParser, make_option as Option
-
-import celery
-from celery import beat
-from celery import conf
-from celery import platform
-from celery.log import emergency_error
-from celery.utils import info
-
-STARTUP_INFO_FMT = """
-Configuration ->
-    . broker -> %(conninfo)s
-    . schedule -> %(schedule)s
-    . logfile -> %(logfile)s@%(loglevel)s
-""".strip()
-
-OPTION_LIST = (
-    Option('-s', '--schedule',
-        default=conf.CELERYBEAT_SCHEDULE_FILENAME,
-        action="store", dest="schedule",
-        help="Path to the schedule database. The extension "
-             "'.db' will be appended to the filename. Default: %s" % (
-                    conf.CELERYBEAT_SCHEDULE_FILENAME, )),
-    Option('--max-interval',
-        default=3600, type="int", dest="max_interval",
-        help="Maximum time to sleep between re-reading the schedule."),
-    Option('-S', '--scheduler',
-        default=None,
-        action="store", dest="scheduler_cls",
-        help="Scheduler class. Default is celery.beat.PersistentScheduler"),
-    Option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
-        action="store", dest="logfile",
-        help="Path to log file."),
-    Option('-l', '--loglevel',
-        default=conf.CELERYBEAT_LOG_LEVEL,
-        action="store", dest="loglevel",
-        help="Loglevel. One of DEBUG/INFO/WARNING/ERROR/CRITICAL."),
-)
-
-
-class Beat(object):
-    Service = beat.Service
-
-    def __init__(self, loglevel=None, logfile=None, schedule=None,
-            max_interval=None, scheduler_cls=None, defaults=conf, **kwargs):
-        """Starts the celerybeat task scheduler."""
-
-        self.loglevel = loglevel or defaults.CELERYBEAT_LOG_LEVEL
-        self.logfile = logfile or defaults.CELERYBEAT_LOG_FILE
-        self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
-        self.scheduler_cls = scheduler_cls
-        self.max_interval = max_interval
-
-        if not isinstance(self.loglevel, int):
-            self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
-
-    def run(self):
-        print("celerybeat %s is starting." % celery.__version__)
-        self.init_loader()
-        print(self.startup_info())
-        self.set_process_title()
-        print("celerybeat has started.")
-        self.start_scheduler()
-
-    def start_scheduler(self):
-        from celery.log import setup_logger
-        logger = setup_logger(self.loglevel, self.logfile, name="celery.beat")
-        beat = self.Service(logger=logger,
-                            max_interval=self.max_interval,
-                            scheduler_cls=self.scheduler_cls,
-                            schedule_filename=self.schedule)
-
-        try:
-            self.install_sync_handler(beat)
-            beat.start()
-        except Exception, exc:
-            emergency_error(self.logfile,
-                    "celerybeat raised exception %s: %s\n%s" % (
-                            exc.__class__, exc, traceback.format_exc()))
-
-    def init_loader(self):
-        # Run the worker init handler.
-        # (Usually imports task modules and such.)
-        from celery.loaders import current_loader
-        current_loader().init_worker()
-
-    def startup_info(self):
-        return STARTUP_INFO_FMT % {
-            "conninfo": info.format_broker_info(),
-            "logfile": self.logfile or "@stderr",
-            "loglevel": conf.LOG_LEVELS[self.loglevel],
-            "schedule": self.schedule,
-        }
-
-    def set_process_title(self):
-        arg_start = "manage" in sys.argv[0] and 2 or 1
-        platform.set_process_title("celerybeat",
-                               info=" ".join(sys.argv[arg_start:]))
-
-    def install_sync_handler(self, beat):
-        """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
-        the celerybeat schedule."""
-
-        def _sync(signum, frame):
-            beat.sync()
-            raise SystemExit()
-
-        platform.install_signal_handler("SIGTERM", _sync)
-        platform.install_signal_handler("SIGINT", _sync)
-
-
-def parse_options(arguments):
-    """Parse the available options to ``celeryd``."""
-    parser = OptionParser(option_list=OPTION_LIST)
-    options, values = parser.parse_args(arguments)
-    return options
-
-
-def run_celerybeat(**options):
-    Beat(**options).run()
+from celery.bin.base import Command, Option
+
+
+class BeatCommand(Command):
+
+    def run(self, *args, **kwargs):
+        from celery.apps.beat import Beat
+        kwargs["defaults"] = self.defaults
+        return Beat(*args, **kwargs).run()
+
+    def get_options(self):
+        conf = self.defaults
+
+        return (
+            Option('-s', '--schedule',
+                default=conf.CELERYBEAT_SCHEDULE_FILENAME,
+                action="store", dest="schedule",
+                help="Path to the schedule database. The extension "
+                    "'.db' will be appended to the filename. Default: %s" % (
+                            conf.CELERYBEAT_SCHEDULE_FILENAME, )),
+            Option('--max-interval',
+                default=3600.0, type="float", dest="max_interval",
+                help="Max. seconds to sleep between schedule iterations."),
+            Option('-S', '--scheduler',
+                default=None,
+                action="store", dest="scheduler_cls",
+                help="Scheduler class. Default is "
+                     "celery.beat.PersistentScheduler"),
+            Option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
+                action="store", dest="logfile",
+                help="Path to log file."),
+            Option('-l', '--loglevel',
+                default=conf.CELERYBEAT_LOG_LEVEL,
+                action="store", dest="loglevel",
+                help="Loglevel. One of DEBUG/INFO/WARNING/ERROR/CRITICAL."),
+        )
 
 
 def main():
-    options = parse_options(sys.argv[1:])
-    run_celerybeat(**vars(options))
+    beat = BeatCommand()
+    beat.execute_from_commandline()
 
 if __name__ == "__main__":
     main()

+ 0 - 1
celery/bin/celeryd.py

@@ -74,7 +74,6 @@ from celery.bin.base import Command, Option
 
 
 class WorkerCommand(Command):
-    version = __version__
 
     def run(self, *args, **kwargs):
         from celery.apps.worker import Worker