|
@@ -53,64 +53,73 @@ OPTION_LIST = (
|
|
|
)
|
|
|
|
|
|
|
|
|
-def run_clockservice(loglevel=conf.CELERYBEAT_LOG_LEVEL,
|
|
|
- logfile=conf.CELERYBEAT_LOG_FILE,
|
|
|
- schedule=conf.CELERYBEAT_SCHEDULE_FILENAME, **kwargs):
|
|
|
- """Starts the celerybeat clock server."""
|
|
|
-
|
|
|
- print("celerybeat %s is starting." % celery.__version__)
|
|
|
-
|
|
|
- # Setup logging
|
|
|
- if not isinstance(loglevel, int):
|
|
|
- loglevel = conf.LOG_LEVELS[loglevel.upper()]
|
|
|
-
|
|
|
- # Run the worker init handler.
|
|
|
- # (Usually imports task modules and such.)
|
|
|
- from celery.loaders import current_loader
|
|
|
- current_loader().init_worker()
|
|
|
-
|
|
|
-
|
|
|
- # Dump configuration to screen so we have some basic information
|
|
|
- # when users sends e-mails.
|
|
|
-
|
|
|
- print(STARTUP_INFO_FMT % {
|
|
|
- "conninfo": info.format_broker_info(),
|
|
|
- "logfile": logfile or "@stderr",
|
|
|
- "loglevel": conf.LOG_LEVELS[loglevel],
|
|
|
- "schedule": schedule,
|
|
|
- })
|
|
|
-
|
|
|
- print("celerybeat has started.")
|
|
|
- arg_start = "manage" in sys.argv[0] and 2 or 1
|
|
|
- platform.set_process_title("celerybeat",
|
|
|
- info=" ".join(sys.argv[arg_start:]))
|
|
|
-
|
|
|
- def _run_clock():
|
|
|
+class Beat(object):
|
|
|
+
|
|
|
+ def __init__(self, loglevel=conf.CELERYBEAT_LOG_LEVEL,
|
|
|
+ logfile=conf.CELERYBEAT_LOG_FILE,
|
|
|
+ schedule=conf.CELERYBEAT_SCHEDULE_FILENAME, **kwargs):
|
|
|
+ """Starts the celerybeat task scheduler."""
|
|
|
+
|
|
|
+ self.loglevel = loglevel
|
|
|
+ self.logfile = logfile
|
|
|
+ self.schedule = schedule
|
|
|
+ # Setup logging
|
|
|
+ if not isinstance(self.loglevel, int):
|
|
|
+ self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ print("celerybeat %s is starting." % celery.__version__)
|
|
|
+ self.init_loader()
|
|
|
+ print(self.startup_info())
|
|
|
+ self.set_process_title()
|
|
|
+ print("celerybeat has started.")
|
|
|
+ self.start_scheduler()
|
|
|
+
|
|
|
+ def start_scheduler(self):
|
|
|
from celery.log import setup_logger
|
|
|
- logger = setup_logger(loglevel, logfile)
|
|
|
- clockservice = ClockService(logger=logger, schedule_filename=schedule)
|
|
|
+ logger = setup_logger(self.loglevel, self.logfile)
|
|
|
+ beat = ClockService(logger,
|
|
|
+ schedule_filename=self.schedule)
|
|
|
|
|
|
try:
|
|
|
- install_sync_handler(clockservice)
|
|
|
- clockservice.start()
|
|
|
- except Exception, e:
|
|
|
+ self.install_sync_handler(beat)
|
|
|
+ beat.start()
|
|
|
+ except Exception, exc:
|
|
|
emergency_error(logfile,
|
|
|
"celerybeat raised exception %s: %s\n%s" % (
|
|
|
- e.__class__, e, traceback.format_exc()))
|
|
|
+ exc.__class__, exc, traceback.format_exc()))
|
|
|
+
|
|
|
+ def init_loader(self):
|
|
|
+ # Run the worker init handler.
|
|
|
+ # (Usually imports task modules and such.)
|
|
|
+ from celery.loaders import current_loader
|
|
|
+ current_loader().init_worker()
|
|
|
|
|
|
- _run_clock()
|
|
|
+ def startup_info(self):
|
|
|
+ return STARTUP_INFO_FMT % {
|
|
|
+ "conninfo": info.format_broker_info(),
|
|
|
+ "logfile": self.logfile or "@stderr",
|
|
|
+ "loglevel": conf.LOG_LEVELS[self.loglevel],
|
|
|
+ "schedule": self.schedule,
|
|
|
+ }
|
|
|
+
|
|
|
+ def set_process_title(self):
|
|
|
+ arg_start = "manage" in sys.argv[0] and 2 or 1
|
|
|
+ platform.set_process_title("celerybeat",
|
|
|
+ info=" ".join(sys.argv[arg_start:]))
|
|
|
|
|
|
+ def install_sync_handler(self, beat):
|
|
|
+ """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
|
|
|
+ the celerybeat schedule."""
|
|
|
|
|
|
-def install_sync_handler(beat):
|
|
|
- """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
|
|
|
- the celerybeat schedule."""
|
|
|
+ def _sync(signum, frame):
|
|
|
+ beat.sync()
|
|
|
+ raise SystemExit()
|
|
|
+
|
|
|
+ platform.install_signal_handler("SIGTERM", _sync)
|
|
|
+ platform.install_signal_handler("SIGINT", _sync)
|
|
|
|
|
|
- def _sync(signum, frame):
|
|
|
- beat.sync()
|
|
|
- raise SystemExit()
|
|
|
|
|
|
- platform.install_signal_handler("SIGTERM", _sync)
|
|
|
- platform.install_signal_handler("SIGINT", _sync)
|
|
|
|
|
|
|
|
|
def parse_options(arguments):
|
|
@@ -120,9 +129,13 @@ def parse_options(arguments):
|
|
|
return options
|
|
|
|
|
|
|
|
|
+def run_celerybeat(**options):
|
|
|
+ Beat(**options).run()
|
|
|
+
|
|
|
+
|
|
|
def main():
|
|
|
options = parse_options(sys.argv[1:])
|
|
|
- run_clockservice(**vars(options))
|
|
|
+ run_celerybeat(**vars(options))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|