浏览代码

Getting closer to a working celerybeat

Ask Solem 15 年之前
父节点
当前提交
5a3746e591
共有 3 个文件被更改,包括 117 次插入65 次删除
  1. 55 59
      celery/beat.py
  2. 8 6
      celery/bin/celerybeat.py
  3. 54 0
      celery/conf.py

+ 55 - 59
celery/beat.py

@@ -1,62 +1,19 @@
+import time
+import shelve
+import atexit
+import threading
 from UserDict import UserDict
 from datetime import datetime
+from celery import conf
 from celery import registry
 from celery.log import setup_logger
-import shelve
-import atexit
-import time
-import threading
 
-schedule = shelve.open(filename="celerybeat-schedule")
-atexit.register(schedule.close)
 
 
 class SchedulingError(Exception):
     """An error occured while scheduling task."""
 
 
-class ClockService(object):
-    scheduler_cls = Scheduler
-    schedule = schedule
-    registry = registry.tasks
-
-    def __init__(self, loglevel, logfile, is_detached=False):
-        self.logger = setup_logger(loglevel, logfile)
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
-
-    def start(self):
-        scheduler = self.scheduler_cls(schedule=self.schedule,
-                                       registry=self.registry)
-
-        try:
-            while True:
-                if self._shutdown.isSet():
-                    break
-                scheduler.tick()
-                time.sleep(scheduler.interval)
-        finally:
-            scheduler.stop()
-            self._stopped.set()
-
-    def stop(self, wait=False):
-        self._shutdown.set()
-        wait and self._stopped.wait() # block until shutdown done.
-
-
-class ClockServiceThread(threading.Thread):
-
-    def __init__(self, *args, **kwargs):
-        self.clockservice = ClockService(*args, **kwargs)
-        self.setDaemon(True)
-
-    def run(self):
-        self.clockservice.start()
-
-    def stop(self):
-        self.clockservice.stop(wait=True)
-
-
 class ScheduleEntry(object):
     """An entry in the scheduler.
 
@@ -67,10 +24,10 @@ class ScheduleEntry(object):
 
     """
 
-    def __init__(self, task, last_run_at=None, total_run_count=None)
+    def __init__(self, task, last_run_at=None, total_run_count=None):
         self.task = task
-        self.last_run_at = None
-        self.total_run_count = None
+        self.last_run_at = last_run_at or datetime.now()
+        self.total_run_count = total_run_count or 0
 
     def execute(self):
         # Increment timestamps and counts before executing,
@@ -87,8 +44,7 @@ class ScheduleEntry(object):
         return result
 
     def is_due(self):
-        run_at = self.last_run_at + self.task.run_every
-        return True if datetime.now() > self.last_run_at else return False
+        return datetime.now() > (self.last_run_at + self.task.run_every)
 
 
 class Scheduler(UserDict):
@@ -115,9 +71,6 @@ class Scheduler(UserDict):
         return [(entry.task, entry.execute())
                     for entry in self.get_due_tasks()]
 
-    def stop(self):
-        self.schedule.close()
-
     def get_due_tasks(self):
         """Get all the schedule entries that are due to execution."""
         return filter(lambda entry: entry.is_due(), self.schedule.values())
@@ -125,10 +78,53 @@ class Scheduler(UserDict):
     def schedule_registry(self):
         """Add the current contents of the registry to the schedule."""
         periodic_tasks = self.registry.get_all_periodic()
-        schedule = dict((name, tasktimetuple(task))
-                            for name, task in periodic_tasks.items())
-        self.schedule = dict(schedule, **self.schedule)
+        for name, task in self.registry.get_all_periodic().items():
+            self.schedule.setdefault(name, ScheduleEntry(task))
 
     @property
     def schedule(self):
         return self.data
+
+
+class ClockService(object):
+    scheduler_cls = Scheduler
+    schedule_filename = conf.CELERYBEAT_SCHEDULE_FILENAME
+    registry = registry.tasks
+
+    def __init__(self, loglevel, logfile, is_detached=False):
+        self.logger = setup_logger(loglevel, logfile)
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+
+    def start(self):
+        schedule = shelve.open(filename=self.schedule_filename)
+        atexit.register(schedule.close)
+        scheduler = self.scheduler_cls(schedule=schedule,
+                                       registry=self.registry)
+
+        try:
+            while True:
+                if self._shutdown.isSet():
+                    break
+                scheduler.tick()
+                time.sleep(scheduler.interval)
+        finally:
+            schedule.close()
+            self._stopped.set()
+
+    def stop(self, wait=False):
+        self._shutdown.set()
+        wait and self._stopped.wait() # block until shutdown done.
+
+
+class ClockServiceThread(threading.Thread):
+
+    def __init__(self, *args, **kwargs):
+        self.clockservice = ClockService(*args, **kwargs)
+        self.setDaemon(True)
+
+    def run(self):
+        self.clockservice.start()
+
+    def stop(self):
+        self.clockservice.stop(wait=True)

+ 8 - 6
celery/bin/celerybeat.py

@@ -60,13 +60,15 @@ Configuration ->
 """.strip()
 
 OPTION_LIST = (
-    optparse.make_option('-f', '--logfile', default=conf.DAEMON_LOG_FILE,
+    optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
-    optparse.make_option('-l', '--loglevel', default=conf.DAEMON_LOG_LEVEL,
+    optparse.make_option('-l', '--loglevel',
+            default=conf.CELERYBEAT_LOG_LEVEL,
             action="store", dest="loglevel",
             help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    optparse.make_option('-p', '--pidfile', default=conf.DAEMON_PID_FILE,
+    optparse.make_option('-p', '--pidfile',
+            default=conf.CELERYBEAT_PID_FILE,
             action="store", dest="pidfile",
             help="Path to pidfile."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
@@ -90,8 +92,8 @@ OPTION_LIST = (
     )
 
 
-def run_clockserver(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
-        logfile=conf.DAEMON_LOG_FILE, pidfile=conf.DAEMON_PID_FILE,
+def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
+        logfile=conf.CELERYBEAT_LOG_FILE, pidfile=conf.CELERYBEAT_PID_FILE,
         umask=0, uid=None, gid=None, working_directory=None, chroot=None,
         **kwargs):
     """Starts the celerybeat clock server."""
@@ -165,4 +167,4 @@ def parse_options(arguments):
 
 if __name__ == "__main__":
     options = parse_options(sys.argv[1:])
-    run_clockserver(**vars(options))
+    run_clockservice(**vars(options))

+ 54 - 0
celery/conf.py

@@ -22,6 +22,10 @@ DEFAULT_AMQP_CONNECTION_RETRY = True
 DEFAULT_AMQP_CONNECTION_MAX_RETRIES = 100
 DEFAULT_TASK_SERIALIZER = "pickle"
 DEFAULT_BACKEND = "database"
+DEFAULT_CELERYBEAT_PID_FILE = "celerybeat.pid"
+DEFAULT_CELERYBEAT_LOG_LEVEL = "INFO"
+DEFAULT_CELERYBEAT_LOG_FILE = "celerybeat.log"
+DEFAULT_CELERYBEAT_SCHEDULE_FILENAME = "celerybeat-schedule"
 
 
 """
@@ -252,3 +256,53 @@ cache backend in ``CACHE_BACKEND`` will be used.
 
 """
 CELERY_CACHE_BACKEND = getattr(settings, "CELERY_CACHE_BACKEND", None)
+
+DEFAULT_CELERYBEAT_PID_FILE = "celerybeat.pid"
+DEFAULT_CELERYBEAT_LOG_LEVEL = "INFO"
+DEFAULT_CELERYBEAT_LOG_FILE = "celerybeat.log"
+DEFAULT_CELERYBEAT_SCHEDULE_FILENAME = "celerybeat-schedule"
+
+"""
+
+.. data:: CELERYBEAT_PID_FILE
+
+Name of celerybeats pid file.
+Default is: ``celerybeat.pid``.
+
+"""
+CELERYBEAT_PID_FILE = getattr(settings, "CELERYBEAT_PID_FILE",
+                              DEFAULT_CELERYBEAT_PID_FILE)
+
+"""
+
+.. data:: CELERYBEAT_LOG_LEVEL
+
+Default log level for celerybeat.
+Default is: ``INFO``.
+
+"""
+CELERYBEAT_LOG_LEVEL = getattr(settings, "CELERYBEAT_LOG_LEVEL",
+                               DEFAULT_CELERYBEAT_LOG_LEVEL)
+
+"""
+
+.. data:: CELERYBEAT_LOG_FILE
+
+Default log file for celerybeat.
+Default is: ``celerybeat.log``.
+
+"""
+CELERYBEAT_LOG_FILE = getattr(settings, "CELERYBEAT_LOG_FILE",
+                              DEFAULT_CELERYBEAT_LOG_FILE)
+
+"""
+
+.. data:: CELERYBEAT_SCHEDULE_FILENAME
+
+Name of the persistent schedule database file.
+Default is: ``celerybeat-schedule``.
+
+"""
+CELERYBEAT_SCHEDULE_FILENAME = getattr(settings,
+                                       "CELERYBEAT_SCHEDULE_FILENAME",
+                                       DEFAULT_CELERYBEAT_SCHEDULE_FILENAME)