@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 #
 # Module providing the `Pool` class for managing a process pool
 #
@@ -5,8 +6,7 @@
 #
 # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
 #
-
-__all__ = ['Pool']
+from __future__ import absolute_import

 #
 # Imports
@@ -22,11 +22,13 @@ import collections
 import time
 import signal
 import warnings
+import logging

 from multiprocessing import Process, cpu_count, TimeoutError, Event
 from multiprocessing import util
 from multiprocessing.util import Finalize, debug

+from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import WorkerLostError

@@ -74,16 +76,30 @@ class LaxBoundedSemaphore(threading._Semaphore):
         _Semaphore.__init__(self, value, verbose)
         self._initial_value = value

-    def release(self):
-        if self._Semaphore__value < self._initial_value:
-            _Semaphore.release(self)
-        if __debug__:
-            self._note("%s.release: success, value=%s (unchanged)" % (
-                self, self._Semaphore__value))
+    if sys.version_info >= (3, 0):
+
+        def release(self):
+            if self._value < self._initial_value:
+                _Semaphore.release(self)
+            if __debug__:
+                self._note("%s.release: success, value=%s (unchanged)" % (
+                    self, self._value))
+
+        def clear(self):
+            while self._value < self._initial_value:
+                _Semaphore.release(self)
+    else:
+
+        def release(self):  # noqa
+            if self._Semaphore__value < self._initial_value:
+                _Semaphore.release(self)
+            if __debug__:
+                self._note("%s.release: success, value=%s (unchanged)" % (
+                    self, self._Semaphore__value))

-    def clear(self):
-        while self._Semaphore__value < self._initial_value:
-            _Semaphore.release(self)
+        def clear(self):  # noqa
+            while self._Semaphore__value < self._initial_value:
+                _Semaphore.release(self)

 #
 # Exceptions
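The version split above is needed because Python 2's semaphore class keeps its counter in a name-mangled private attribute (visible from outside the class as `_Semaphore__value`), while Python 3 stores it as `_value`. A quick stdlib-only check of that difference (note that on Python 3 the `threading._Semaphore` base used by this module would also need a compat alias, since the class is just `Semaphore` there):

    import sys
    import threading

    sem = threading.Semaphore(2)
    if sys.version_info >= (3, 0):
        print(sem._value)             # Python 3: counter stored as _value
    else:
        print(sem._Semaphore__value)  # Python 2: name-mangled __value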
@@ -118,8 +134,19 @@ def soft_timeout_sighandler(signum, frame):
 #


-def worker(inqueue, outqueue, initializer=None, initargs=(),
+def worker(inqueue, outqueue, initializer=None, initargs=(),
            maxtasks=None, sentinel=None):
+    # Re-init logging system.
+    # Workaround for http://bugs.python.org/issue6721#msg140215
+    # Python logging module uses RLock() objects which are broken after
+    # fork. This can result in a deadlock (Issue #496).
+    logger_names = logging.Logger.manager.loggerDict.keys()
+    logger_names.append(None)  # for root logger
+    for name in logger_names:
+        for handler in logging.getLogger(name).handlers:
+            handler.createLock()
+    logging._lock = threading.RLock()
+
     pid = os.getpid()
     assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
     put = outqueue.put
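The handler locks and the module-level logging._lock are recreated because fork() copies them in whatever state they happened to be in: if another thread held one of them at fork time, the child inherits a permanently-locked RLock and deadlocks on its first log call. The same reset as a standalone sketch (the list() around keys() is only needed on Python 3, where keys() returns a view without append(); the patched file targets Python 2):

    import logging
    import threading

    def reset_logging_locks():
        # Recreate every lock the logging system may have inherited
        # locked from the parent process.
        names = list(logging.Logger.manager.loggerDict.keys())
        names.append(None)  # getLogger(None) is the root logger
        for name in names:
            for handler in logging.getLogger(name).handlers:
                handler.createLock()       # fresh per-handler lock
        logging._lock = threading.RLock()  # fresh module-level lock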
@@ -171,13 +198,15 @@ def worker(inqueue, outqueue, initializer=None, initargs=(),
         put((ACK, (job, i, time.time(), pid)))
         try:
             result = (True, func(*args, **kwds))
-        except Exception, e:
-            result = (False, e)
+        except Exception:
+            result = (False, ExceptionInfo(sys.exc_info()))
         try:
             put((READY, (job, i, result)))
         except Exception, exc:
+            _, _, tb = sys.exc_info()
             wrapped = MaybeEncodingError(exc, result[1])
-            put((READY, (job, i, (False, wrapped))))
+            einfo = ExceptionInfo((MaybeEncodingError, wrapped, tb))
+            put((READY, (job, i, (False, einfo))))

         completed += 1
     debug('worker exiting after %d tasks' % completed)
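ExceptionInfo is used here because a live traceback object cannot be pickled, so it cannot cross the result pipe as-is. A minimal stand-in for celery.datastructures.ExceptionInfo, assuming it keeps the exception plus the formatted traceback text (a sketch, not celery's actual implementation):

    import traceback

    class ExceptionInfoSketch(object):
        """Pickle-friendly snapshot of a sys.exc_info() triple."""

        def __init__(self, exc_info):
            type_, exception, tb = exc_info
            self.exception = exception
            # Only the *text* of the traceback survives pickling.
            self.traceback = ''.join(
                traceback.format_exception(type_, exception, tb))

        def __str__(self):
            return self.traceback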
@@ -325,7 +354,12 @@ class TimeoutHandler(PoolThread):
                 return
            debug('hard time limit exceeded for %i', i)
            # Remove from cache and set return value to an exception
-            job._set(i, (False, TimeLimitExceeded(hard_timeout)))
+            exc_info = None
+            try:
+                raise TimeLimitExceeded(hard_timeout)
+            except TimeLimitExceeded:
+                exc_info = sys.exc_info()
+            job._set(i, (False, ExceptionInfo(exc_info)))

            # Remove from _pool
            process, _index = _process_by_pid(job._worker_pid)
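The raise-and-catch dance is how a traceback gets attached to an exception that never actually occurred in worker code: sys.exc_info() only returns a populated triple while an exception is being handled. The same idiom in isolation (the helper name is illustrative):

    import sys

    def capture_exc_info(exc):
        """Return (type, value, traceback) for a synthetic exception."""
        try:
            raise exc
        except Exception:
            return sys.exc_info()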
@@ -413,7 +447,7 @@ class ResultHandler(PoolThread):
         debug('result handler starting')
         while 1:
             try:
-                ready, task = poll(0.2)
+                ready, task = poll(1.0)
             except (IOError, EOFError), exc:
                 debug('result handler got %r -- exiting' % (exc, ))
                 return
@@ -433,7 +467,7 @@ class ResultHandler(PoolThread):
         time_terminate = None
         while cache and self._state != TERMINATE:
             try:
-                ready, task = poll(0.2)
+                ready, task = poll(1.0)
             except (IOError, EOFError), exc:
                 debug('result handler got %r -- exiting' % (exc, ))
                 return
@@ -567,7 +601,7 @@ class Pool(object):
         self._poolctrl[w.pid] = sentinel
         return w

-    def _join_exited_workers(self, shutdown=False, lost_worker_timeout=10.0):
+    def _join_exited_workers(self, shutdown=False):
         """Cleanup after any worker processes which have exited due to
         reaching their specified lifetime. Returns True if any workers were
         cleaned up.
@@ -575,13 +609,18 @@ class Pool(object):
         now = None
         # The worker may have published a result before being terminated,
         # but we have no way to accurately tell if it did. So we wait for
-        # 10 seconds before we mark the job with WorkerLostError.
+        # _lost_worker_timeout seconds before we mark the job with
+        # WorkerLostError.
         for job in [job for job in self._cache.values()
                 if not job.ready() and job._worker_lost]:
             now = now or time.time()
-            if now - job._worker_lost > lost_worker_timeout:
-                err = WorkerLostError("Worker exited prematurely.")
-                job._set(None, (False, err))
+            if now - job._worker_lost > job._lost_worker_timeout:
+                exc_info = None
+                try:
+                    raise WorkerLostError("Worker exited prematurely.")
+                except WorkerLostError:
+                    exc_info = ExceptionInfo(sys.exc_info())
+                job._set(None, (False, exc_info))

         if shutdown and not len(self._pool):
             raise WorkersJoined()
@@ -601,10 +640,11 @@ class Pool(object):
             for job in self._cache.values():
                 for worker_pid in job.worker_pids():
                     if worker_pid in cleaned and not job.ready():
-                        if self._putlock is not None:
-                            self._putlock.release()
                         job._worker_lost = time.time()
                         continue
+            if self._putlock is not None:
+                for worker in cleaned:
+                    self._putlock.release()
             return True
         return False

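The put-lock release moves out of the per-job loop because the semaphore counts free worker slots: it must be replenished once per worker that exited, not once per in-flight job that worker owned. A toy illustration of the accounting:

    import threading

    putlock = threading.Semaphore(0)  # stand-in for the pool's put-lock
    cleaned = [4711, 4712]            # pids of two workers that exited
    # The two workers may have shared three in-flight jobs, but only
    # two slots come back -- one per replaced worker:
    for _pid in cleaned:
        putlock.release()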
@@ -701,39 +741,44 @@ class Pool(object):
         assert self._state == RUN
         return self.map_async(func, iterable, chunksize).get()

-    def imap(self, func, iterable, chunksize=1):
+    def imap(self, func, iterable, chunksize=1, lost_worker_timeout=10.0):
         '''
         Equivalent of `itertools.imap()` -- can be MUCH slower
         than `Pool.map()`
         '''
         assert self._state == RUN
         if chunksize == 1:
-            result = IMapIterator(self._cache)
+            result = IMapIterator(self._cache,
+                                  lost_worker_timeout=lost_worker_timeout)
             self._taskqueue.put((((result._job, i, func, (x,), {})
                 for i, x in enumerate(iterable)), result._set_length))
             return result
         else:
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
-            result = IMapIterator(self._cache)
+            result = IMapIterator(self._cache,
+                                  lost_worker_timeout=lost_worker_timeout)
             self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches)), result._set_length))
             return (item for chunk in result for item in chunk)

-    def imap_unordered(self, func, iterable, chunksize=1):
+    def imap_unordered(self, func, iterable, chunksize=1,
+                       lost_worker_timeout=10.0):
         '''
         Like `imap()` method but ordering of results is arbitrary
         '''
         assert self._state == RUN
         if chunksize == 1:
-            result = IMapUnorderedIterator(self._cache)
+            result = IMapUnorderedIterator(self._cache,
+                    lost_worker_timeout=lost_worker_timeout)
             self._taskqueue.put((((result._job, i, func, (x,), {})
                 for i, x in enumerate(iterable)), result._set_length))
             return result
         else:
             assert chunksize > 1
             task_batches = Pool._get_tasks(func, iterable, chunksize)
-            result = IMapUnorderedIterator(self._cache)
+            result = IMapUnorderedIterator(self._cache,
+                    lost_worker_timeout=lost_worker_timeout)
             self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches)), result._set_length))
             return (item for chunk in result for item in chunk)
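With these signatures the grace period becomes a per-call knob. A hypothetical call site, assuming a Pool instance `pool` and user functions `my_task`/`handle`:

    # Allow 30 seconds for a straggler result to arrive after a worker
    # exits, before the job is failed with WorkerLostError.
    for value in pool.imap(my_task, items, lost_worker_timeout=30.0):
        handle(value)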
@@ -762,19 +807,21 @@ class Pool(object):
             warnings.warn(UserWarning("Soft timeouts are not supported: "
                     "on this platform: It does not have the SIGUSR1 signal."))
             soft_timeout = None
-        result = ApplyResult(self._cache, callback,
-                             accept_callback, timeout_callback,
-                             error_callback, soft_timeout, timeout)
-
         if waitforslot and self._putlock is not None:
-            self._putlock.acquire()
-            if self._state != RUN:
-                return
-        if timeout or soft_timeout:
-            # start the timeout handler thread when required.
-            self._start_timeout_handler()
-        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
-        return result
+            while 1:
+                if self._state != RUN or self._putlock.acquire(False):
+                    break
+                time.sleep(1.0)
+        if self._state == RUN:
+            result = ApplyResult(self._cache, callback,
+                                 accept_callback, timeout_callback,
+                                 error_callback, soft_timeout, timeout)
+            if timeout or soft_timeout:
+                # start the timeout handler thread when required.
+                self._start_timeout_handler()
+            self._taskqueue.put(([(result._job, None,
+                                   func, args, kwds)], None))
+            return result

     def map_async(self, func, iterable, chunksize=None, callback=None):
         '''
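The old code could block indefinitely in putlock.acquire() while the pool was being torn down. Polling with a non-blocking acquire lets shutdown interrupt a caller that is waiting for a free slot; the same shape in isolation (names are illustrative):

    import time

    def acquire_or_abort(sem, is_running, interval=1.0):
        """Wait for a slot, but give up as soon as the pool stops."""
        while True:
            if not is_running() or sem.acquire(False):
                return is_running()
            time.sleep(interval)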
@@ -912,7 +959,7 @@ class ApplyResult(object):

     def __init__(self, cache, callback, accept_callback=None,
             timeout_callback=None, error_callback=None, soft_timeout=None,
-            timeout=None):
+            timeout=None, lost_worker_timeout=10.0):
         self._mutex = threading.Lock()
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
@@ -924,6 +971,7 @@ class ApplyResult(object):
         self._timeout_callback = timeout_callback
         self._timeout = timeout
         self._soft_timeout = soft_timeout
+        self._lost_worker_timeout = lost_worker_timeout

         self._accepted = False
         self._worker_pid = None
@@ -1067,15 +1115,19 @@ class MapResult(ApplyResult):


 class IMapIterator(object):
+    _worker_lost = None

-    def __init__(self, cache):
+    def __init__(self, cache, lost_worker_timeout=10.0):
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
         self._cache = cache
         self._items = collections.deque()
         self._index = 0
         self._length = None
+        self._ready = False
         self._unsorted = {}
+        self._worker_pids = []
+        self._lost_worker_timeout = lost_worker_timeout
         cache[self._job] = self

     def __iter__(self):
@@ -1088,12 +1140,14 @@ class IMapIterator(object):
                 item = self._items.popleft()
             except IndexError:
                 if self._index == self._length:
+                    self._ready = True
                     raise StopIteration
                 self._cond.wait(timeout)
                 try:
                     item = self._items.popleft()
                 except IndexError:
                     if self._index == self._length:
+                        self._ready = True
                         raise StopIteration
                     raise TimeoutError
         finally:
@@ -1102,7 +1156,7 @@ class IMapIterator(object):
         success, value = item
         if success:
             return value
-        raise value
+        raise Exception(value)

     __next__ = next  # XXX

@@ -1121,6 +1175,7 @@ class IMapIterator(object):
                 self._unsorted[i] = obj

             if self._index == self._length:
+                self._ready = True
                 del self._cache[self._job]
         finally:
             self._cond.release()
@@ -1130,11 +1185,21 @@ class IMapIterator(object):
         try:
             self._length = length
             if self._index == self._length:
+                self._ready = True
                 self._cond.notify()
                 del self._cache[self._job]
         finally:
             self._cond.release()

+    def _ack(self, i, time_accepted, pid):
+        self._worker_pids.append(pid)
+
+    def ready(self):
+        return self._ready
+
+    def worker_pids(self):
+        return self._worker_pids
+
 #
 # Class whose instances are returned by `Pool.imap_unordered()`
 #
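ready(), worker_pids() and _ack() give IMapIterator the same small duck-typed surface ApplyResult already exposes, which is all the supervisor needs to associate jobs with dead workers. A sketch of that consumer side (illustrative, not the actual supervisor code):

    import time

    def mark_lost_jobs(cache, cleaned_pids):
        # Any job object in the cache works here, as long as it has
        # ready() and worker_pids().
        for job in cache.values():
            if not job.ready():
                if any(pid in cleaned_pids for pid in job.worker_pids()):
                    job._worker_lost = time.time()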
@@ -1149,6 +1214,7 @@ class IMapUnorderedIterator(IMapIterator):
             self._index += 1
             self._cond.notify()
             if self._index == self._length:
+                self._ready = True
                 del self._cache[self._job]
         finally:
             self._cond.release()