Merge branch 'beat' into 1point0

Conflicts:
	celery/conf.py
Ask Solem, 15 years ago
parent commit ea5ae5efb2

+ 7 - 0
bin/celerybeat

@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+import sys
+from celery.bin.celerybeat import run_clockservice, parse_options
+
+if __name__ == "__main__":
+    options = parse_options(sys.argv[1:])
+    run_clockservice(**vars(options))

+ 0 - 28
celery/backends/__init__.py

@@ -27,17 +27,6 @@ def get_backend_cls(backend):
 get_default_backend_cls = partial(get_backend_cls, conf.CELERY_BACKEND)
 
 
-"""
-.. function:: get_default_periodicstatus_backend_cls()
-
-    Get the backend class specified in
-    :setting:`CELERY_PERIODIC_STATUS_BACKEND`.
-
-"""
-get_default_periodicstatus_backend_cls = partial(get_backend_cls,
-                                        conf.CELERY_PERIODIC_STATUS_BACKEND)
-
-
 """
 .. class:: DefaultBackend
 
@@ -48,15 +37,6 @@ get_default_periodicstatus_backend_cls = partial(get_backend_cls,
 DefaultBackend = get_default_backend_cls()
 
 
-"""
-.. class:: DefaultPeriodicStatusBackend
-
-    The default backend for storing periodic task metadata, specified
-    in :setting:`CELERY_PERIODIC_STATUS_BACKEND`.
-
-"""
-DefaultPeriodicStatusBackend = get_default_periodicstatus_backend_cls()
-
 """
 .. data:: default_backend
 
@@ -64,11 +44,3 @@ DefaultPeriodicStatusBackend = get_default_periodicstatus_backend_cls()
 
 """
 default_backend = DefaultBackend()
-
-"""
-.. data:: default_periodic_status_backend
-
-    An instance of :class:`DefaultPeriodicStatusBackend`.
-
-"""
-default_periodic_status_backend = DefaultPeriodicStatusBackend()

+ 2 - 20
celery/backends/database.py

@@ -1,35 +1,17 @@
 """celery.backends.database"""
-from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.models import TaskMeta
 from celery.backends.base import BaseBackend
 
 
 class Backend(BaseBackend):
     """The database backends. Using Django models to store task metadata."""
 
-    capabilities = ["ResultStore", "PeriodicStatus"]
+    capabilities = ["ResultStore"]
 
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
 
-    def init_periodic_tasks(self):
-        """Create entries for all periodic tasks in the database."""
-        PeriodicTaskMeta.objects.init_entries()
-
-    def run_periodic_tasks(self):
-        """Run all waiting periodic tasks.
-
-        :returns: a list of ``(task, task_id)`` tuples containing
-            the task class and id for the resulting tasks applied.
-
-        """
-        waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
-        task_id_tuples = []
-        for waiting_task in waiting_tasks:
-            task_id = waiting_task.delay()
-            task_id_tuples.append((waiting_task, task_id))
-        return task_id_tuples
-
     def store_result(self, task_id, result, status, traceback=None):
         """Store return value and status of an executed task."""
         if status == "DONE":
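
With the periodic-status capability dropped, the database backend now only advertises result storage. A minimal sketch of checking the plain-list ``capabilities`` attribute (assumes a configured Django settings module)::

    from celery.backends.database import Backend

    # "PeriodicStatus" is no longer advertised; only result storage remains.
    assert "ResultStore" in Backend.capabilities
    assert "PeriodicStatus" not in Backend.capabilities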

+ 2 - 90
celery/backends/mongodb.py

@@ -1,23 +1,16 @@
 """MongoDB backend for celery."""
-
-import random
-from datetime import datetime, timedelta
-
+from datetime import datetime
 from django.core.exceptions import ImproperlyConfigured
 from celery.serialization import pickle
 from celery.backends.base import BaseBackend
 from celery.loaders import settings
 from celery.conf import TASK_RESULT_EXPIRES
-from celery.registry import tasks
 
 try:
     import pymongo
 except ImportError:
     pymongo = None
 
-# taken from celery.managers.PeriodicTaskManager
-SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
-
 
 class Bunch:
 
@@ -27,7 +20,7 @@ class Bunch:
 
 class Backend(BaseBackend):
 
-    capabilities = ("ResultStore", "PeriodicStatus")
+    capabilities = ["ResultStore"]
 
     mongodb_host = 'localhost'
     mongodb_port = 27017
@@ -35,7 +28,6 @@ class Backend(BaseBackend):
     mongodb_password = None
     mongodb_database = 'celery'
     mongodb_taskmeta_collection = 'celery_taskmeta'
-    mongodb_periodictaskmeta_collection = 'celery_periodictaskmeta'
 
     def __init__(self, *args, **kwargs):
         """Initialize MongoDB backend instance.
@@ -65,9 +57,6 @@ class Backend(BaseBackend):
                     'database', self.mongodb_database)
             self.mongodb_taskmeta_collection = conf.get(
                 'taskmeta_collection', self.mongodb_taskmeta_collection)
-            self.mongodb_collection_periodictaskmeta = conf.get(
-                'periodictaskmeta_collection',
-                self.mongodb_periodictaskmeta_collection)
 
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
@@ -104,83 +93,6 @@ class Backend(BaseBackend):
             # goes out of scope
             self._connection = None
 
-    def init_periodic_tasks(self):
-        """Create collection for periodic tasks in database."""
-        db = self._get_database()
-        collection = db[self.mongodb_periodictaskmeta_collection]
-        collection.ensure_index("name", pymongo.ASCENDING, unique=True)
-
-        periodic_tasks = tasks.get_all_periodic()
-        for task_name in periodic_tasks.keys():
-            if not collection.find_one({"name": task_name}):
-                collection.save({"name": task_name,
-                                 "last_run_at": datetime.fromtimestamp(0),
-                                 "total_run_count": 0}, safe=True)
-
-    def run_periodic_tasks(self):
-        """Run all waiting periodic tasks.
-
-        :returns: a list of ``(task, task_id)`` tuples containing
-            the task class and id for the resulting tasks applied.
-        """
-        db = self._get_database()
-        collection = db[self.mongodb_periodictaskmeta_collection]
-
-        waiting_tasks = self._get_waiting_tasks()
-        task_id_tuples = []
-        for waiting_task in waiting_tasks:
-            task = tasks[waiting_task['name']]
-            resp = task.delay()
-            collection.update({'_id': waiting_task['_id']},
-                              {"$inc": {"total_run_count": 1}})
-
-            task_meta = Bunch(name=waiting_task['name'],
-                              last_run_at=waiting_task['last_run_at'],
-                              total_run_count=waiting_task['total_run_count'])
-            task_id_tuples.append((task_meta, resp.task_id))
-
-        return task_id_tuples
-
-    def _is_time(self, last_run_at, run_every):
-        """Check if if it is time to run the periodic task.
-
-        :param last_run_at: Last time the periodic task was run.
-        :param run_every: How often to run the periodic task.
-
-        :rtype bool:
-
-        """
-        # code taken from celery.managers.PeriodicTaskManager
-        run_every_drifted = run_every + SERVER_DRIFT
-        run_at = last_run_at + run_every_drifted
-        if datetime.now() > run_at:
-            return True
-        return False
-
-    def _get_waiting_tasks(self):
-        """Get all waiting periodic tasks."""
-        db = self._get_database()
-        collection = db[self.mongodb_periodictaskmeta_collection]
-
-        periodic_tasks = tasks.get_all_periodic()
-
-        # find all periodic tasks to be run
-        waiting = []
-        for task_meta in collection.find():
-            if task_meta['name'] in periodic_tasks:
-                task = periodic_tasks[task_meta['name']]
-                run_every = task.run_every
-                if self._is_time(task_meta['last_run_at'], run_every):
-                    collection.update(
-                        {"name": task_meta['name'],
-                         "last_run_at": task_meta['last_run_at']},
-                        {"$set": {"last_run_at": datetime.now()}})
-
-                    if db.last_status()['updatedExisting']:
-                        waiting.append(task_meta)
-
-        return waiting
-
     def store_result(self, task_id, result, status, traceback=None):
         """Store return value and status of an executed task."""
         from pymongo.binary import Binary

+ 189 - 0
celery/beat.py

@@ -0,0 +1,189 @@
+import time
+import shelve
+import atexit
+import threading
+from UserDict import UserDict
+from datetime import datetime
+from celery import conf
+from celery import registry
+from celery.log import setup_logger
+from celery.exceptions import NotRegistered
+
+
+
+class SchedulingError(Exception):
+    """An error occured while scheduling task."""
+
+
+class ScheduleEntry(object):
+    """An entry in the scheduler.
+
+    :param name: The name of the task.
+    :keyword last_run_at: The time and date when this task was last run.
+    :keyword total_run_count: Total number of times this periodic task has
+        been executed.
+
+    """
+
+    def __init__(self, name, last_run_at=None,
+            total_run_count=None):
+        self.name = name
+        self.last_run_at = last_run_at or datetime.now()
+        self.total_run_count = total_run_count or 0
+
+    def next(self):
+        return self.__class__(self.name, datetime.now(),
+                              self.total_run_count + 1)
+
+    def is_due(self, run_every):
+        return datetime.now() > (self.last_run_at + run_every)
+
+
+class Scheduler(UserDict):
+    """Scheduler for periodic tasks.
+
+    :keyword registry: The task registry to use.
+    :keyword schedule: The schedule dictionary. Default is an empty
+        in-memory schedule.
+
+    """
+    interval = 1
+
+    def __init__(self, **kwargs):
+
+        def _get_default_logger():
+            import multiprocessing
+            return multiprocessing.get_logger()
+
+        attr_defaults = {"registry": lambda: {},
+                         "schedule": lambda: {},
+                         "interval": lambda: self.interval,
+                         "logger": _get_default_logger}
+
+        for attr_name, attr_default_gen in attr_defaults.items():
+            if attr_name in kwargs:
+                attr_value = kwargs[attr_name]
+            else:
+                attr_value = attr_default_gen()
+            setattr(self, attr_name, attr_value)
+
+        self.cleanup()
+        self.schedule_registry()
+
+    def tick(self):
+        """Run a tick, that is one iteration of the scheduler.
+        Executes all due tasks."""
+        for entry in self.get_due_tasks():
+            self.logger.debug("Scheduler: Sending due task %s" % (
+                    entry.name))
+            result = self.apply_async(entry)
+            self.logger.debug("Scheduler: %s sent. id->%s" % (
+                    entry.name, result.task_id))
+
+    def get_due_tasks(self):
+        """Get all the schedule entries that are due to execution."""
+        return filter(self.is_due, self.schedule.values())
+
+    def get_task(self, name):
+        try:
+            return self.registry[name]
+        except KeyError:
+            raise NotRegistered(name)
+
+    def is_due(self, entry):
+        return entry.is_due(self.get_task(entry.name).run_every)
+
+    def apply_async(self, entry):
+
+        # Update timestamps and run counts before we actually execute,
+        # so we have that done if an exception is raised (doesn't schedule
+        # forever.)
+        entry = self.schedule[entry.name] = entry.next()
+        task = self.get_task(entry.name)
+
+        try:
+            result = task.apply_async()
+        except Exception, exc:
+            raise SchedulingError(
+                    "Couldn't apply scheduled task %s: %s" % (
+                        task.name, exc))
+        return result
+
+    def schedule_registry(self):
+        """Add the current contents of the registry to the schedule."""
+        periodic_tasks = self.registry.get_all_periodic()
+        for name, task in periodic_tasks.items():
+            if name not in self.schedule:
+                self.logger.debug(
+                        "Scheduler: Adding periodic task %s to schedule" % (
+                            task.name))
+            self.schedule.setdefault(name, ScheduleEntry(task.name))
+
+    def cleanup(self):
+        for task_name, entry in self.schedule.items():
+            if task_name not in self.registry:
+                self.schedule.pop(task_name, None)
+
+    @property
+    def schedule(self):
+        return self.data
+
+
+class ClockService(object):
+    scheduler_cls = Scheduler
+    schedule_filename = conf.CELERYBEAT_SCHEDULE_FILENAME
+    registry = registry.tasks
+
+    def __init__(self, logger=None, is_detached=False):
+        self.logger = logger
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+
+    def start(self):
+        self.logger.info("ClockService: Starting...")
+        schedule = shelve.open(filename=self.schedule_filename)
+        #atexit.register(schedule.close)
+        scheduler = self.scheduler_cls(schedule=schedule,
+                                       registry=self.registry,
+                                       logger=self.logger)
+        self.logger.debug(
+                "ClockService: Ticking with interval->%d, schedule->%s" % (
+                    scheduler.interval, self.schedule_filename))
+
+        synced = [False]
+        def _stop():
+            if not synced[0]:
+                self.logger.debug("ClockService: Syncing schedule to disk...")
+                schedule.sync()
+                schedule.close()
+                synced[0] = True
+                self._stopped.set()
+
+        try:
+            while True:
+                if self._shutdown.isSet():
+                    break
+                scheduler.tick()
+                time.sleep(scheduler.interval)
+        except (KeyboardInterrupt, SystemExit):
+            _stop()
+        finally:
+            _stop()
+
+    def stop(self, wait=False):
+        self._shutdown.set()
+        wait and self._stopped.wait() # block until shutdown done.
+
+
+class ClockServiceThread(threading.Thread):
+
+    def __init__(self, *args, **kwargs):
+        self.clockservice = ClockService(*args, **kwargs)
+        threading.Thread.__init__(self)
+        self.setDaemon(True)
+
+    def run(self):
+        self.clockservice.start()
+
+    def stop(self):
+        self.clockservice.stop(wait=True)
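
The scheduling loop added here reduces to comparing ``last_run_at + run_every`` against the current time on every tick: ``Scheduler.tick()`` sends any due task and replaces its entry with ``entry.next()``, which bumps the run count. A standalone sketch of that due-check (plain datetimes, with a hypothetical 10-second interval)::

    from datetime import datetime, timedelta

    # Hypothetical entry state mirroring ScheduleEntry above.
    last_run_at = datetime.now() - timedelta(seconds=15)
    run_every = timedelta(seconds=10)   # illustrative interval

    # The same test ScheduleEntry.is_due() performs on each Scheduler.tick().
    if datetime.now() > (last_run_at + run_every):
        print("due: apply_async() would be called and the entry replaced")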

+ 169 - 0
celery/bin/celerybeat.py

@@ -0,0 +1,169 @@
+#!/usr/bin/env python
+"""celerybeat
+
+.. program:: celerybeat
+
+.. cmdoption:: -f, --logfile
+
+    Path to log file. If no logfile is specified, ``stderr`` is used.
+
+.. cmdoption:: -l, --loglevel
+
+    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
+    ``ERROR``, ``CRITICAL``, or ``FATAL``.
+
+.. cmdoption:: -p, --pidfile
+
+    Path to pidfile.
+
+.. cmdoption:: -d, --detach, --daemon
+
+    Run in the background as a daemon.
+
+.. cmdoption:: -u, --uid
+
+    User-id to run ``celerybeat`` as when in daemon mode.
+
+.. cmdoption:: -g, --gid
+
+    Group-id to run ``celerybeat`` as when in daemon mode.
+
+.. cmdoption:: --umask
+
+    umask of the process when in daemon mode.
+
+.. cmdoption:: --workdir
+
+    Directory to change to when in daemon mode.
+
+.. cmdoption:: --chroot
+
+    Change root directory to this path when in daemon mode.
+
+"""
+import os
+import sys
+import traceback
+import optparse
+from celery import __version__
+from celery import conf
+from celery import platform
+from celery.log import emergency_error
+from celery.beat import ClockService
+from celery.loaders import current_loader, settings
+
+STARTUP_INFO_FMT = """
+Configuration ->
+    * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
+    * Exchange -> %(exchange)s (%(exchange_type)s)
+    * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
+""".strip()
+
+OPTION_LIST = (
+    optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
+            action="store", dest="logfile",
+            help="Path to log file."),
+    optparse.make_option('-l', '--loglevel',
+            default=conf.CELERYBEAT_LOG_LEVEL,
+            action="store", dest="loglevel",
+            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
+    optparse.make_option('-p', '--pidfile',
+            default=conf.CELERYBEAT_PID_FILE,
+            action="store", dest="pidfile",
+            help="Path to pidfile."),
+    optparse.make_option('-d', '--detach', '--daemon', default=False,
+            action="store_true", dest="detach",
+            help="Run in the background as a daemon."),
+    optparse.make_option('-u', '--uid', default=None,
+            action="store", dest="uid",
+            help="User-id to run celerybeat as when in daemon mode."),
+    optparse.make_option('-g', '--gid', default=None,
+            action="store", dest="gid",
+            help="Group-id to run celerybeat as when in daemon mode."),
+    optparse.make_option('--umask', default=0,
+            action="store", type="int", dest="umask",
+            help="umask of the process when in daemon mode."),
+    optparse.make_option('--workdir', default=None,
+            action="store", dest="working_directory",
+            help="Directory to change to when in daemon mode."),
+    optparse.make_option('--chroot', default=None,
+            action="store", dest="chroot",
+            help="Change root directory to this path when in daemon mode."),
+    )
+
+
+def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
+        logfile=conf.CELERYBEAT_LOG_FILE, pidfile=conf.CELERYBEAT_PID_FILE,
+        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
+        **kwargs):
+    """Starts the celerybeat clock server."""
+
+    print("Celery Beat %s is starting." % __version__)
+
+    # Setup logging
+    if not isinstance(loglevel, int):
+        loglevel = conf.LOG_LEVELS[loglevel.upper()]
+    if not detach:
+        logfile = None # log to stderr when not running in the background.
+
+    # Dump configuration to screen so we have some basic information
+    # when users send e-mails.
+    print(STARTUP_INFO_FMT % {
+            "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
+            "host": getattr(settings, "AMQP_SERVER", "(default)"),
+            "port": getattr(settings, "AMQP_PORT", "(default)"),
+            "exchange": conf.AMQP_EXCHANGE,
+            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
+            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
+            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
+            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
+            "loglevel": loglevel,
+            "pidfile": pidfile,
+    })
+
+    print("Celery Beat has started.")
+    if detach:
+        from celery.log import setup_logger, redirect_stdouts_to_logger
+        context = platform.create_daemon_context(logfile, pidfile,
+                                        chroot_directory=chroot,
+                                        working_directory=working_directory,
+                                        umask=umask,
+                                        uid=uid,
+                                        gid=gid)
+        context.open()
+        logger = setup_logger(loglevel, logfile)
+        redirect_stdouts_to_logger(logger, loglevel)
+
+    # Run the worker init handler.
+    # (Usually imports task modules and such.)
+    current_loader.on_worker_init()
+
+    def _run_clock():
+        logger = setup_logger(loglevel, logfile)
+        clockservice = ClockService(logger=logger, is_detached=detach)
+
+        try:
+            clockservice.start()
+        except Exception, e:
+            emergency_error(logfile,
+                    "celerybeat raised exception %s: %s\n%s" % (
+                            e.__class__, e, traceback.format_exc()))
+
+    try:
+        _run_clock()
+    except:
+        if detach:
+            context.close()
+        raise
+
+
+def parse_options(arguments):
+    """Parse the available options to ``celeryd``."""
+    parser = optparse.OptionParser(option_list=OPTION_LIST)
+    options, values = parser.parse_args(arguments)
+    return options
+
+
+if __name__ == "__main__":
+    options = parse_options(sys.argv[1:])
+    run_clockservice(**vars(options))
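
The same entry point the ``celerybeat`` script uses can also be called directly; a sketch assuming Django settings are already configured (keyword names match the ``run_clockservice()`` signature above, values are illustrative)::

    from celery.bin.celerybeat import run_clockservice

    # Runs the clock service in the foreground, logging DEBUG output to stderr.
    run_clockservice(detach=False, loglevel="DEBUG")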

+ 41 - 104
celery/bin/celeryd.py

@@ -21,6 +21,11 @@
 
     Path to pidfile.
 
+.. cmdoption:: -B, --beat
+
+    Also run the ``celerybeat`` periodic task scheduler. Please note that
+    there must only be one instance of this service.
+
 .. cmdoption:: -s, --statistics
 
     Turn on reporting of statistics (remember to flush the statistics message
@@ -63,28 +68,17 @@
 """
 import os
 import sys
-CAN_DETACH = True
-try:
-    import resource
-except ImportError:
-    CAN_DETACH = False
-
-from celery.loaders import current_loader
-from celery.loaders import settings
+import multiprocessing
+import traceback
+import optparse
 from celery import __version__
-from celery.supervisor import OFASupervisor
-from celery.log import emergency_error
-from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
-from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery import conf
-from celery import discovery
+from celery import platform
+from celery.log import emergency_error
 from celery.task import discard_all
 from celery.worker import WorkController
-import signal
-import multiprocessing
-import traceback
-import optparse
-import atexit
+from celery.loaders import current_loader, settings
+from celery.supervisor import OFASupervisor
 
 USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
 # Make sure the setting exists.
@@ -97,10 +91,12 @@ Configuration ->
     * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
     * Concurrency -> %(concurrency)s
     * Statistics -> %(statistics)s
+    * Celerybeat -> %(celerybeat)s
 """.strip()
 
 OPTION_LIST = (
-    optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
+    optparse.make_option('-c', '--concurrency',
+            default=conf.DAEMON_CONCURRENCY,
             action="store", dest="concurrency", type="int",
             help="Number of child processes processing the queue."),
     optparse.make_option('--discard', default=False,
@@ -111,15 +107,19 @@ OPTION_LIST = (
     optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
             action="store_true", dest="statistics",
             help="Collect statistics."),
-    optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
+    optparse.make_option('-f', '--logfile', default=conf.DAEMON_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
-    optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
+    optparse.make_option('-l', '--loglevel', default=conf.DAEMON_LOG_LEVEL,
             action="store", dest="loglevel",
             help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
+    optparse.make_option('-p', '--pidfile', default=conf.DAEMON_PID_FILE,
             action="store", dest="pidfile",
             help="Path to pidfile."),
+    optparse.make_option('-B', '--beat', default=False,
+            action="store_true", dest="run_clockservice",
+            help="Also run the celerybeat periodic task scheduler. \
+                  Please note that only one instance must be running."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="detach",
             help="Run in the background as a daemon."),
@@ -144,56 +144,13 @@ OPTION_LIST = (
     )
 
 
-def acquire_pidlock(pidfile):
-    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
-    ``pidfile``.
-
-    If the ``pidfile`` already exists, but the process is not running the
-    ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted
-    and execution continues as normally. However, if the process is still
-    running the program will exit complaning that the program is already
-    running in the background somewhere.
-
-    """
-    from daemon.pidlockfile import PIDLockFile
-    import errno
-    pidlock = PIDLockFile(pidfile)
-    if not pidlock.is_locked():
-        return pidlock
-    pid = pidlock.read_pid()
-    try:
-        os.kill(pid, 0)
-    except os.error, exc:
-        if exc.errno == errno.ESRCH:
-            sys.stderr.write("Stale pidfile exists. Removing it.\n")
-            os.unlink(pidfile)
-            return PIDLockFile(pidfile)
-    except TypeError, exc:
-        sys.stderr.write("Broken pidfile found. Removing it.\n")
-        os.unlink(pidfile)
-        return PIDLockFile(pidfile)
-    else:
-        raise SystemExit(
-                "ERROR: Pidfile (%s) already exists.\n"
-                "Seems celeryd is already running? (PID: %d)" % (
-                    pidfile, pid))
-    return pidlock
-
-
-def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
-        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
-        pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
-        supervised=False, working_directory=None, chroot=None,
-        statistics=None, **kwargs):
+def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
+        loglevel=conf.DAEMON_LOG_LEVEL, logfile=conf.DAEMON_LOG_FILE,
+        discard=False, pidfile=conf.DAEMON_PID_FILE, umask=0,
+        uid=None, gid=None, supervised=False, working_directory=None,
+        chroot=None, statistics=None, run_clockservice=False, **kwargs):
     """Starts the celery worker server."""
 
-    # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
-    # it) lets the parent wait() for the terminated child process and stops
-    # the 'OSError: [Errno 10] No child processes' problem.
-
-    if hasattr(signal, "SIGCLD"): # Make sure the platform supports signals.
-        signal.signal(signal.SIGCLD, signal.SIG_DFL)
-
     print("Celery %s is starting." % __version__)
 
     if statistics is not None:
@@ -213,7 +170,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
 
     # Setup logging
     if not isinstance(loglevel, int):
-        loglevel = LOG_LEVELS[loglevel.upper()]
+        loglevel = conf.LOG_LEVELS[loglevel.upper()]
     if not detach:
         logfile = None # log to stderr when not running in the background.
 
@@ -238,35 +195,18 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
             "loglevel": loglevel,
             "pidfile": pidfile,
             "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
+            "celerybeat": run_clockservice and "ON" or "OFF",
     })
 
     print("Celery has started.")
     if detach:
-        if not CAN_DETACH:
-            raise RuntimeError(
-                    "This operating system doesn't support detach. ")
-        from daemon import DaemonContext
         from celery.log import setup_logger, redirect_stdouts_to_logger
-
-        # Since without stderr any errors will be silently suppressed,
-        # we need to know that we have access to the logfile
-        if logfile:
-            open(logfile, "a").close()
-
-        pidlock = acquire_pidlock(pidfile)
-        if umask is None:
-            umask = 0
-        if uid is None:
-            uid = os.geteuid()
-        if gid is None:
-            gid = os.getegid()
-        working_directory = working_directory or os.getcwd()
-        context = DaemonContext(chroot_directory=chroot,
-                                working_directory=working_directory,
-                                umask=umask,
-                                pidfile=pidlock,
-                                uid=uid,
-                                gid=gid)
+        context = platform.create_daemon_context(logfile, pidfile,
+                                        chroot_directory=chroot,
+                                        working_directory=working_directory,
+                                        umask=umask,
+                                        uid=uid,
+                                        gid=gid)
         context.open()
         logger = setup_logger(loglevel, logfile)
         redirect_stdouts_to_logger(logger, loglevel)
@@ -279,11 +219,12 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         worker = WorkController(concurrency=concurrency,
                                 loglevel=loglevel,
                                 logfile=logfile,
+                                embed_clockservice=run_clockservice,
                                 is_detached=detach)
 
         # Install signal handler that restarts celeryd on SIGHUP,
         # (only on POSIX systems)
-        install_restart_signal_handler(worker)
+        install_worker_restart_handler(worker)
 
         try:
             worker.start()
@@ -302,14 +243,9 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         raise
 
 
-def install_restart_signal_handler(worker):
-    """Installs a signal handler that restarts the current program
-    when it receives the ``SIGHUP`` signal.
-    """
-    if not hasattr(signal, "SIGHUP"):
-        return  # platform is not POSIX
+def install_worker_restart_handler(worker):
 
-    def restart_self(signum, frame):
+    def restart_worker_sig_handler(signum, frame):
         """Signal handler restarting the current python program."""
         worker.logger.info("Restarting celeryd (%s)" % (
             " ".join(sys.argv)))
@@ -322,7 +258,8 @@ def install_restart_signal_handler(worker):
             worker.stop()
         os.execv(sys.executable, [sys.executable] + sys.argv)
 
-    signal.signal(signal.SIGHUP, restart_self)
+    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
+
 
 
 def parse_options(arguments):
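
The new ``-B``/``--beat`` switch maps straight onto the ``run_clockservice`` keyword of ``run_worker()``; invoked programmatically, the equivalent of ``celeryd -B`` would look roughly like this (a sketch, argument values illustrative)::

    from celery.bin.celeryd import run_worker

    # One worker with two child processes, also embedding the clock service.
    run_worker(concurrency=2, run_clockservice=True, detach=False)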

+ 48 - 10
celery/conf.py

@@ -1,3 +1,4 @@
+from celery.registry import tasks
 """celery.conf"""
 from celery.loaders import settings
 from datetime import timedelta
@@ -21,8 +22,11 @@ DEFAULT_AMQP_CONNECTION_RETRY = True
 DEFAULT_AMQP_CONNECTION_MAX_RETRIES = 100
 DEFAULT_TASK_SERIALIZER = "pickle"
 DEFAULT_BACKEND = "database"
-DEFAULT_PERIODIC_STATUS_BACKEND = "database"
 DEFAULT_DISABLE_RATE_LIMITS = False
+DEFAULT_CELERYBEAT_PID_FILE = "celerybeat.pid"
+DEFAULT_CELERYBEAT_LOG_LEVEL = "INFO"
+DEFAULT_CELERYBEAT_LOG_FILE = "celerybeat.log"
+DEFAULT_CELERYBEAT_SCHEDULE_FILENAME = "celerybeat-schedule"
 
 
 """
@@ -246,24 +250,24 @@ CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)
 
 """
 
-.. data:: CELERY_PERIODIC_STATUS_BACKEND
+.. data:: CELERY_CACHE_BACKEND
 
-The backend used to store the status of periodic tasks.
+Use a custom cache backend for celery. If not set the django-global
+cache backend in ``CACHE_BACKEND`` will be used.
 
 """
-CELERY_PERIODIC_STATUS_BACKEND = getattr(settings,
-                                    "CELERY_PERIODIC_STATUS_BACKEND",
-                                    DEFAULT_PERIODIC_STATUS_BACKEND)
+CELERY_CACHE_BACKEND = getattr(settings, "CELERY_CACHE_BACKEND", None)
 
 """
 
-.. data:: CELERY_CACHE_BACKEND
+.. data:: CELERYBEAT_PID_FILE
 
-Use a custom cache backend for celery. If not set the django-global
-cache backend in ``CACHE_BACKEND`` will be used.
+Name of celerybeat's pid file.
+Default is: ``celerybeat.pid``.
 
 """
-CELERY_CACHE_BACKEND = getattr(settings, "CELERY_CACHE_BACKEND", None)
+CELERYBEAT_PID_FILE = getattr(settings, "CELERYBEAT_PID_FILE",
+                              DEFAULT_CELERYBEAT_PID_FILE)
 
 
 """
@@ -286,3 +290,37 @@ as soon as possible.
 """
 DISABLE_RATE_LIMITS = getattr(settings, "CELERY_DISABLE_RATE_LIMITS",
                               DEFAULT_DISABLE_RATE_LIMITS)
+
+"""
+
+.. data:: CELERYBEAT_LOG_LEVEL
+
+Default log level for celerybeat.
+Default is: ``INFO``.
+
+"""
+CELERYBEAT_LOG_LEVEL = getattr(settings, "CELERYBEAT_LOG_LEVEL",
+                               DEFAULT_CELERYBEAT_LOG_LEVEL)
+
+"""
+
+.. data:: CELERYBEAT_LOG_FILE
+
+Default log file for celerybeat.
+Default is: ``celerybeat.log``.
+
+"""
+CELERYBEAT_LOG_FILE = getattr(settings, "CELERYBEAT_LOG_FILE",
+                              DEFAULT_CELERYBEAT_LOG_FILE)
+
+"""
+
+.. data:: CELERYBEAT_SCHEDULE_FILENAME
+
+Name of the persistent schedule database file.
+Default is: ``celerybeat-schedule``.
+
+"""
+CELERYBEAT_SCHEDULE_FILENAME = getattr(settings,
+                                       "CELERYBEAT_SCHEDULE_FILENAME",
+                                       DEFAULT_CELERYBEAT_SCHEDULE_FILENAME)
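
The new settings are looked up on the Django settings module with the defaults listed above; a hypothetical ``settings.py`` fragment overriding them might read::

    # settings.py (paths are illustrative only)
    CELERYBEAT_PID_FILE = "/var/run/celerybeat.pid"
    CELERYBEAT_LOG_FILE = "/var/log/celerybeat.log"
    CELERYBEAT_LOG_LEVEL = "DEBUG"
    CELERYBEAT_SCHEDULE_FILENAME = "/var/lib/celery/celerybeat-schedule"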

+ 2 - 1
celery/loaders/__init__.py

@@ -14,7 +14,8 @@ Loader = DefaultLoader
 if settings.configured:
     Loader = DjangoLoader
 else:
-    if callable(getattr(os, "fork", None)): # Platform doesn't support fork()
+    if not callable(getattr(os, "fork", None)):
+        # Platform doesn't support fork()
         # XXX On systems without fork, multiprocessing seems to be launching
         # the processes in some other way which does not copy the memory
         # of the parent process. This means that any configured env might

+ 3 - 3
celery/log.py

@@ -4,7 +4,7 @@ import sys
 import time
 import logging
 import traceback
-from celery.conf import LOG_FORMAT, DAEMON_LOG_LEVEL
+from celery import conf
 
 
 def get_default_logger(loglevel=None):
@@ -14,8 +14,8 @@ def get_default_logger(loglevel=None):
     return logger
 
 
-def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
-        **kwargs):
+def setup_logger(loglevel=conf.DAEMON_LOG_LEVEL, logfile=None,
+        format=conf.LOG_FORMAT, **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
     ``stderr`` is used.
 

+ 17 - 0
celery/management/commands/celerybeat.py

@@ -0,0 +1,17 @@
+"""
+
+Start the celery clock service as a Django management command.
+
+"""
+from django.core.management.base import BaseCommand
+from celery.bin.celerybeat import run_clockservice, OPTION_LIST
+
+
+class Command(BaseCommand):
+    """Run the celery daemon."""
+    option_list = BaseCommand.option_list + OPTION_LIST
+    help = 'Run the celery daemon'
+
+    def handle(self, *args, **options):
+        """Handle the management command."""
+        run_clockservice(**options)
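
This exposes the clock service as ``python manage.py celerybeat``. It can also be reached through Django's ``call_command``; a hedged sketch (options are passed straight through to ``run_clockservice``)::

    from django.core.management import call_command

    # Blocks, running the clock service loop in the foreground.
    call_command("celerybeat", detach=False)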

+ 2 - 112
celery/managers.py

@@ -1,63 +1,9 @@
 """celery.managers"""
 from django.db import models
-from django.db import connection, transaction
-from celery.registry import tasks
+from django.db import transaction
 from celery.conf import TASK_RESULT_EXPIRES
-from datetime import datetime, timedelta
+from datetime import datetime
 from django.conf import settings
-import random
-
-# server_drift can be negative, but timedelta supports addition on
-# negative seconds.
-SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
-
-
-class TableLock(object):
-    """Base class for database table locks. Also works as a NOOP lock."""
-
-    def __init__(self, table, type="read"):
-        self.table = table
-        self.type = type
-        self.cursor = None
-
-    def lock_table(self):
-        """Lock the table."""
-        pass
-
-    def unlock_table(self):
-        """Release previously locked tables."""
-        pass
-
-    @classmethod
-    def acquire(cls, table, type=None):
-        """Acquire table lock."""
-        lock = cls(table, type)
-        lock.lock_table()
-        return lock
-
-    def release(self):
-        """Release the lock."""
-        self.unlock_table()
-        if self.cursor:
-            self.cursor.close()
-            self.cursor = None
-
-
-class MySQLTableLock(TableLock):
-    """Table lock support for MySQL."""
-
-    def lock_table(self):
-        """Lock MySQL table."""
-        self.cursor = connection.cursor()
-        self.cursor.execute("LOCK TABLES %s %s" % (
-            self.table, self.type.upper()))
-
-    def unlock_table(self):
-        """Unlock MySQL table."""
-        self.cursor.execute("UNLOCK TABLES")
-
-TABLE_LOCK_FOR_ENGINE = {"mysql": MySQLTableLock}
-table_lock = TABLE_LOCK_FOR_ENGINE.get(settings.DATABASE_ENGINE, TableLock)
 
 
 class TaskManager(models.Manager):
@@ -119,59 +65,3 @@ class TaskManager(models.Manager):
                 self.store_result(task_id, result, status, traceback, False)
             else:
                 raise
-
-
-class PeriodicTaskManager(models.Manager):
-    """Manager for :class:`celery.models.PeriodicTask` models."""
-
-    def init_entries(self):
-        """Add entries for all registered periodic tasks.
-
-        Should be run at worker start.
-        """
-        periodic_tasks = tasks.get_all_periodic()
-        for task_name in periodic_tasks.keys():
-            task_meta, created = self.get_or_create(name=task_name)
-
-    def is_time(self, last_run_at, run_every):
-        """Check if if it is time to run the periodic task.
-
-        :param last_run_at: Last time the periodic task was run.
-        :param run_every: How often to run the periodic task.
-
-        :rtype bool:
-
-        """
-        run_every_drifted = run_every + SERVER_DRIFT
-        run_at = last_run_at + run_every_drifted
-        if datetime.now() > run_at:
-            return True
-        return False
-
-    def get_waiting_tasks(self):
-        """Get all waiting periodic tasks.
-
-        :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
-        """
-        periodic_tasks = tasks.get_all_periodic()
-        db_table = self.model._meta.db_table
-
-        # Find all periodic tasks to be run.
-        waiting = []
-        for task_meta in self.all():
-            if task_meta.name in periodic_tasks:
-                task = periodic_tasks[task_meta.name]
-                run_every = task.run_every
-                if self.is_time(task_meta.last_run_at, run_every):
-                    # Get the object again to be sure noone else
-                    # has already taken care of it.
-                    lock = table_lock.acquire(db_table, "write")
-                    try:
-                        secure = self.get(pk=task_meta.pk)
-                        if self.is_time(secure.last_run_at, run_every):
-                            secure.last_run_at = datetime.now()
-                            secure.save()
-                            waiting.append(secure)
-                    finally:
-                        lock.release()
-        return waiting

+ 3 - 37
celery/models.py

@@ -6,7 +6,7 @@ Django Models.
 import django
 from django.db import models
 from celery.registry import tasks
-from celery.managers import TaskManager, PeriodicTaskManager
+from celery.managers import TaskManager
 from celery.fields import PickledObjectField
 from celery import conf
 from django.utils.translation import ugettext_lazy as _
@@ -40,42 +40,8 @@ class TaskMeta(models.Model):
     def __unicode__(self):
         return u"<Task: %s done:%s>" % (self.task_id, self.status)
 
-
-class PeriodicTaskMeta(models.Model):
-    """Information about a Periodic Task."""
-    name = models.CharField(_(u"name"), max_length=255, unique=True)
-    last_run_at = models.DateTimeField(_(u"last time run"),
-                                       blank=True,
-                                       default=datetime.fromtimestamp(0))
-    total_run_count = models.PositiveIntegerField(_(u"total run count"),
-                                                  default=0)
-
-    objects = PeriodicTaskManager()
-
-    class Meta:
-        """Model meta-data."""
-        verbose_name = _(u"periodic task")
-        verbose_name_plural = _(u"periodic tasks")
-
-    def __unicode__(self):
-        return u"<PeriodicTask: %s [last-run:%s, total-run:%d]>" % (
-                self.name, self.last_run_at, self.total_run_count)
-
-    def delay(self, *args, **kwargs):
-        """Apply the periodic task immediately."""
-        self.task.delay()
-        self.total_run_count = self.total_run_count + 1
-        self.save()
-
-    @property
-    def task(self):
-        """The entry registered in the task registry for this task."""
-        return tasks[self.name]
-
-
 if (django.VERSION[0], django.VERSION[1]) >= (1, 1):
-    # keep models away from syncdb/reset if database backend is not being used.
+    # keep models away from syncdb/reset if database backend is not
+    # being used.
     if conf.CELERY_BACKEND != 'database':
         TaskMeta._meta.managed = False
-    if conf.CELERY_PERIODIC_STATUS_BACKEND != 'database':
-        PeriodicTaskMeta._meta.managed = False

+ 97 - 0
celery/platform.py

@@ -0,0 +1,97 @@
+import os
+import sys
+import signal
+
+
+CAN_DETACH = True
+try:
+    import resource
+except ImportError:
+    CAN_DETACH = False
+
+
+def acquire_pidlock(pidfile):
+    """Get the :class:`daemon.pidlockfile.PIDLockFile` handler for
+    ``pidfile``.
+
+    If the ``pidfile`` already exists but the process is not running, the
+    ``pidfile`` will be removed, a ``"stale pidfile"`` message is emitted,
+    and execution continues as normal. However, if the process is still
+    running, the program will exit, complaining that it is already running
+    in the background somewhere.
+
+    """
+    from daemon.pidlockfile import PIDLockFile
+    import errno
+    pidlock = PIDLockFile(pidfile)
+    if not pidlock.is_locked():
+        return pidlock
+    pid = pidlock.read_pid()
+    try:
+        os.kill(pid, 0)
+    except os.error, exc:
+        if exc.errno == errno.ESRCH:
+            sys.stderr.write("Stale pidfile exists. Removing it.\n")
+            os.unlink(pidfile)
+            return PIDLockFile(pidfile)
+    except TypeError, exc:
+        sys.stderr.write("Broken pidfile found. Removing it.\n")
+        os.unlink(pidfile)
+        return PIDLockFile(pidfile)
+    else:
+        raise SystemExit(
+                "ERROR: Pidfile (%s) already exists.\n"
+                "Seems celeryd is already running? (PID: %d)" % (
+                    pidfile, pid))
+    return pidlock
+
+
+def create_daemon_context(logfile=None, pidfile=None, **options):
+    if not CAN_DETACH:
+        raise RuntimeError(
+                "This operating system doesn't support detach.")
+
+    from daemon import DaemonContext
+
+    # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
+    # it) lets the parent wait() for the terminated child process and stops
+    # the 'OSError: [Errno 10] No child processes' problem.
+    reset_signal("SIGCLD")
+
+    # Since without stderr any errors will be silently suppressed,
+    # we need to know that we have access to the logfile
+    if logfile:
+        open(logfile, "a").close()
+
+    options["pidlock"] = pidfile and acquire_pidlock(pidfile)
+
+    defaults = {"uid": lambda: os.geteuid(),
+                "gid": lambda: os.getegid(),
+                "umask": lambda: 0,
+                "chroot_directory": lambda: None,
+                "working_directory": lambda: os.getcwd()}
+
+    for opt_name, opt_default_gen in defaults.items():
+        if opt_name not in options:
+            options[opt_name] = opt_default_gen()
+
+    pidlock = options.pop("pidlock")
+    return DaemonContext(pidfile=pidlock, **options)
+
+
+def reset_signal(signal_name):
+    if hasattr(signal, signal_name):
+        signal.signal(getattr(signal, signal_name), signal.SIG_DFL)
+
+
+def install_signal_handler(signal_name, handler):
+    """Install a SIGHUP handler."""
+    if not hasattr(signal, signal_name):
+        return # Platform doesn't support signal.
+
+    signum = getattr(signal, signal_name)
+    signal.signal(signum, handler)
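
A usage sketch of the new signal helpers (the handler body is illustrative; ``celeryd`` installs its restart handler through the same call)::

    from celery import platform

    def on_hup(signum, frame):
        # Illustrative handler body only.
        print("SIGHUP received")

    # Silently does nothing on platforms that lack the named signal.
    platform.install_signal_handler("SIGHUP", on_hup)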

+ 1 - 1
celery/tests/test_models.py

@@ -1,6 +1,6 @@
 import unittest
 from datetime import datetime, timedelta
-from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.models import TaskMeta
 from celery.task import PeriodicTask
 from celery.registry import tasks
 from celery.utils import gen_unique_id

+ 13 - 5
celery/worker/__init__.py

@@ -5,6 +5,7 @@ The Multiprocessing Worker Server
 """
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 from celery.worker.controllers import Mediator, PeriodicWorkController
+from celery.beat import ClockServiceThread
 from celery.worker.job import TaskWrapper
 from celery.exceptions import NotRegistered
 from celery.messaging import get_consumer_set
@@ -228,7 +229,7 @@ class WorkController(object):
     _state = None
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
-            is_detached=False):
+            is_detached=False, embed_clockservice=False):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -236,6 +237,7 @@ class WorkController(object):
         self.logfile = logfile or self.logfile
         self.is_detached = is_detached
         self.logger = setup_logger(loglevel, logfile)
+        self.embed_clockservice = embed_clockservice
 
         # Queues
         if conf.DISABLE_RATE_LIMITS:
@@ -256,13 +258,19 @@ class WorkController(object):
                                           initial_prefetch_count=concurrency)
         self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
 
+        self.clockservice = None
+        if self.embed_clockservice:
+            self.clockservice = ClockServiceThread(logger=self.logger,
+                                                is_detached=self.is_detached)
+
         # The order is important here;
         #   the first in the list is the first to start,
         # and they must be stopped in reverse order.
-        self.components = [self.pool,
-                           self.mediator,
-                           self.periodic_work_controller,
-                           self.amqp_listener]
+        self.components = filter(None, (self.pool,
+                                        self.mediator,
+                                        self.periodic_work_controller,
+                                        self.clockservice,
+                                        self.amqp_listener))
 
     def start(self):
         """Starts the workers main loop."""

+ 2 - 25
celery/worker/controllers.py

@@ -3,11 +3,9 @@
 Worker Controller Threads
 
 """
-from celery.backends import default_periodic_status_backend
 from Queue import Empty as QueueEmpty
 from datetime import datetime
 from celery.log import get_default_logger
-import traceback
 import threading
 import time
 
@@ -99,9 +97,7 @@ class Mediator(BackgroundThread):
 
 
 class PeriodicWorkController(BackgroundThread):
-    """A thread that continuously checks if there are
-    :class:`celery.task.PeriodicTask` tasks waiting for execution,
-    and executes them. It also finds tasks in the hold queue that is
+    """Finds tasks in the hold queue that is
     ready for execution and moves them to the bucket queue.
 
     (Tasks in the hold queue are tasks waiting for retry, or with an
@@ -114,33 +110,14 @@ class PeriodicWorkController(BackgroundThread):
         self.hold_queue = hold_queue
         self.bucket_queue = bucket_queue
 
-    def on_start(self):
-        """Do backend-specific periodic task initialization."""
-        default_periodic_status_backend.init_periodic_tasks()
-
     def on_iteration(self):
-        """Run periodic tasks and process the hold queue."""
+        """Process the hold queue."""
         logger = get_default_logger()
-        logger.debug("PeriodicWorkController: Running periodic tasks...")
-        try:
-            self.run_periodic_tasks()
-        except Exception, exc:
-            logger.error(
-                "PeriodicWorkController got exception: %s\n%s" % (
-                    exc, traceback.format_exc()))
         logger.debug("PeriodicWorkController: Processing hold queue...")
         self.process_hold_queue()
         logger.debug("PeriodicWorkController: Going to sleep...")
         time.sleep(1)
 
-    def run_periodic_tasks(self):
-        logger = get_default_logger()
-        applied = default_periodic_status_backend.run_periodic_tasks()
-        for task, task_id in applied:
-            logger.debug(
-                "PeriodicWorkController: Periodic task %s applied (%s)" % (
-                    task.name, task_id))
-
     def process_hold_queue(self):
         """Finds paused tasks that are ready for execution and move
         them to the :attr:`bucket_queue`."""

+ 141 - 0
contrib/debian/init.d/celerybeat

@@ -0,0 +1,141 @@
+#! /bin/sh
+
+### BEGIN INIT INFO
+# Provides:		celerybeat
+# Required-Start:	
+# Required-Stop:	
+# Default-Start:	2 3 4 5
+# Default-Stop:		1
+# Short-Description:	celery clock service
+### END INIT INFO
+
+set -e
+
+VIRTUALENV=/opt/Opal/current
+DJANGO_PROJECT_DIR=/opt/Opal/release/opal
+DJANGO_SETTINGS_MODULE=settings
+CELERYBEAT_PID_FILE="/var/run/celerybeat.pid"
+CELERYBEAT_LOG_FILE="/var/log/celerybeat.log"
+CELERYBEAT_LOG_LEVEL="INFO"
+CELERYBEAT="celerybeat"
+
+export DJANGO_SETTINGS_MODULE
+export DJANGO_PROJECT_DIR
+
+# /etc/init.d/celerybeat: start and stop the celery clock server
+
+if test -f /etc/default/celerybeat; then
+    . /etc/default/celerybeat
+fi
+
+
+. /lib/lsb/init-functions
+
+cd $DJANGO_PROJECT_DIR
+
+CELERYBEAT_OPTS="-f $CELERYBEAT_LOG_FILE -l $CELERYBEAT_LOG_LEVEL -p \
+                    $CELERYBEAT_PID_FILE -d"
+
+if [ -n "$2" ]; then
+    CELERYBEAT_OPTS="$CELERYBEAT_OPTS $2"
+fi
+
+# Are we running from init?
+run_by_init() {
+    ([ "$previous" ] && [ "$runlevel" ]) || [ "$runlevel" = S ]
+}
+
+
+check_dev_null() {
+    if [ ! -c /dev/null ]; then
+	if [ "$1" = log_end_msg ]; then
+	    log_end_msg 1 || true
+	fi
+	if ! run_by_init; then
+	    log_action_msg "/dev/null is not a character device!"
+	fi
+	exit 1
+    fi
+}
+
+
+export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
+if [ ! -z "$VIRTUALENV" ]; then
+    . "$VIRTUALENV/bin/activate"
+    export PATH="$VIRTUALENV/bin:$PATH"
+    CELERYBEAT="$VIRTUALENV/bin/$CELERYBEAT"
+fi
+
+
+case "$1" in
+  start)
+	check_dev_null
+	log_daemon_msg "Starting celery clock server" "celerybeat"
+	if start-stop-daemon --start --quiet --oknodo --pidfile $CELERYBEAT_PID_FILE --exec $CELERYBEAT -- $CELERYBEAT_OPTS; then
+	    log_end_msg 0
+	else
+	    log_end_msg 1
+	fi
+	;;
+  stop)
+	log_daemon_msg "Stopping celery clock server" "celerybeat"
+	if start-stop-daemon --stop --quiet --oknodo --pidfile $CELERYBEAT_PID_FILE; then log_end_msg 0
+	else
+	    log_end_msg 1
+	fi
+	;;
+
+  reload|force-reload)
+    echo "Use start+stop"
+	;;
+
+  restart)
+	log_daemon_msg "Restarting celery clock server" "celerybeat"
+	start-stop-daemon --stop --quiet --oknodo --retry 30 --pidfile $CELERYBEAT_PID_FILE
+	check_dev_null log_end_msg
+	if start-stop-daemon --start --quiet --oknodo --pidfile $CELERYBEAT_PID_FILE --exec $CELERYBEAT -- $CELERYBEAT_OPTS; then
+	    log_end_msg 0
+	else
+	    log_end_msg 1
+	fi
+	;;
+
+  try-restart)
+	log_daemon_msg "Restarting celery clock server" "celerybeat"
+	set +e
+	start-stop-daemon --stop --quiet --retry 30 --pidfile $CELERYBEAT_PID_FILE
+	RET="$?"
+	set -e
+	case $RET in
+	    0)
+		# old daemon stopped
+		check_dev_null log_end_msg
+		if start-stop-daemon --start --quiet --oknodo --pidfile $CELERYBEAT_PID_FILE --exec $CELERYBEAT -- $CELERYBEAT_OPTS; then
+		    log_end_msg 0
+		else
+		    log_end_msg 1
+		fi
+		;;
+	    1)
+		# daemon not running
+		log_progress_msg "(not running)"
+		log_end_msg 0
+		;;
+	    *)
+		# failed to stop
+		log_progress_msg "(failed to stop)"
+		log_end_msg 1
+		;;
+	esac
+	;;
+
+  status)
+	status_of_proc -p $CELERYBEAT_PID_FILE $CELERYBEAT celerybeat && exit 0 || exit $?
+	;;
+
+  *)
+	log_action_msg "Usage: /etc/init.d/celerybeat {start|stop|force-reload|restart|try-restart|status}"
+	exit 1
+esac
+
+exit 0

+ 1 - 18
docs/configuration.rst

@@ -1,4 +1,5 @@
 ============================
+
  Configuration and defaults
 ============================
 
@@ -73,23 +74,9 @@ Task result backend settings
         try to receive the result once).
 
 
-* CELERY_PERIODIC_STATUS_BACKEND
-    The backend used to store the status of periodic tasks.
-    Can be one of the following:
-
-    * database (default)
-        Use a relational database supported by the Django ORM.
-
-    * mongodb
-        Use MongoDB.
-
-
 Database backend settings
 =========================
 
-This applies to both the result store backend and the periodic status
-backend.
-
 Please see the Django ORM database settings documentation:
 http://docs.djangoproject.com/en/dev/ref/settings/#database-engine
 
@@ -194,10 +181,6 @@ MongoDB backend settings
         The collection name to store task metadata.
         Defaults to "celery_taskmeta".
 
-    * periodictaskmeta_collection
-        The collection name to store periodic task metadata.
-        Defaults to "celery_periodictaskmeta".
-
 
 Example configuration
 ---------------------

+ 1 - 1
setup.py

@@ -77,7 +77,7 @@ setup(
     platforms=["any"],
     license="BSD",
     packages=find_packages(exclude=['ez_setup']),
-    scripts=["bin/celeryd", "bin/celeryinit"],
+    scripts=["bin/celeryd", "bin/celeryinit", "bin/celerybeat"],
     zip_safe=False,
     install_requires=install_requires,
     extra_requires={