
Working towards celerybeat.py

Ask Solem 15 years ago
parent
commit
003557a70e
4 changed files with 239 additions and 16 deletions
  1. celery/bin/celerybeat.py (+224 -0)
  2. celery/bin/celeryd.py (+11 -12)
  3. celery/log.py (+3 -3)
  4. celery/tests/test_models.py (+1 -1)

+ 224 - 0
celery/bin/celerybeat.py

@@ -0,0 +1,224 @@
+#!/usr/bin/env python
+"""celerybeat
+
+.. program:: celerybeat
+
+.. cmdoption:: -f, --logfile
+
+    Path to log file. If no logfile is specified, ``stderr`` is used.
+
+.. cmdoption:: -l, --loglevel
+
+    Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
+    ``ERROR``, ``CRITICAL``, or ``FATAL``.
+
+.. cmdoption:: -p, --pidfile
+
+    Path to pidfile.
+
+.. cmdoption:: -d, --detach, --daemon
+
+    Run in the background as a daemon.
+
+.. cmdoption:: -u, --uid
+
+    User-id to run ``celerybeat`` as when in daemon mode.
+
+.. cmdoption:: -g, --gid
+
+    Group-id to run ``celerybeat`` as when in daemon mode.
+
+.. cmdoption:: --umask
+
+    umask of the process when in daemon mode.
+
+.. cmdoption:: --workdir
+
+    Directory to change to when in daemon mode.
+
+.. cmdoption:: --chroot
+
+    Change root directory to this path when in daemon mode.
+
+"""
+import os
+import sys
+from celery.loaders import current_loader
+from celery.loaders import settings
+from celery import __version__
+from celery.log import emergency_error
+from celery import conf
+from celery import discovery
+from celery.worker import WorkController
+from celery import platform
+import traceback
+import optparse
+
+
+STARTUP_INFO_FMT = """
+Configuration ->
+    * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
+    * Exchange -> %(exchange)s (%(exchange_type)s)
+    * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
+""".strip()
+
+OPTION_LIST = (
+    optparse.make_option('-f', '--logfile', default=conf.DAEMON_LOG_FILE,
+            action="store", dest="logfile",
+            help="Path to log file."),
+    optparse.make_option('-l', '--loglevel', default=conf.DAEMON_LOG_LEVEL,
+            action="store", dest="loglevel",
+            help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
+    optparse.make_option('-p', '--pidfile', default=conf.DAEMON_PID_FILE,
+            action="store", dest="pidfile",
+            help="Path to pidfile."),
+    optparse.make_option('-d', '--detach', '--daemon', default=False,
+            action="store_true", dest="detach",
+            help="Run in the background as a daemon."),
+    optparse.make_option('-u', '--uid', default=None,
+            action="store", dest="uid",
+            help="User-id to run celerybeat as when in daemon mode."),
+    optparse.make_option('-g', '--gid', default=None,
+            action="store", dest="gid",
+            help="Group-id to run celerybeat as when in daemon mode."),
+    optparse.make_option('--umask', default=0,
+            action="store", type="int", dest="umask",
+            help="umask of the process when in daemon mode."),
+    optparse.make_option('--workdir', default=None,
+            action="store", dest="working_directory",
+            help="Directory to change to when in daemon mode."),
+    optparse.make_option('--chroot', default=None,
+            action="store", dest="chroot",
+            help="Change root directory to this path when in daemon mode."),
+    )
+
+
+def run_clock(detach=False, loglevel=conf.DAEMON_LOG_LEVEL,
+        logfile=conf.DAEMON_LOG_FILE, pidfile=conf.DAEMON_PID_FILE,
+        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
+        **kwargs):
+    """Starts the celerybeat clock server."""
+
+    print("Celery Beat %s is starting." % __version__)
+
+    # Set SIGCLD back to the default SIG_DFL (python-daemon overrides it);
+    # this lets the parent wait() for the terminated child process and
+    # avoids the 'OSError: [Errno 10] No child processes' problem.
+    platform.reset_signal("SIGCLD")
+
+    # Setup logging
+    if not isinstance(loglevel, int):
+        loglevel = conf.LOG_LEVELS[loglevel.upper()]
+    if not detach:
+        logfile = None # log to stderr when not running in the background.
+
+    # Dump configuration to screen so we have some basic information
+    # when users send e-mails.
+    print(STARTUP_INFO_FMT % {
+            "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
+            "host": getattr(settings, "AMQP_SERVER", "(default)"),
+            "port": getattr(settings, "AMQP_PORT", "(default)"),
+            "exchange": conf.AMQP_EXCHANGE,
+            "exchange_type": conf.AMQP_EXCHANGE_TYPE,
+            "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
+            "consumer_rkey": conf.AMQP_CONSUMER_ROUTING_KEY,
+            "publisher_rkey": conf.AMQP_PUBLISHER_ROUTING_KEY,
+            "concurrency": concurrency,
+            "loglevel": loglevel,
+            "pidfile": pidfile,
+            "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
+    })
+
+    print("Celery has started.")
+    if detach:
+        from celery.log import setup_logger, redirect_stdouts_to_logger
+        context = platform.create_daemon_context(logfile, pidfile,
+                                        chroot_directory=chroot,
+                                        working_directory=working_directory,
+                                        umask=umask,
+                                        uid=uid,
+                                        gid=gid)
+        context.open()
+        logger = setup_logger(loglevel, logfile)
+        redirect_stdouts_to_logger(logger, loglevel)
+
+    # Run the worker init handler.
+    # (Usually imports task modules and such.)
+    current_loader.on_worker_init()
+
+    def run_worker():
+        # The clock server runs a single process, so concurrency is
+        # fixed at 1; celerybeat exposes no --concurrency option.
+        worker = WorkController(concurrency=1,
+                                loglevel=loglevel,
+                                logfile=logfile,
+                                is_detached=detach)
+
+        # Install signal handler that restarts celerybeat on SIGHUP
+        # (only on POSIX systems).
+        install_worker_restart_handler(worker)
+
+        try:
+            worker.start()
+        except Exception, e:
+            emergency_error(logfile, "celerybeat raised exception %s: %s\n%s" % (
+                            e.__class__, e, traceback.format_exc()))
+
+    try:
+        run_worker()
+    except:
+        if detach:
+            context.close()
+        raise
+
+
+def install_worker_restart_handler(worker):
+
+    def restart_worker_sig_handler(signum, frame):
+        """Signal handler restarting the current python program."""
+        worker.logger.info("Restarting celerybeat (%s)" % (
+            " ".join(sys.argv)))
+        if worker.is_detached:
+            pid = os.fork()
+            if pid:
+                worker.stop()
+                sys.exit(0)
+        else:
+            worker.stop()
+        os.execv(sys.executable, [sys.executable] + sys.argv)
+
+    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
+
+
+def parse_options(arguments):
+    """Parse the available options to ``celeryd``."""
+    parser = optparse.OptionParser(option_list=OPTION_LIST)
+    options, values = parser.parse_args(arguments)
+    return options
+
+
+if __name__ == "__main__":
+    options = parse_options(sys.argv[1:])
+    run_clock(**vars(options))
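
A note on the option-parsing pattern above: optparse stores each parsed flag as an attribute named by the option's ``dest``, so ``vars(options)`` turns the result straight into keyword arguments for ``run_clock()``. A minimal standalone sketch of the idiom (the ``run_clock`` stub below is illustrative, not the real function):

    import optparse

    OPTION_LIST = (
        optparse.make_option("-l", "--loglevel", default="INFO",
                action="store", dest="loglevel"),
        optparse.make_option("-d", "--detach", default=False,
                action="store_true", dest="detach"),
    )

    def parse_options(arguments):
        parser = optparse.OptionParser(option_list=OPTION_LIST)
        options, values = parser.parse_args(arguments)
        return options

    def run_clock(loglevel="INFO", detach=False, **kwargs):
        print("loglevel=%s detach=%s" % (loglevel, detach))

    options = parse_options(["-l", "DEBUG", "--detach"])
    run_clock(**vars(options))   # prints: loglevel=DEBUG detach=True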

+ 11 - 12
celery/bin/celeryd.py

@@ -74,8 +74,6 @@ from celery.loaders import settings
 from celery import __version__
 from celery.supervisor import OFASupervisor
 from celery.log import emergency_error
-from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
-from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
 from celery import conf
 from celery import discovery
 from celery.task import discard_all
@@ -100,7 +98,8 @@ Configuration ->
 """.strip()
 
 OPTION_LIST = (
-    optparse.make_option('-c', '--concurrency', default=DAEMON_CONCURRENCY,
+    optparse.make_option('-c', '--concurrency',
+            default=conf.DAEMON_CONCURRENCY,
             action="store", dest="concurrency", type="int",
             help="Number of child processes processing the queue."),
     optparse.make_option('--discard', default=False,
@@ -111,13 +110,13 @@ OPTION_LIST = (
     optparse.make_option('-s', '--statistics', default=USE_STATISTICS,
             action="store_true", dest="statistics",
             help="Collect statistics."),
-    optparse.make_option('-f', '--logfile', default=DAEMON_LOG_FILE,
+    optparse.make_option('-f', '--logfile', default=conf.DAEMON_LOG_FILE,
             action="store", dest="logfile",
             help="Path to log file."),
-    optparse.make_option('-l', '--loglevel', default=DAEMON_LOG_LEVEL,
+    optparse.make_option('-l', '--loglevel', default=conf.DAEMON_LOG_LEVEL,
             action="store", dest="loglevel",
             help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
+    optparse.make_option('-p', '--pidfile', default=conf.DAEMON_PID_FILE,
             action="store", dest="pidfile",
             help="Path to pidfile."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
@@ -180,11 +179,11 @@ def acquire_pidlock(pidfile):
     return pidlock
 
 
-def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
-        loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
-        pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
-        supervised=False, working_directory=None, chroot=None,
-        statistics=None, **kwargs):
+def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
+        loglevel=conf.DAEMON_LOG_LEVEL, logfile=conf.DAEMON_LOG_FILE,
+        discard=False, pidfile=conf.DAEMON_PID_FILE, umask=0,
+        uid=None, gid=None, supervised=False, working_directory=None,
+        chroot=None, statistics=None, **kwargs):
     """Starts the celery worker server."""
 
     # set SIGCLD back to the default SIG_DFL (before python-daemon overrode
@@ -213,7 +212,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
 
     # Setup logging
     if not isinstance(loglevel, int):
-        loglevel = LOG_LEVELS[loglevel.upper()]
+        loglevel = conf.LOG_LEVELS[loglevel.upper()]
     if not detach:
         logfile = None # log to stderr when not running in the background.
 

+ 3 - 3
celery/log.py

@@ -4,7 +4,7 @@ import sys
 import time
 import logging
 import traceback
-from celery.conf import LOG_FORMAT, DAEMON_LOG_LEVEL
+from celery import conf
 
 
 def get_default_logger(loglevel=None):
@@ -14,8 +14,8 @@ def get_default_logger(loglevel=None):
     return logger
 
 
-def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
-        **kwargs):
+def setup_logger(loglevel=conf.DAEMON_LOG_LEVEL, logfile=None,
+        format=conf.LOG_FORMAT, **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
     ``stderr`` is used.
 

+ 1 - 1
celery/tests/test_models.py

@@ -1,6 +1,6 @@
 import unittest
 from datetime import datetime, timedelta
-from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.models import TaskMeta
 from celery.task import PeriodicTask
 from celery.registry import tasks
 from celery.utils import gen_unique_id