
celery.beat.ClockService is now run by the celerybeat program

Ask Solem, 15 years ago
commit e263b8115b
4 changed files with 74 additions and 97 deletions
  1. celery/beat.py (+50 -17)
  2. celery/bin/celerybeat.py (+12 -66)
  3. celery/bin/celeryd.py (+7 -14)
  4. celery/platform.py (+5 -0)

+ 50 - 17
celery/beat.py

@@ -1,14 +1,58 @@
 from UserDict import UserDict
 from datetime import datetime
 from celery import registry
+from celery.log import setup_logger
 import shelve
 import atexit
 import time
+import threading
 
 schedule = shelve.open(filename="celerybeat-schedule")
 atexit.register(schedule.close)
 
 
+class ClockService(object):
+    # Scheduler is defined further down in this module, so it is resolved
+    # lazily in start() rather than at class-definition time (naming it
+    # here would raise NameError when the module is imported).
+    scheduler_cls = None
+    schedule = schedule
+    registry = registry.tasks
+
+    def __init__(self, loglevel, logfile, is_detached=False):
+        self.logger = setup_logger(loglevel, logfile)
+        self.is_detached = is_detached
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+
+    def start(self):
+        scheduler_cls = self.scheduler_cls or Scheduler
+        scheduler = scheduler_cls(schedule=self.schedule,
+                                  registry=self.registry)
+
+        try:
+            while True:
+                if self._shutdown.isSet():
+                    break
+                scheduler.tick()
+                time.sleep(scheduler.interval)
+        finally:
+            scheduler.stop()
+            self._stopped.set()
+
+    def stop(self, wait=False):
+        self._shutdown.set()
+        wait and self._stopped.wait() # block until shutdown done.
+
+
+class ClockServiceThread(threading.Thread):
+
+    def __init__(self, *args, **kwargs):
+        self.clockservice = ClockService(*args, **kwargs)
+        threading.Thread.__init__(self) # must run before setDaemon().
+        self.setDaemon(True)
+
+    def run(self):
+        return self.clockservice.start()
+
+    def stop(self):
+        return self.clockservice.stop(wait=True)
+
+
 class ScheduleEntry(object):
     """An entry in the scheduler.
 
@@ -48,26 +92,15 @@ class Scheduler(UserDict):
         persistent schedule ``celery.beat.schedule``.
 
     """
+    interval = 1
 
-    registry = registry.tasks
-    data = schedule
-
-    def __init__(self, registry=None, schedule=None):
-        self.registry = registry or self.registry
-        self.data = schedule or self.data
+    def __init__(self, registry=None, schedule=None, interval=None):
+        self.registry = registry or {}
+        self.data = schedule or {}
+        if interval is not None:
+            self.interval = interval
         self.schedule_registry()
 
-    def run(self):
-        """Run the scheduler.
-
-        This runs :meth:`tick` every second in a never-exit loop."""
-        try:
-            while True:
-                self.tick() 
-                time.sleep(1)
-        finally:
-            self.stop()
-
     def stop(self):
-        self.schedule.close()
+        # ``data`` may be the module-level shelve or a plain dict;
+        # only the former has close().
+        closing = getattr(self.data, "close", None)
+        if closing is not None:
+            closing()
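
Taken together, the new classes give two ways to run the clock: ClockService.start() blocks the calling thread until stop() is called, while ClockServiceThread wraps the same loop in a daemon thread. A minimal sketch of embedding the threaded variant in a host program (the script below is illustrative, not part of the commit; logging.INFO stands in for whatever numeric value setup_logger expects as loglevel, and logfile=None mirrors the "log to stderr" default used by celerybeat):

    import logging
    import time

    from celery.beat import ClockServiceThread

    # Run the clock beside the program's own work, as a worker might.
    clock = ClockServiceThread(loglevel=logging.INFO, logfile=None,
                               is_detached=False)
    clock.start() # Thread.start() -> run() -> ClockService.start()

    try:
        while True: # stand-in for the host program's main loop
            time.sleep(1)
    except KeyboardInterrupt:
        clock.stop() # sets the shutdown event, waits for _stopped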
 

+ 12 - 66
celery/bin/celerybeat.py

@@ -48,10 +48,8 @@ from celery.loaders import settings
 from celery import __version__
 from celery.log import emergency_error
 from celery import conf
-from celery import discovery
-from celery.task import discard_all
-from celery.worker import WorkController
 from celery import platform
+from celery.beat import ClockService
 import traceback
 import optparse
 
@@ -94,7 +92,7 @@ OPTION_LIST = (
     )
 
 
-def run_clock(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
+def run_clockserver(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
         logfile=conf.DAEMON_LOG_FILE, pidfile=conf.DAEMON_PID_FILE,
         umask=0, uid=None, gid=None, working_directory=None, chroot=None,
         **kwargs):
@@ -102,36 +100,12 @@ def run_clock(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
 
     print("Celery Beat %s is starting." % __version__)
 
-    # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
-    # it) lets the parent wait() for the terminated child process and stops
-    # the 'OSError: [Errno 10] No child processes' problem.
-    platform.reset_signal("SIGCLD")
-
-    if statistics is not None:
-        settings.CELERY_STATISTICS = statistics
-
-
-    if conf.CELERY_BACKEND == "database" \
-            and settings.DATABASE_ENGINE == "sqlite3" and \
-            concurrency > 1:
-        import warnings
-        warnings.warn("The sqlite3 database engine doesn't support "
-                "concurrency. We'll be using a single process only.",
-                UserWarning)
-        concurrency = 1
-
     # Setup logging
     if not isinstance(loglevel, int):
         loglevel = conf.LOG_LEVELS[loglevel.upper()]
     if not detach:
         logfile = None # log to stderr when not running in the background.
 
-    if discard:
-        discarded_count = discard_all()
-        what = discarded_count > 1 and "messages" or "message"
-        print("discard: Erased %d %s from the queue.\n" % (
-                discarded_count, what))
-
     # Dump configuration to screen so we have some basic information
     # when users send e-mails.
     print(STARTUP_INFO_FMT % {
@@ -143,13 +117,11 @@ def run_clock(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
             "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
             "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
             "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
-            "concurrency": concurrency,
             "loglevel": loglevel,
             "pidfile": pidfile,
-            "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
     })
 
-    print("Celery has started.")
+    print("Celery Beat has started.")
     if detach:
         from celery.log import setup_logger, redirect_stdouts_to_logger
         context = platform.create_daemon_context(logfile, pidfile,
@@ -166,52 +138,26 @@ def run_clock(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
     # (Usually imports task modules and such.)
     current_loader.on_worker_init()
 
-    def run_worker():
-        worker = WorkController(concurrency=concurrency,
-                                loglevel=loglevel,
-                                logfile=logfile,
-                                is_detached=detach)
-
-        # Install signal handler that restarts celeryd on SIGHUP,
-        # (only on POSIX systems)
-        install_worker_restart_handler(worker)
+    def _run_clock():
+        clockservice = ClockService(loglevel=loglevel,
+                                    logfile=logfile,
+                                    is_detached=detach)
 
         try:
-            worker.start()
+            clockservice.start()
         except Exception, e:
-            emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
+            emergency_error(logfile,
+                    "celerybeat raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))
 
     try:
-        if supervised:
-            OFASupervisor(target=run_worker).start()
-        else:
-            run_worker()
+        _run_clock()
     except:
         if detach:
             context.close()
         raise
 
 
-def install_worker_restart_handler(worker):
-
-    def restart_worker_sig_handler(signum, frame):
-        """Signal handler restarting the current python program."""
-        worker.logger.info("Restarting celeryd (%s)" % (
-            " ".join(sys.argv)))
-        if worker.is_detached:
-            pid = os.fork()
-            if pid:
-                worker.stop()
-                sys.exit(0)
-        else:
-            worker.stop()
-        os.execv(sys.executable, [sys.executable] + sys.argv)
-
-    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
-
-
-
 def parse_options(arguments):
     """Parse the available options to ``celerybeat``."""
     parser = optparse.OptionParser(option_list=OPTION_LIST)
@@ -221,4 +167,4 @@ def parse_options(arguments):
 
 if __name__ == "__main__":
     options = parse_options(sys.argv[1:])
-    run_worker(**vars(options))
+    run_clockserver(**vars(options))
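
Because run_clockserver now runs ClockService.start() in the foreground, a clean shutdown requires something to call stop() so the tick loop can exit and close the schedule. A hypothetical wrapper, sketched below, wires that to SIGTERM using the same platform.install_signal_handler helper the removed restart handler used; this wrapper is not part of the commit:

    from celery import platform
    from celery.beat import ClockService

    def run_with_clean_shutdown(loglevel, logfile, detach=False):
        clockservice = ClockService(loglevel=loglevel, logfile=logfile,
                                    is_detached=detach)

        def shutdown_handler(signum, frame):
            # Ask the tick loop to exit; start() then runs its finally
            # block, stopping the scheduler and setting _stopped.
            clockservice.stop()

        platform.install_signal_handler("SIGTERM", shutdown_handler)
        clockservice.start() # blocks until shutdown_handler fires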

+ 7 - 14
celery/bin/celeryd.py

@@ -63,19 +63,17 @@
 """
 import os
 import sys
-from celery.loaders import current_loader
-from celery.loaders import settings
+import multiprocessing
+import traceback
+import optparse
 from celery import __version__
-from celery.supervisor import OFASupervisor
-from celery.log import emergency_error
 from celery import conf
-from celery import discovery
+from celery import platform
+from celery.log import emergency_error
 from celery.task import discard_all
 from celery.worker import WorkController
-from celery import platform
-import multiprocessing
-import traceback
-import optparse
+from celery.loaders import current_loader, settings
+from celery.supervisor import OFASupervisor
 
 USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
 # Make sure the setting exists.
@@ -145,11 +143,6 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
 
     print("Celery %s is starting." % __version__)
 
-    # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
-    # it) lets the parent wait() for the terminated child process and stops
-    # the 'OSError: [Errno 10] No child processes' problem.
-    platform.reset_signal("SIGCLD")
-
     if statistics is not None:
         settings.CELERY_STATISTICS = statistics
 

+ 5 - 0
celery/platform.py

@@ -53,6 +53,11 @@ def create_daemon_context(logfile=None, pidfile=None, **options):
 
     from daemon import DaemonContext
 
+    # Set SIGCLD back to the default SIG_DFL (python-daemon overrides it).
+    # This lets the parent wait() for terminated child processes and avoids
+    # the 'OSError: [Errno 10] No child processes' problem. The call is
+    # unqualified: "platform.reset_signal" inside this module would resolve
+    # to the stdlib platform module instead.
+    reset_signal("SIGCLD")
+
     # Since without stderr any errors will be silently suppressed,
     # we need to know that we have access to the logfile
     if logfile:
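
reset_signal itself is not shown in this diff. A minimal sketch of what such a helper might look like, assuming it simply restores the default disposition and guards against platforms where the name is missing (SIGCLD is the System V alias for SIGCHLD and does not exist everywhere):

    import signal

    def reset_signal(signal_name):
        """Restore the named signal to its default handler (SIG_DFL)."""
        signum = getattr(signal, signal_name, None)
        if signum is not None:
            signal.signal(signum, signal.SIG_DFL)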