Browse Source

Refactored celery.bin.celeryd into celery.bin.celeryd + celery.apps.worker

Ask Solem 14 years ago
parent
commit
9efb933253

+ 3 - 3
bin/celeryd

@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 import sys
+
 from celery.bin import celeryd
 
 WINDOWS_MESSAGE = """
@@ -19,11 +20,10 @@ You can also supply arguments:
 
 def main():
     if sys.platform == "win32":
-        print(WINDOWS_MESSAGE)
+        sys.stderr.write(WINDOWS_MESSAGE)
         sys.exit()
 
-    options = celeryd.parse_options(sys.argv[1:])
-    celeryd.run_worker(**vars(options))
+    celeryd.main()
 
 if __name__ == "__main__":
     import multiprocessing

+ 0 - 0
celery/apps/__init__.py


+ 289 - 0
celery/apps/worker.py

@@ -0,0 +1,289 @@
+import logging
+import multiprocessing
+import platform as _platform
+import os
+import socket
+import sys
+import warnings
+
+from celery import __version__
+from celery import platform
+from celery import signals
+from celery.exceptions import ImproperlyConfigured
+from celery.routes import Router
+from celery.task import discard_all
+from celery.utils import info,get_full_cls_name, LOG_LEVELS
+from celery.worker import WorkController
+
+
+SYSTEM = _platform.system()
+IS_OSX = SYSTEM == "Darwin"
+
+STARTUP_INFO_FMT = """
+Configuration ->
+    . broker -> %(conninfo)s
+    . queues ->
+%(queues)s
+    . concurrency -> %(concurrency)s
+    . loader -> %(loader)s
+    . logfile -> %(logfile)s@%(loglevel)s
+    . events -> %(events)s
+    . beat -> %(celerybeat)s
+%(tasks)s
+""".strip()
+
+TASK_LIST_FMT = """    . tasks ->\n%s"""
+
+
+class Worker(object):
+    WorkController = WorkController
+
+    def __init__(self, concurrency=None, loglevel=None, logfile=None,
+            hostname=None, discard=False, run_clockservice=False,
+            schedule=None, task_time_limit=None, task_soft_time_limit=None,
+            max_tasks_per_child=None, queues=None, events=False, db=None,
+            include=None, defaults=None, **kwargs):
+        if defaults is None:
+            from celery import conf
+            defaults = conf
+        self.defaults = defaults
+        self.concurrency = (concurrency or
+                            defaults.CELERYD_CONCURRENCY or
+                            multiprocessing.cpu_count())
+        self.loglevel = loglevel or defaults.CELERYD_LOG_LEVEL
+        self.logfile = logfile or defaults.CELERYD_LOG_FILE
+        self.hostname = hostname or socket.gethostname()
+        self.discard = discard
+        self.run_clockservice = run_clockservice
+        self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
+        self.events = events
+        self.task_time_limit = (task_time_limit or
+                                defaults.CELERYD_TASK_TIME_LIMIT)
+        self.task_soft_time_limit = (task_soft_time_limit or
+                                     defaults.CELERYD_TASK_SOFT_TIME_LIMIT)
+        self.max_tasks_per_child = (max_tasks_per_child or
+                                    defaults.CELERYD_MAX_TASKS_PER_CHILD)
+        self.db = db
+        self.queues = queues or []
+        self.include = include or []
+        self._isatty = sys.stdout.isatty()
+
+        if isinstance(self.queues, basestring):
+            self.queues = self.queues.split(",")
+        if isinstance(self.include, basestring):
+            self.include = self.include.split(",")
+
+        if not isinstance(self.loglevel, int):
+            self.loglevel = LOG_LEVELS[self.loglevel.upper()]
+
+    def run(self):
+        self.init_loader()
+        self.init_queues()
+        self.worker_init()
+        self.redirect_stdouts_to_logger()
+        print("celery@%s v%s is starting." % (self.hostname, __version__))
+
+        if getattr(self.settings, "DEBUG", False):
+            warnings.warn("Using settings.DEBUG leads to a memory leak, "
+                    "never use this setting in a production environment!")
+
+        if self.discard:
+            self.purge_messages()
+
+        # Dump configuration to screen so we have some basic information
+        # for when users sends bug reports.
+        print(self.startup_info())
+        set_process_status("Running...")
+
+        self.run_worker()
+
+    def on_listener_ready(self, listener):
+        signals.worker_ready.send(sender=listener)
+        print("celery@%s has started." % self.hostname)
+
+    def init_queues(self):
+        conf = self.defaults
+        if self.queues:
+            conf.QUEUES = dict((queue, options)
+                                for queue, options in conf.QUEUES.items()
+                                    if queue in self.queues)
+            for queue in self.queues:
+                if queue not in conf.QUEUES:
+                    if conf.CREATE_MISSING_QUEUES:
+                        Router(queues=conf.QUEUES).add_queue(queue)
+                    else:
+                        raise ImproperlyConfigured(
+                            "Queue '%s' not defined in CELERY_QUEUES" % queue)
+
+    def init_loader(self):
+        from celery.loaders import current_loader, load_settings
+        self.loader = current_loader()
+        self.settings = load_settings()
+        if not self.loader.configured:
+            raise ImproperlyConfigured(
+                    "Celery needs to be configured to run celeryd.")
+        map(self.loader.import_module, self.include)
+
+    def redirect_stdouts_to_logger(self):
+        from celery import log
+        handled = log.setup_logging_subsystem(loglevel=self.loglevel,
+                                              logfile=self.logfile)
+        # Redirect stdout/stderr to our logger.
+        if not handled:
+            logger = log.get_default_logger()
+            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+
+    def purge_messages(self):
+        discarded_count = discard_all()
+        what = discarded_count > 1 and "messages" or "message"
+        print("discard: Erased %d %s from the queue.\n" % (
+            discarded_count, what))
+
+    def worker_init(self):
+        # Run the worker init handler.
+        # (Usually imports task modules and such.)
+        self.loader.init_worker()
+
+    def tasklist(self, include_builtins=True):
+        from celery.registry import tasks
+        tasklist = tasks.keys()
+        if not include_builtins:
+            tasklist = filter(lambda s: not s.startswith("celery."),
+                              tasklist)
+        return TASK_LIST_FMT % "\n".join("\t. %s" % task
+                                            for task in sorted(tasklist))
+
+    def startup_info(self):
+        tasklist = ""
+        if self.loglevel <= logging.INFO:
+            include_builtins = self.loglevel <= logging.DEBUG
+            tasklist = self.tasklist(include_builtins=include_builtins)
+
+        queues = self.defaults.get_queues()
+
+        return STARTUP_INFO_FMT % {
+            "conninfo": info.format_broker_info(),
+            "queues": info.format_queues(queues, indent=8),
+            "concurrency": self.concurrency,
+            "loglevel": LOG_LEVELS[self.loglevel],
+            "logfile": self.logfile or "[stderr]",
+            "celerybeat": self.run_clockservice and "ON" or "OFF",
+            "events": self.events and "ON" or "OFF",
+            "tasks": tasklist,
+            "loader": get_full_cls_name(self.loader.__class__),
+        }
+
+    def run_worker(self):
+        worker = self.WorkController(concurrency=self.concurrency,
+                                loglevel=self.loglevel,
+                                logfile=self.logfile,
+                                hostname=self.hostname,
+                                ready_callback=self.on_listener_ready,
+                                embed_clockservice=self.run_clockservice,
+                                schedule_filename=self.schedule,
+                                send_events=self.events,
+                                db=self.db,
+                                max_tasks_per_child=self.max_tasks_per_child,
+                                task_time_limit=self.task_time_limit,
+                                task_soft_time_limit=self.task_soft_time_limit)
+        self.install_platform_tweaks(worker)
+        worker.start()
+
+    def install_platform_tweaks(self, worker):
+        """Install platform specific tweaks and workarounds."""
+        if IS_OSX:
+            self.osx_proxy_detection_workaround()
+
+        # Install signal handler so SIGHUP restarts the worker.
+        if not self._isatty:
+            # only install HUP handler if detached from terminal,
+            # so closing the terminal window doesn't restart celeryd
+            # into the background.
+            if IS_OSX:
+                # OS X can't exec from a process using threads.
+                # See http://github.com/ask/celery/issues#issue/152
+                install_HUP_not_supported_handler(worker)
+            else:
+                install_worker_restart_handler(worker)
+        install_worker_term_handler(worker)
+        install_worker_int_handler(worker)
+        signals.worker_init.send(sender=worker)
+
+    def osx_proxy_detection_workaround(self):
+        """See http://github.com/ask/celery/issues#issue/161"""
+        os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
+
+
+def install_worker_int_handler(worker):
+
+    def _stop(signum, frame):
+        process_name = multiprocessing.current_process().name
+        if process_name == "MainProcess":
+            worker.logger.warn(
+                "celeryd: Hitting Ctrl+C again will terminate "
+                "all running tasks!")
+            install_worker_int_again_handler(worker)
+            worker.logger.warn("celeryd: Warm shutdown (%s)" % (
+                process_name))
+            worker.stop()
+        raise SystemExit()
+
+    platform.install_signal_handler("SIGINT", _stop)
+
+
+def install_worker_int_again_handler(worker):
+
+    def _stop(signum, frame):
+        process_name = multiprocessing.current_process().name
+        if process_name == "MainProcess":
+            worker.logger.warn("celeryd: Cold shutdown (%s)" % (
+                process_name))
+            worker.terminate()
+        raise SystemExit()
+
+    platform.install_signal_handler("SIGINT", _stop)
+
+
+def install_worker_term_handler(worker):
+
+    def _stop(signum, frame):
+        process_name = multiprocessing.current_process().name
+        if process_name == "MainProcess":
+            worker.logger.warn("celeryd: Warm shutdown (%s)" % (
+                process_name))
+            worker.stop()
+        raise SystemExit()
+
+    platform.install_signal_handler("SIGTERM", _stop)
+
+
+def install_worker_restart_handler(worker):
+
+    def restart_worker_sig_handler(signum, frame):
+        """Signal handler restarting the current python program."""
+        worker.logger.warn("Restarting celeryd (%s)" % (
+            " ".join(sys.argv)))
+        worker.stop()
+        os.execv(sys.executable, [sys.executable] + sys.argv)
+
+    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
+
+
+def install_HUP_not_supported_handler(worker):
+
+    def warn_on_HUP_handler(signum, frame):
+        worker.logger.error("SIGHUP not supported: "
+            "Restarting with HUP is unstable on this platform!")
+
+    platform.install_signal_handler("SIGHUP", warn_on_HUP_handler)
+
+
+def set_process_status(info):
+    arg_start = "manage" in sys.argv[0] and 2 or 1
+    if sys.argv[arg_start:]:
+        info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
+    return platform.set_mp_process_title("celeryd", info=info)
+
+
+def run_worker(*args, **kwargs):
+    return Worker(*args, **kwargs).run()

+ 47 - 0
celery/bin/base.py

@@ -0,0 +1,47 @@
+import os
+import sys
+
+from optparse import OptionParser, make_option as Option
+
+
+class Command(object):
+    args = ''
+    version = ''
+    option_list = ()
+
+    Parser = OptionParser
+
+    def __init__(self, defaults=None):
+        self.defaults = defaults
+
+        if self.defaults is None:
+            from celery import conf
+            self.defaults = conf
+
+    def parse_options(self, prog_name, arguments):
+        """Parse the available options."""
+        parser = self.create_parser(prog_name)
+        options, args = parser.parse_args(arguments)
+        return options, args
+
+    def create_parser(self, prog_name):
+        return self.Parser(prog=prog_name,
+                           usage=self.usage(),
+                           version=self.version,
+                           option_list=self.get_options())
+
+    def execute_from_commandline(self, argv=None):
+        if argv is None:
+            argv = list(sys.argv)
+        prog_name = os.path.basename(argv[0])
+        options, args = self.parse_options(prog_name, argv[1:])
+        return self.run(*args, **vars(options))
+
+    def usage(self):
+        return "%%prog [options] %s" % (self.args, )
+
+    def get_options(self):
+        return self.option_list
+
+    def run(self, *args, **options):
+        raise NotImplementedError("subclass responsibility")

+ 78 - 364
celery/bin/celeryd.py

@@ -67,378 +67,92 @@
     terminated and replaced by a new worker.
 
 """
-import os
-import sys
-import socket
-import logging
-import optparse
-import platform as _platform
-import warnings
 import multiprocessing
 
-from optparse import OptionParser, make_option as Option
-
 from celery import __version__
-from celery import conf
-from celery import signals
-from celery import platform
-from celery.task import discard_all
-from celery.utils import info
-from celery.utils import get_full_cls_name
-from celery.worker import WorkController
-from celery.exceptions import ImproperlyConfigured
-from celery.routes import Router
-
-SYSTEM = _platform.system()
-IS_OSX = SYSTEM == "Darwin"
-
-STARTUP_INFO_FMT = """
-Configuration ->
-    . broker -> %(conninfo)s
-    . queues ->
-%(queues)s
-    . concurrency -> %(concurrency)s
-    . loader -> %(loader)s
-    . logfile -> %(logfile)s@%(loglevel)s
-    . events -> %(events)s
-    . beat -> %(celerybeat)s
-%(tasks)s
-""".strip()
-
-TASK_LIST_FMT = """    . tasks ->\n%s"""
-
-
-def dump_version(*args):
-    print("celeryd v%s" % __version__)
-    sys.exit(0)
-
-
-OPTION_LIST = (
-    Option('-c', '--concurrency',
-        default=conf.CELERYD_CONCURRENCY,
-        action="store", dest="concurrency", type="int",
-        help="Number of child processes processing the queue."),
-    Option('-V', '--version',
-        action="callback", callback=dump_version, nargs=0,
-        help="Show version information and exit."),
-    Option('--purge', '--discard', default=False,
-        action="store_true", dest="discard",
-        help="Discard all waiting tasks before the server is started. "
-             "WARNING: This is unrecoverable, and the tasks will be "
-             "deleted from the messaging server."),
-    Option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
-        action="store", dest="logfile",
-        help="Path to log file."),
-    Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
-        action="store", dest="loglevel",
-        help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL/FATAL."),
-    Option('-n', '--hostname', default=None,
-        action="store", dest="hostname",
-        help="Set custom host name. E.g. 'foo.example.com'."),
-    Option('-B', '--beat', default=False,
-        action="store_true", dest="run_clockservice",
-        help="Also run the celerybeat periodic task scheduler. "
-             "Please note that only one instance of celerybeat should be "
-             "running at any one time."),
-    Option('-s', '--schedule',
-        default=conf.CELERYBEAT_SCHEDULE_FILENAME,
-        action="store", dest="schedule",
-        help="Path to the schedule database if running with the -B \
-              option. The extension '.db' will be appended to the \
-              filename. Default: %s" % conf.CELERYBEAT_SCHEDULE_FILENAME),
-    Option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
-        action="store", dest="db",
-        help="Path to the state database. The extension '.db' will "
-             "be appended to the filename. Default: %s" % (
+from celery.bin.base import Command, Option
+
+
+class WorkerCommand(Command):
+    version = __version__
+
+    def run(self, *args, **kwargs):
+        from celery.apps.worker import Worker
+        kwargs["defaults"] = self.defaults
+        return Worker(*args, **kwargs).run()
+
+    def get_options(self):
+        conf = self.defaults
+        return (
+            Option('-c', '--concurrency',
+                default=conf.CELERYD_CONCURRENCY,
+                action="store", dest="concurrency", type="int",
+                help="Number of child processes processing the queue."),
+            Option('--purge', '--discard', default=False,
+                action="store_true", dest="discard",
+                help="Discard all waiting tasks before the server is"
+                     "started. WARNING: There is no undo operation "
+                     "and the tasks will be deleted."),
+            Option('-f', '--logfile', default=conf.CELERYD_LOG_FILE,
+                action="store", dest="logfile",
+                help="Path to log file."),
+            Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL,
+                action="store", dest="loglevel",
+                help="Choose between DEBUG/INFO/WARNING/ERROR/CRITICAL"),
+            Option('-n', '--hostname', default=None,
+                action="store", dest="hostname",
+                help="Set custom host name. E.g. 'foo.example.com'."),
+            Option('-B', '--beat', default=False,
+                action="store_true", dest="run_clockservice",
+                help="Also run the celerybeat periodic task scheduler. "
+                     "NOTE: Only one instance of celerybeat must be"
+                     "running at any one time."),
+            Option('-s', '--schedule',
+                default=conf.CELERYBEAT_SCHEDULE_FILENAME,
+                action="store", dest="schedule",
+                help="Path to the schedule database if running with the -B "
+                     "option. The extension '.db' will be appended to the "
+                    "filename. Default: %s" % (
+                        conf.CELERYBEAT_SCHEDULE_FILENAME, )),
+
+            Option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
+                action="store", dest="db",
+                help="Path to the state database. The extension '.db' will "
+                     "be appended to the filename. Default: %s" % (
                         conf.CELERYD_STATE_DB, )),
-    Option('-E', '--events', default=conf.SEND_EVENTS,
-        action="store_true", dest="events",
-        help="Send events so celery can be monitored by e.g. celerymon."),
-    Option('--time-limit',
-        default=conf.CELERYD_TASK_TIME_LIMIT,
-        action="store", type="int", dest="task_time_limit",
-        help="Enables a hard time limit (in seconds) for tasks."),
-    Option('--soft-time-limit',
-        default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
-        action="store", type="int", dest="task_soft_time_limit",
-        help="Enables a soft time limit (in seconds) for tasks."),
-    Option('--maxtasksperchild',
-        default=conf.CELERYD_MAX_TASKS_PER_CHILD,
-        action="store", type="int", dest="max_tasks_per_child",
-        help="Maximum number of tasks a pool worker can execute"
-             "before it's terminated and replaced by a new worker."),
-    Option('--queues', '-Q', default=[],
-        action="store", dest="queues",
-        help="Comma separated list of queues to enable for this worker. "
-             "By default all configured queues are enabled. "
-             "Example: -Q video,image"),
-    Option('--include', '-I', default=[],
-        action="store", dest="include",
-        help="Comma separated list of additional modules to import. "
-             "Example: -I foo.tasks,bar.tasks"),
-)
-
-
-class Worker(object):
-    WorkController = WorkController
-
-    def __init__(self, concurrency=None, loglevel=None, logfile=None,
-            hostname=None, discard=False, run_clockservice=False,
-            schedule=None, task_time_limit=None, task_soft_time_limit=None,
-            max_tasks_per_child=None, queues=None, events=False, db=None,
-            include=None, defaults=conf, **kwargs):
-        self.concurrency = (concurrency or
-                            defaults.CELERYD_CONCURRENCY or
-                            multiprocessing.cpu_count())
-        self.loglevel = loglevel or defaults.CELERYD_LOG_LEVEL
-        self.logfile = logfile or defaults.CELERYD_LOG_FILE
-        self.hostname = hostname or socket.gethostname()
-        self.discard = discard
-        self.run_clockservice = run_clockservice
-        self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
-        self.events = events
-        self.task_time_limit = (task_time_limit or
-                                defaults.CELERYD_TASK_TIME_LIMIT)
-        self.task_soft_time_limit = (task_soft_time_limit or
-                                     defaults.CELERYD_TASK_SOFT_TIME_LIMIT)
-        self.max_tasks_per_child = (max_tasks_per_child or
-                                    defaults.CELERYD_MAX_TASKS_PER_CHILD)
-        self.db = db
-        self.queues = queues or []
-        self.include = include or []
-        self._isatty = sys.stdout.isatty()
-
-        if isinstance(self.queues, basestring):
-            self.queues = self.queues.split(",")
-        if isinstance(self.include, basestring):
-            self.include = self.include.split(",")
-
-        if not isinstance(self.loglevel, int):
-            self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
-
-    def run(self):
-        self.init_loader()
-        self.init_queues()
-        self.worker_init()
-        self.redirect_stdouts_to_logger()
-        print("celery@%s v%s is starting." % (self.hostname, __version__))
-
-        if getattr(self.settings, "DEBUG", False):
-            warnings.warn("Using settings.DEBUG leads to a memory leak, "
-                    "never use this setting in a production environment!")
-
-        if self.discard:
-            self.purge_messages()
-
-        # Dump configuration to screen so we have some basic information
-        # for when users sends bug reports.
-        print(self.startup_info())
-        set_process_status("Running...")
-
-        self.run_worker()
-
-    def on_listener_ready(self, listener):
-        signals.worker_ready.send(sender=listener)
-        print("celery@%s has started." % self.hostname)
-
-    def init_queues(self):
-        if self.queues:
-            conf.QUEUES = dict((queue, options)
-                                for queue, options in conf.QUEUES.items()
-                                    if queue in self.queues)
-            for queue in self.queues:
-                if queue not in conf.QUEUES:
-                    if conf.CREATE_MISSING_QUEUES:
-                        Router(queues=conf.QUEUES).add_queue(queue)
-                    else:
-                        raise ImproperlyConfigured(
-                            "Queue '%s' not defined in CELERY_QUEUES" % queue)
-
-    def init_loader(self):
-        from celery.loaders import current_loader, load_settings
-        self.loader = current_loader()
-        self.settings = load_settings()
-        if not self.loader.configured:
-            raise ImproperlyConfigured(
-                    "Celery needs to be configured to run celeryd.")
-        map(self.loader.import_module, self.include)
-
-    def redirect_stdouts_to_logger(self):
-        from celery import log
-        handled = log.setup_logging_subsystem(loglevel=self.loglevel,
-                                              logfile=self.logfile)
-        # Redirect stdout/stderr to our logger.
-        if not handled:
-            logger = log.get_default_logger()
-            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
-
-    def purge_messages(self):
-        discarded_count = discard_all()
-        what = discarded_count > 1 and "messages" or "message"
-        print("discard: Erased %d %s from the queue.\n" % (
-            discarded_count, what))
-
-    def worker_init(self):
-        # Run the worker init handler.
-        # (Usually imports task modules and such.)
-        self.loader.init_worker()
-
-    def tasklist(self, include_builtins=True):
-        from celery.registry import tasks
-        tasklist = tasks.keys()
-        if not include_builtins:
-            tasklist = filter(lambda s: not s.startswith("celery."),
-                              tasklist)
-        return TASK_LIST_FMT % "\n".join("\t. %s" % task
-                                            for task in sorted(tasklist))
-
-    def startup_info(self):
-        tasklist = ""
-        if self.loglevel <= logging.INFO:
-            include_builtins = self.loglevel <= logging.DEBUG
-            tasklist = self.tasklist(include_builtins=include_builtins)
-
-        queues = conf.get_queues()
-
-        return STARTUP_INFO_FMT % {
-            "conninfo": info.format_broker_info(),
-            "queues": info.format_queues(queues, indent=8),
-            "concurrency": self.concurrency,
-            "loglevel": conf.LOG_LEVELS[self.loglevel],
-            "logfile": self.logfile or "[stderr]",
-            "celerybeat": self.run_clockservice and "ON" or "OFF",
-            "events": self.events and "ON" or "OFF",
-            "tasks": tasklist,
-            "loader": get_full_cls_name(self.loader.__class__),
-        }
-
-    def run_worker(self):
-        worker = self.WorkController(concurrency=self.concurrency,
-                                loglevel=self.loglevel,
-                                logfile=self.logfile,
-                                hostname=self.hostname,
-                                ready_callback=self.on_listener_ready,
-                                embed_clockservice=self.run_clockservice,
-                                schedule_filename=self.schedule,
-                                send_events=self.events,
-                                db=self.db,
-                                max_tasks_per_child=self.max_tasks_per_child,
-                                task_time_limit=self.task_time_limit,
-                                task_soft_time_limit=self.task_soft_time_limit)
-        self.install_platform_tweaks(worker)
-        worker.start()
-
-    def install_platform_tweaks(self, worker):
-        """Install platform specific tweaks and workarounds."""
-        if IS_OSX:
-            self.osx_proxy_detection_workaround()
-
-        # Install signal handler so SIGHUP restarts the worker.
-        if not self._isatty:
-            # only install HUP handler if detached from terminal,
-            # so closing the terminal window doesn't restart celeryd
-            # into the background.
-            if IS_OSX:
-                # OS X can't exec from a process using threads.
-                # See http://github.com/ask/celery/issues#issue/152
-                install_HUP_not_supported_handler(worker)
-            else:
-                install_worker_restart_handler(worker)
-        install_worker_term_handler(worker)
-        install_worker_int_handler(worker)
-        signals.worker_init.send(sender=worker)
-
-    def osx_proxy_detection_workaround(self):
-        """See http://github.com/ask/celery/issues#issue/161"""
-        os.environ.setdefault("celery_dummy_proxy", "set_by_celeryd")
-
-
-def install_worker_int_handler(worker):
-
-    def _stop(signum, frame):
-        process_name = multiprocessing.current_process().name
-        if process_name == "MainProcess":
-            worker.logger.warn(
-                "celeryd: Hitting Ctrl+C again will terminate "
-                "all running tasks!")
-            install_worker_int_again_handler(worker)
-            worker.logger.warn("celeryd: Warm shutdown (%s)" % (
-                process_name))
-            worker.stop()
-        raise SystemExit()
-
-    platform.install_signal_handler("SIGINT", _stop)
-
-
-def install_worker_int_again_handler(worker):
-
-    def _stop(signum, frame):
-        process_name = multiprocessing.current_process().name
-        if process_name == "MainProcess":
-            worker.logger.warn("celeryd: Cold shutdown (%s)" % (
-                process_name))
-            worker.terminate()
-        raise SystemExit()
-
-    platform.install_signal_handler("SIGINT", _stop)
-
-
-def install_worker_term_handler(worker):
-
-    def _stop(signum, frame):
-        process_name = multiprocessing.current_process().name
-        if process_name == "MainProcess":
-            worker.logger.warn("celeryd: Warm shutdown (%s)" % (
-                process_name))
-            worker.stop()
-        raise SystemExit()
-
-    platform.install_signal_handler("SIGTERM", _stop)
-
-
-def install_worker_restart_handler(worker):
-
-    def restart_worker_sig_handler(signum, frame):
-        """Signal handler restarting the current python program."""
-        worker.logger.warn("Restarting celeryd (%s)" % (
-            " ".join(sys.argv)))
-        worker.stop()
-        os.execv(sys.executable, [sys.executable] + sys.argv)
-
-    platform.install_signal_handler("SIGHUP", restart_worker_sig_handler)
-
-
-def install_HUP_not_supported_handler(worker):
-
-    def warn_on_HUP_handler(signum, frame):
-        worker.logger.error("SIGHUP not supported: "
-            "Restarting with HUP is unstable on this platform!")
-
-    platform.install_signal_handler("SIGHUP", warn_on_HUP_handler)
-
-
-def parse_options(arguments):
-    """Parse the available options to ``celeryd``."""
-    parser = optparse.OptionParser(option_list=OPTION_LIST)
-    options, values = parser.parse_args(arguments)
-    return options
-
-
-def set_process_status(info):
-    arg_start = "manage" in sys.argv[0] and 2 or 1
-    if sys.argv[arg_start:]:
-        info = "%s (%s)" % (info, " ".join(sys.argv[arg_start:]))
-    return platform.set_mp_process_title("celeryd", info=info)
-
-
-def run_worker(**options):
-    return Worker(**options).run()
+            Option('-E', '--events', default=conf.SEND_EVENTS,
+                action="store_true", dest="events",
+                help="Send events so the worker can be monitored by "
+                     "celeryev, celerymon and other monitors.."),
+            Option('--time-limit',
+                default=conf.CELERYD_TASK_TIME_LIMIT,
+                action="store", type="int", dest="task_time_limit",
+                help="Enables a hard time limit (in seconds) for tasks."),
+            Option('--soft-time-limit',
+                default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
+                action="store", type="int", dest="task_soft_time_limit",
+                help="Enables a soft time limit (in seconds) for tasks."),
+            Option('--maxtasksperchild',
+                default=conf.CELERYD_MAX_TASKS_PER_CHILD,
+                action="store", type="int", dest="max_tasks_per_child",
+                help="Maximum number of tasks a pool worker can execute"
+                     "before it's terminated and replaced by a new worker."),
+            Option('--queues', '-Q', default=[],
+                action="store", dest="queues",
+                help="Comma separated list of queues to consume from. "
+                     "By default all configured queues are used. "
+                     "Example: -Q video,image"),
+            Option('--include', '-I', default=[],
+                action="store", dest="include",
+                help="Comma separated list of additional modules to import. "
+                 "Example: -I foo.tasks,bar.tasks"),
+        )
 
 
 def main():
     multiprocessing.freeze_support()
-    options = parse_options(sys.argv[1:])
-    return run_worker(**vars(options))
+    worker = WorkerCommand()
+    worker.execute_from_commandline()
 
 if __name__ == "__main__":
     main()

+ 1 - 4
celery/conf.py

@@ -5,6 +5,7 @@ from datetime import timedelta
 
 from celery import routes
 from celery.loaders import load_settings
+from celery.utils import LOG_LEVELS
 
 DEFAULT_PROCESS_LOG_FMT = """
     [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
@@ -15,10 +16,6 @@ DEFAULT_TASK_LOG_FMT = " ".join("""
     [%(task_name)s(%(task_id)s)] %(message)s
 """.strip().split())
 
-LOG_LEVELS = dict(logging._levelNames)
-LOG_LEVELS["FATAL"] = logging.FATAL
-LOG_LEVELS[logging.FATAL] = "FATAL"
-
 settings = load_settings()
 
 _DEFAULTS = {

+ 5 - 7
celery/tests/test_bin/test_celeryd.py

@@ -9,7 +9,8 @@ from StringIO import StringIO
 from celery import conf
 from celery import platform
 from celery import signals
-from celery.bin import celeryd as cd
+from celery.apps import worker as cd
+from celery.bin.celeryd import WorkerCommand, main as celeryd_main
 from celery.exceptions import ImproperlyConfigured
 from celery.utils import patch
 from celery.utils.functional import wraps
@@ -152,10 +153,6 @@ class test_Worker(unittest.TestCase):
 
 class test_funs(unittest.TestCase):
 
-    @disable_stdouts
-    def test_dump_version(self):
-        self.assertRaises(SystemExit, cd.dump_version)
-
     @disable_stdouts
     def test_set_process_status(self):
         prev1, sys.argv = sys.argv, ["Arg0"]
@@ -176,7 +173,8 @@ class test_funs(unittest.TestCase):
 
     @disable_stdouts
     def test_parse_options(self):
-        opts = cd.parse_options(["--concurrency=512"])
+        cmd = WorkerCommand()
+        opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
         self.assertEqual(opts.concurrency, 512)
 
     @disable_stdouts
@@ -192,7 +190,7 @@ class test_funs(unittest.TestCase):
         p, cd.Worker = cd.Worker, Worker
         s, sys.argv = sys.argv, ["celeryd", "--discard"]
         try:
-            cd.main()
+            celeryd_main()
         finally:
             cd.Worker = p
             sys.argv = s

+ 8 - 0
celery/utils/__init__.py

@@ -7,6 +7,8 @@ try:
 except ImportError:
     ctypes = None
 import importlib
+import logging
+
 from datetime import datetime
 from uuid import UUID, uuid4, _uuid_generate_random
 from inspect import getargspec
@@ -20,6 +22,12 @@ from celery.utils.timeutils import timedelta_seconds # was here before
 from celery.utils.functional import curry
 
 
+LOG_LEVELS = dict(logging._levelNames)
+LOG_LEVELS["FATAL"] = logging.FATAL
+LOG_LEVELS[logging.FATAL] = "FATAL"
+
+
+
 class promise(object):
     """A promise.