Browse Source

Worker accidentally sets a default socket timeout, but should only be active for shutdown

Ask Solem 11 years ago
parent
commit
2fdf1f7c93
2 changed files with 22 additions and 22 deletions
  1. 15 20
      celery/bootsteps.py
  2. 7 2
      celery/worker/__init__.py

+ 15 - 20
celery/bootsteps.py

@@ -18,7 +18,6 @@ from .datastructures import DependencyGraph, GraphFormatter
 from .five import values, with_metaclass
 from .utils.imports import instantiate, qualname
 from .utils.log import get_logger
-from .utils.threads import default_socket_timeout
 
 try:
     from greenlet import GreenletExit
@@ -28,9 +27,6 @@ except ImportError:  # pragma: no cover
 
 __all__ = ['Blueprint', 'Step', 'StartStopStep', 'ConsumerStep']
 
-#: Default socket timeout at shutdown.
-SHUTDOWN_SOCKET_TIMEOUT = 5.0
-
 #: States
 RUN = 0x1
 CLOSE = 0x2
@@ -149,22 +145,21 @@ class Blueprint(object):
                  description=None, reverse=True, propagate=True, args=()):
         description = description or method.capitalize()
         steps = reversed(parent.steps) if reverse else parent.steps
-        with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
-            for step in steps:
-                if step:
-                    self._debug('%s %s...',
-                                description.capitalize(), step.alias)
-                    fun = getattr(step, method, None)
-                    if fun:
-                        try:
-                            fun(parent, *args)
-                        except Exception as exc:
-                            if propagate:
-                                raise
-                            logger.error(
-                                'Error while %s %s: %r',
-                                description, step.alias, exc, exc_info=1,
-                            )
+        for step in steps:
+            if step:
+                self._debug('%s %s...',
+                            description.capitalize(), step.alias)
+                fun = getattr(step, method, None)
+                if fun:
+                    try:
+                        fun(parent, *args)
+                    except Exception as exc:
+                        if propagate:
+                            raise
+                        logger.error(
+                            'Error while %s %s: %r',
+                            description, step.alias, exc, exc_info=1,
+                        )
 
     def stop(self, parent, close=True, terminate=False):
         what = 'terminating' if terminate else 'stopping'

+ 7 - 2
celery/worker/__init__.py

@@ -36,11 +36,15 @@ from celery.five import string_t, values
 from celery.utils import nodename, nodesplit, worker_direct
 from celery.utils.imports import reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
+from celery.utils.threads import default_socket_timeout
 
 from . import state
 
 __all__ = ['WorkController', 'default_nodename']
 
+#: Default socket timeout at shutdown.
+SHUTDOWN_SOCKET_TIMEOUT = 5.0
+
 SELECT_UNKNOWN_QUEUE = """\
 Trying to select queue subset of {0!r}, but queue {1} is not
 defined in the CELERY_QUEUES setting.
@@ -261,8 +265,9 @@ class WorkController(object):
         # if blueprint does not exist it means that we had an
         # error before the bootsteps could be initialized.
         if self.blueprint is not None:
-            self.blueprint.stop(self, terminate=not warm)
-            self.blueprint.join()
+            with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
+                self.blueprint.stop(self, terminate=not warm)
+                self.blueprint.join()
 
     def reload(self, modules=None, reload=False, reloader=None):
         modules = self.app.loader.task_modules if modules is None else modules