
Adds CELERYD_WORKER_LOST_WAIT to control WorkerLostError timeout.

Closes #595
Brendon Crawford committed 13 years ago
commit a923f205b9
4 changed files with 30 additions and 8 deletions:
  1. celery/app/defaults.py (+1, -0)
  2. celery/concurrency/processes/pool.py (+14, -7)
  3. celery/worker/__init__.py (+3, -1)
  4. docs/configuration.rst (+12, -0)

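Roughly, this is the behaviour the new setting controls: when a worker process exits without cleaning up, any result it was about to publish is given a grace period before the job fails. The sketch below is illustrative only; the exception class and helper are stand-ins, not the pool's actual code:

    import time

    class WorkerLostError(Exception):
        """Stand-in for celery.exceptions.WorkerLostError."""

    def wait_for_lost_worker(result_ready, lost_worker_timeout=10.0, poll=0.1):
        # Give the missing worker's result `lost_worker_timeout` seconds to
        # arrive before giving up.
        deadline = time.time() + lost_worker_timeout
        while time.time() < deadline:
            if result_ready():
                return True
            time.sleep(poll)
        raise WorkerLostError("Worker exited prematurely; no result received.")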
celery/app/defaults.py (+1, -0)

@@ -181,6 +181,7 @@ NAMESPACES = {
         "TASK_LOG_FORMAT": Option(DEFAULT_TASK_LOG_FMT),
         "TASK_SOFT_TIME_LIMIT": Option(type="int"),
         "TASK_TIME_LIMIT": Option(type="int"),
+        "WORKER_LOST_WAIT": Option(10.0, type="float")
     },
     "CELERYBEAT": {
         "SCHEDULE": Option({}, type="dict"),

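The new Option lives in the "CELERYD" namespace, so it surfaces as the CELERYD_WORKER_LOST_WAIT setting with a default of 10.0. A minimal sketch of that flattening (the Option class and flatten helper here are simplified stand-ins, not Celery's actual defaults machinery):

    class Option(object):
        # Simplified stand-in: only a default value and a type name.
        def __init__(self, default=None, type="string"):
            self.default = default
            self.type = type

    NAMESPACES = {
        "CELERYD": {
            "WORKER_LOST_WAIT": Option(10.0, type="float"),
        },
    }

    def flatten(namespaces):
        # ("CELERYD", "WORKER_LOST_WAIT") -> "CELERYD_WORKER_LOST_WAIT"
        for ns, options in namespaces.items():
            for key, opt in options.items():
                yield "%s_%s" % (ns, key), opt.default

    assert dict(flatten(NAMESPACES)) == {"CELERYD_WORKER_LOST_WAIT": 10.0}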
celery/concurrency/processes/pool.py (+14, -7)

@@ -58,6 +58,8 @@ SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)
 # Miscellaneous
 #
 
+LOST_WORKER_TIMEOUT = 10.0
+
 job_counter = itertools.count()
 
 
@@ -533,7 +535,7 @@ class Pool(object):
 
     def __init__(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None, timeout=None, soft_timeout=None,
-            force_execv=False):
+            force_execv=False, lost_worker_timeout=LOST_WORKER_TIMEOUT):
         self._setup_queues()
         self._taskqueue = Queue.Queue()
         self._cache = {}
@@ -544,6 +546,7 @@ class Pool(object):
         self._initializer = initializer
         self._initargs = initargs
         self._force_execv = force_execv
+        self.lost_worker_timeout = lost_worker_timeout or LOST_WORKER_TIMEOUT
 
         if soft_timeout and SIG_SOFT_TIMEOUT is None:
             warnings.warn(UserWarning("Soft timeouts are not supported: "
@@ -755,12 +758,13 @@ class Pool(object):
         assert self._state == RUN
         return self.map_async(func, iterable, chunksize).get()
 
-    def imap(self, func, iterable, chunksize=1, lost_worker_timeout=10.0):
+    def imap(self, func, iterable, chunksize=1, lost_worker_timeout=None):
         '''
         Equivalent of `itertools.imap()` -- can be MUCH slower
         than `Pool.map()`
         '''
         assert self._state == RUN
+        lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
         if chunksize == 1:
             result = IMapIterator(self._cache,
                                   lost_worker_timeout=lost_worker_timeout)
@@ -777,11 +781,12 @@ class Pool(object):
             return (item for chunk in result for item in chunk)
 
     def imap_unordered(self, func, iterable, chunksize=1,
-                       lost_worker_timeout=10.0):
+                       lost_worker_timeout=None):
         '''
         Like `imap()` method but ordering of results is arbitrary
         '''
         assert self._state == RUN
+        lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
         if chunksize == 1:
             result = IMapUnorderedIterator(self._cache,
                     lost_worker_timeout=lost_worker_timeout)
@@ -800,7 +805,7 @@ class Pool(object):
     def apply_async(self, func, args=(), kwds={},
             callback=None, accept_callback=None, timeout_callback=None,
             waitforslot=False, error_callback=None,
-            soft_timeout=None, timeout=None):
+            soft_timeout=None, timeout=None, lost_worker_timeout=None):
         '''
         Asynchronous equivalent of `apply()` builtin.
 
@@ -817,6 +822,7 @@ class Pool(object):
 
         '''
         assert self._state == RUN
+        lost_worker_timeout = lost_worker_timeout or self.lost_worker_timeout
         if soft_timeout and SIG_SOFT_TIMEOUT is None:
             warnings.warn(UserWarning("Soft timeouts are not supported: "
                     "on this platform: It does not have the SIGUSR1 signal."))
@@ -826,7 +832,8 @@ class Pool(object):
         if self._state == RUN:
             result = ApplyResult(self._cache, callback,
                                  accept_callback, timeout_callback,
-                                 error_callback, soft_timeout, timeout)
+                                 error_callback, soft_timeout, timeout,
+                                 lost_worker_timeout)
             if timeout or soft_timeout:
                 # start the timeout handler thread when required.
                 self._start_timeout_handler()
@@ -970,7 +977,7 @@ class ApplyResult(object):
 
     def __init__(self, cache, callback, accept_callback=None,
             timeout_callback=None, error_callback=None, soft_timeout=None,
-            timeout=None, lost_worker_timeout=10.0):
+            timeout=None, lost_worker_timeout=LOST_WORKER_TIMEOUT):
         self._mutex = threading.Lock()
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
@@ -1131,7 +1138,7 @@ class MapResult(ApplyResult):
 class IMapIterator(object):
     _worker_lost = None
 
-    def __init__(self, cache, lost_worker_timeout=10.0):
+    def __init__(self, cache, lost_worker_timeout=LOST_WORKER_TIMEOUT):
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
         self._cache = cache

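The pool changes all follow one precedence rule: a lost_worker_timeout passed per call wins, otherwise the pool-level value set in __init__ applies, and that in turn falls back to the LOST_WORKER_TIMEOUT module constant. A condensed sketch of that rule (PoolSketch is a hypothetical stand-in, not the real Pool class):

    LOST_WORKER_TIMEOUT = 10.0

    class PoolSketch(object):
        def __init__(self, lost_worker_timeout=LOST_WORKER_TIMEOUT):
            # Pool-level value, falling back to the module constant.
            self.lost_worker_timeout = lost_worker_timeout or LOST_WORKER_TIMEOUT

        def apply_async(self, lost_worker_timeout=None):
            # The per-call value wins; otherwise use the pool-level value.
            return lost_worker_timeout or self.lost_worker_timeout

    pool = PoolSketch(lost_worker_timeout=30.0)
    assert pool.apply_async() == 30.0
    assert pool.apply_async(lost_worker_timeout=5.0) == 5.0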
celery/worker/__init__.py (+3, -1)

@@ -90,7 +90,8 @@ class Pool(abstract.StartStopComponent):
                                 timeout=w.task_time_limit,
                                 soft_timeout=w.task_soft_time_limit,
                                 putlocks=w.pool_putlocks,
-                                force_execv=w.force_execv)
+                                force_execv=w.force_execv,
+                                lost_worker_timeout=w.worker_lost_wait)
         return pool
 
 
@@ -191,6 +192,7 @@ class WorkController(configurated):
     prefetch_multiplier = from_config()
     state_db = from_config()
     disable_rate_limits = from_config()
+    worker_lost_wait = from_config()
 
     _state = None
     _running = 0

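The from_config() declaration means worker_lost_wait is filled in from the application's configuration, and the Pool component then forwards it to the process pool as lost_worker_timeout. The sketch below only illustrates the idea of such a declarative lookup (the descriptor and naming convention are hypothetical, not Celery's actual configurated machinery), assuming the attribute maps to CELERYD_WORKER_LOST_WAIT:

    class from_config_sketch(object):
        # Hypothetical marker: "fill this attribute from app configuration".
        def __init__(self, key=None):
            self.key = key

    class WorkerSketch(object):
        worker_lost_wait = from_config_sketch()

        def __init__(self, conf, prefix="CELERYD_"):
            for name in dir(type(self)):
                attr = getattr(type(self), name)
                if isinstance(attr, from_config_sketch):
                    setattr(self, name, conf[attr.key or prefix + name.upper()])

    w = WorkerSketch({"CELERYD_WORKER_LOST_WAIT": 10.0})
    assert w.worker_lost_wait == 10.0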
docs/configuration.rst (+12, -0)

@@ -1066,6 +1066,18 @@ This option will be enabled by default in a later version.
 
 This is not a problem on Windows, as it does not have `fork()`.
 
+.. setting:: CELERYD_WORKER_LOST_WAIT
+
+CELERYD_WORKER_LOST_WAIT
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+In some cases a worker may be killed without proper cleanup,
+and the worker may have published a result before terminating.
+This value specifies how long we wait for any missing results before
+raising a :exc:`@WorkerLostError` exception.
+
+Default is 10.0 seconds.
+
 .. setting:: CELERYD_MAX_TASKS_PER_CHILD
 
 CELERYD_MAX_TASKS_PER_CHILD
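
With the documentation in place, the new setting is used like any other worker option, for example from a celeryconfig.py module (the value below is illustrative; the shipped default is 10.0 seconds):

    # celeryconfig.py (illustrative)
    # Wait 30 seconds for a missing result from a dead worker process
    # before a WorkerLostError is raised.
    CELERYD_WORKER_LOST_WAIT = 30.0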