Kaynağa Gözat

Merge branch '3.0'

Conflicts:
	celery/bin/celeryd_multi.py
	extra/generic-init.d/celeryd
Ask Solem 12 yıl önce
ebeveyn
işleme
0e9501bce0

+ 9 - 0
Changelog

@@ -79,6 +79,15 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 - Terminating a task now properly updates the state of the task to revoked,
   and sends a ``task-revoked`` event.
 
+- Multi: No longer parses --app option (Issue #1008).
+
+- Generic worker init script now waits for workers to shutdown by default.
+
+- Multi: stop_verify command renamed to stopwait.
+
+- Daemonization: Now delays trying to create pidfile/logfile until after
+  the working directory has been changed into.
+
 - :program:`celery worker` and :program:`celery beat` commands now respects
   the :option:`--no-color` option (Issue #999).
 

+ 13 - 6
celery/bin/base.py

@@ -127,6 +127,10 @@ class Command(object):
     # module Rst documentation to parse help from (if any)
     doc = None
 
+    # Some programs (multi) does not want to load the app specified
+    # (Issue #1008).
+    respects_app_option = True
+
     #: List of options to parse before parsing other options.
     preload_options = (
         Option('-A', '--app', default=None),
@@ -289,12 +293,15 @@ class Command(object):
         config_module = preload_options.get('config_module')
         if config_module:
             os.environ['CELERY_CONFIG_MODULE'] = config_module
-        if app:
-            self.app = self.find_app(app)
-        elif self.app is None:
-            self.app = self.get_app(loader=loader)
-        if self.enable_config_from_cmdline:
-            argv = self.process_cmdline_config(argv)
+        if self.respects_app_option:
+            if app and self.respects_app_option:
+                self.app = self.find_app(app)
+            elif self.app is None:
+                self.app = self.get_app(loader=loader)
+            if self.enable_config_from_cmdline:
+                argv = self.process_cmdline_config(argv)
+        else:
+            self.app = celery.Celery()
         return argv
 
     def find_app(self, app):

+ 4 - 0
celery/bin/celery.py

@@ -223,6 +223,7 @@ class Delegate(Command):
 @command
 class multi(Command):
     """Start multiple worker instances."""
+    respects_app_option = False
 
     def get_options(self):
         return ()
@@ -887,6 +888,9 @@ class CeleryCommand(BaseCommand):
         return self.execute(command, argv)
 
     def execute_from_commandline(self, argv=None):
+        argv = sys.argv if argv is None else argv
+        if 'multi' in argv[1:3]:  # Issue 1008
+            self.respects_app_option = False
         try:
             sys.exit(determine_exit_status(
                 super(CeleryCommand, self).execute_from_commandline(argv)))

+ 1 - 1
celery/bin/celeryd_multi.py

@@ -155,7 +155,7 @@ class MultiTool(object):
                          'show': self.show,
                          'stop': self.stop,
                          'stopwait': self.stopwait,
-                         'stop_verify': self.stopwait,
+                         'stop_verify': self.stopwait,  # compat alias
                          'restart': self.restart,
                          'kill': self.kill,
                          'names': self.names,

+ 16 - 9
celery/platforms.py

@@ -262,10 +262,11 @@ class DaemonContext(object):
     _is_open = False
 
     def __init__(self, pidfile=None, workdir=None, umask=None,
-            fake=False, **kwargs):
+            fake=False, after_chdir=None, **kwargs):
         self.workdir = workdir or DAEMON_WORKDIR
         self.umask = DAEMON_UMASK if umask is None else umask
         self.fake = fake
+        self.after_chdir = after_chdir
         self.stdfds = (sys.stdin, sys.stdout, sys.stderr)
 
     def redirect_to_null(self, fd):
@@ -281,6 +282,9 @@ class DaemonContext(object):
             os.chdir(self.workdir)
             os.umask(self.umask)
 
+            if self.after_chdir:
+                self.after_chdir()
+
             preserve = [fileno(f) for f in self.stdfds if fileno(f)]
             for fd in reversed(range(get_fdmax(default=2048))):
                 if fd not in preserve:
@@ -353,14 +357,17 @@ def detached(logfile=None, pidfile=None, uid=None, gid=None, umask=0,
         # no point trying to setuid unless we're root.
         maybe_drop_privileges(uid=uid, gid=gid)
 
-    # Since without stderr any errors will be silently suppressed,
-    # we need to know that we have access to the logfile.
-    logfile and open(logfile, 'a').close()
-    # Doesn't actually create the pidfile, but makes sure it's not stale.
-    if pidfile:
-        _create_pidlock(pidfile).release()
-
-    return DaemonContext(umask=umask, workdir=workdir, fake=fake)
+    def after_chdir_do():
+        # Since without stderr any errors will be silently suppressed,
+        # we need to know that we have access to the logfile.
+        logfile and open(logfile, 'a').close()
+        # Doesn't actually create the pidfile, but makes sure it's not stale.
+        if pidfile:
+            _create_pidlock(pidfile).release()
+
+    return DaemonContext(
+        umask=umask, workdir=workdir, fake=fake, after_chdir=after_chdir_do,
+    )
 
 
 def parse_uid(uid):

+ 1 - 1
extra/generic-init.d/celeryd

@@ -129,7 +129,7 @@ export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
 
 
 stop_workers () {
-    $CELERYD_MULTI stop_verify $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
+    $CELERYD_MULTI stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
 }