#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import os
import errno
import threading
import Queue
import itertools
import collections
import time
import signal

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

# Signal used for soft time limits.
SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)

#
# Miscellaneous
#

job_counter = itertools.count()


def mapstar(args):
    return map(*args)
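# Example: each task chunk arrives as a (func, args-tuple) pair, so
# `mapstar((abs, (-1, -2, 3)))` is equivalent to `map(abs, (-1, -2, 3))`
# and returns [1, 2, 3] under Python 2.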
#
# Code run by worker processes
#


class MaybeEncodingError(Exception):
    """Wraps unpickleable object."""

    def __init__(self, exc, value):
        self.exc = str(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'." % (
                    self.value, self.exc)


def soft_timeout_sighandler(signum, frame):
    raise SoftTimeLimitExceeded()


def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
           maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    pid = os.getpid()
    put = outqueue.put
    get = inqueue.get
    ack = ackqueue.put

    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        # Acknowledge receipt, so the parent knows which worker accepted
        # the job and when (used by the time limit handler).
        ack((job, i, time.time(), pid))
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception, exc:
            # The return value could not be pickled; send back a wrapped
            # error instead of leaving the job hanging.
            wrapped = MaybeEncodingError(exc, result[1])
            put((job, i, (False, wrapped)))

        completed += 1
    debug('worker exiting after %d tasks' % completed)
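# Message formats exchanged with workers (as read from the code above):
#
#   inqueue:  (job, i, func, args, kwds)          -- a task to execute
#   ackqueue: (job, i, time_accepted, pid)        -- sent just before running
#   outqueue: (job, i, (success, retval_or_exc))  -- the result
#
# A `None` on inqueue/outqueue is the shutdown sentinel.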
#
# Class representing a process pool
#


class PoolThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self)
        self._state = RUN
        self.daemon = True

    def terminate(self):
        self._state = TERMINATE

    def close(self):
        self._state = CLOSE


class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def run(self):
        debug('worker handler starting')
        while self._state == RUN and self.pool._state == RUN:
            self.pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')


class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        super(TaskHandler, self).__init__()

    def run(self):
        taskqueue = self.taskqueue
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i + 1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')
class AckHandler(PoolThread):

    def __init__(self, ackqueue, get, cache):
        self.ackqueue = ackqueue
        self.get = get
        self.cache = cache
        super(AckHandler, self).__init__()

    def run(self):
        debug('ack handler starting')
        get = self.get
        cache = self.cache

        while 1:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('ack handler got %s -- exiting',
                        exc.__class__.__name__)
                # The queue is gone; without returning here `task` would
                # be unbound below (mirrors ResultHandler.run).
                return

            if self._state:
                assert self._state == TERMINATE
                debug('ack handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('ack handler got sentinel')
                break

            job, i, time_accepted, pid = task
            try:
                cache[job]._ack(i, time_accepted, pid)
            except (KeyError, AttributeError):
                # Object gone, or doesn't support _ack (e.g. IMapIterator)
                pass

        while cache and self._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('ack handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if task is None:
                debug('ack handler ignoring extra sentinel')
                continue
            job, i, time_accepted, pid = task
            try:
                cache[job]._ack(i, time_accepted, pid)
            except KeyError:
                pass

        debug('ack handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), self._state)
class TimeoutHandler(PoolThread):

    def __init__(self, processes, cache, t_soft, t_hard):
        self.processes = processes
        self.cache = cache
        self.t_soft = t_soft
        self.t_hard = t_hard
        super(TimeoutHandler, self).__init__()

    def run(self):
        processes = self.processes
        cache = self.cache
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()

        def _process_by_pid(pid):
            for index, process in enumerate(processes):
                if process.pid == pid:
                    return process, index
            return None, None

        def _pop_by_pid(pid):
            process, index = _process_by_pid(pid)
            if not process:
                return
            p = processes.pop(index)
            assert p is process
            return process

        def _timed_out(start, timeout):
            if not start or not timeout:
                return False
            if time.time() >= start + timeout:
                return True

        def _on_soft_timeout(job, i):
            debug('soft time limit exceeded for %i' % i)
            process, _index = _process_by_pid(job._accept_pid)
            if not process:
                return

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=True)

            try:
                os.kill(job._accept_pid, SIG_SOFT_TIMEOUT)
            except OSError, exc:
                if exc.errno == errno.ESRCH:
                    pass
                else:
                    raise

            dirty.add(i)

        def _on_hard_timeout(job, i):
            debug('hard time limit exceeded for %i', i)
            # Remove from _pool
            process = _pop_by_pid(job._accept_pid)

            # Remove from cache and set return value to an exception
            job._set(i, (False, TimeLimitExceeded()))

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=False)

            if not process:
                return

            # Terminate the process
            process.terminate()

        # Inner-loop
        while self._state == RUN:

            # Remove dirty items not in cache anymore
            if dirty:
                dirty = set(k for k in dirty if k in cache)

            for i, job in cache.items():
                ack_time = job._time_accepted
                if _timed_out(ack_time, t_hard):
                    _on_hard_timeout(job, i)
                elif i not in dirty and _timed_out(ack_time, t_soft):
                    _on_soft_timeout(job, i)

            time.sleep(0.5)             # Don't waste CPU cycles.

        debug('timeout handler exiting')
class ResultHandler(PoolThread):

    def __init__(self, outqueue, get, cache, putlock):
        self.outqueue = outqueue
        self.get = get
        self.cache = cache
        self.putlock = putlock
        super(ResultHandler, self).__init__()

    def run(self):
        get = self.get
        outqueue = self.outqueue
        cache = self.cache
        putlock = self.putlock

        debug('result handler starting')
        while 1:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('result handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if putlock is not None:
                try:
                    putlock.release()
                except ValueError:
                    pass

            if self._state:
                assert self._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if putlock is not None:
            try:
                putlock.release()
            except ValueError:
                pass

        while cache and self._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('result handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block. There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), self._state)
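# How the handler threads above cooperate, roughly:
#
#   Supervisor     -- replaces workers that exited (maxtasksperchild).
#   TaskHandler    -- feeds tasks from _taskqueue into the shared inqueue.
#   AckHandler     -- records which worker picked up a job, and when.
#   TimeoutHandler -- enforces soft/hard time limits using those acks.
#   ResultHandler  -- routes (job, i, result) tuples back to the caller.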
class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    AckHandler = AckHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
            maxtasksperchild=None, timeout=None, soft_timeout=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
            raise NotImplementedError("Soft timeouts not supported: "
                    "Your platform does not have the SIGUSR1 signal.")

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        self._processes = processes

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            self._create_worker_process()

        self._worker_handler = self.Supervisor(self)
        self._worker_handler.start()

        self._putlock = threading.BoundedSemaphore(self._processes)

        self._task_handler = self.TaskHandler(self._taskqueue,
                self._quick_put, self._outqueue, self._pool)
        self._task_handler.start()

        # Thread processing acknowledgements from the ackqueue.
        self._ack_handler = self.AckHandler(self._ackqueue,
                self._quick_get_ack, self._cache)
        self._ack_handler.start()

        # Thread killing timed-out jobs.
        if self.timeout or self.soft_timeout:
            self._timeout_handler = self.TimeoutHandler(
                    self._pool, self._cache,
                    self.soft_timeout, self.timeout)
            self._timeout_handler.start()
        else:
            self._timeout_handler = None

        # Thread processing results in the outqueue.
        self._result_handler = self.ResultHandler(self._outqueue,
                self._quick_get, self._cache, self._putlock)
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._ackqueue, self._pool, self._ack_handler,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler),
            exitpriority=15,
            )

    def _create_worker_process(self):
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue, self._ackqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
            )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        return w
    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime. Returns True if any workers were
        cleaned up.
        """
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                if self._putlock is not None:
                    try:
                        self._putlock.release()
                    except ValueError:
                        pass
                worker.join()
                del self._pool[i]
        return len(self._pool) < self._processes

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        debug('repopulating pool')
        for i in range(self._processes - len(self._pool)):
            self._create_worker_process()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._ackqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv
        self._quick_get_ack = self._ackqueue._reader.recv
    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower
        than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                    for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                    for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                    for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                    for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
    def apply_async(self, func, args=(), kwds={},
            callback=None, accept_callback=None, timeout_callback=None,
            waitforslot=False, error_callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin.

        Callback is called when the function's return value is ready.
        The accept callback is called when the job is accepted to be
        executed.

        Simplified, the flow is like this:

            >>> if accept_callback:
            ...     accept_callback()
            >>> retval = func(*args, **kwds)
            >>> if callback:
            ...     callback(retval)

        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback,
                             accept_callback, timeout_callback,
                             error_callback)
        if waitforslot:
            self._putlock.acquire()
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                for i, x in enumerate(task_batches)), None))
        return result
    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
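    # Example: _get_tasks(f, range(7), 3) yields (f, (0, 1, 2)),
    # (f, (3, 4, 5)), (f, (6,)) -- each pair is later unpacked by
    # `mapstar` in a worker process.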
    def __reduce__(self):
        raise NotImplementedError(
                'pool objects cannot be passed between '
                'processes or pickled')

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler.close()
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler.terminate()
        self._terminate()

    def join(self):
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()
        debug('after join()')

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, ackqueue, pool,
                        ack_handler, worker_handler, task_handler,
                        result_handler, cache, timeout_handler):

        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler.terminate()
        task_handler.terminate()
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler.terminate()
        outqueue.put(None)                  # sentinel

        ack_handler.terminate()
        ackqueue.put(None)                  # sentinel

        if timeout_handler is not None:
            timeout_handler.terminate()

        # Terminate workers which haven't already finished
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        debug('joining ack handler')
        ack_handler.join(1e100)

        if timeout_handler is not None:
            debug('joining timeout handler')
            timeout_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()
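# A minimal usage sketch (assuming this module is importable as `pool`;
# the time limit keyword arguments are optional, as defined above):
#
#     >>> from pool import Pool
#     >>> p = Pool(processes=2, timeout=60, soft_timeout=30)
#     >>> res = p.apply_async(pow, (2, 10))
#     >>> res.get(timeout=5)
#     1024
#     >>> p.close(); p.join()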
DynamicPool = Pool

#
# Class whose instances are returned by `Pool.apply_async()`
#


class ApplyResult(object):

    def __init__(self, cache, callback, accept_callback=None,
            timeout_callback=None, error_callback=None):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._accepted = False
        self._accept_pid = None
        self._time_accepted = None
        self._ready = False
        self._callback = callback
        self._errback = error_callback
        self._accept_callback = accept_callback
        self._timeout_callback = timeout_callback

        cache[self._job] = self

    def ready(self):
        return self._ready

    def accepted(self):
        return self._accepted

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._errback and not self._success:
            self._errback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        if self._accepted:
            del self._cache[self._job]

    def _ack(self, i, time_accepted, pid):
        self._accepted = True
        self._time_accepted = time_accepted
        self._accept_pid = pid
        if self._accept_callback:
            self._accept_callback()
        if self._ready:
            del self._cache[self._job]
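# Note the removal protocol above: a job is deleted from the cache only
# after both `_ack()` and `_set()` have run, whichever arrives last, so
# the ack and result handlers can process the two events in either order.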
#
# Class whose instances are returned by `Pool.map_async()`
#


class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length // chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()
#
# Class whose instances are returned by `Pool.imap()`
#


class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()
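# In-order delivery sketch: results arriving out of order are parked in
# `_unsorted` keyed by position, and flushed to `_items` once the missing
# position arrives.  E.g. if the result for i=1 arrives before i=0, the
# i=1 value waits in `_unsorted` until i=0 has been appended.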
#
# Class whose instances are returned by `Pool.imap_unordered()`
#


class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()


#
#
#


class ThreadPool(Pool):

    from multiprocessing.dummy import Process as DummyProcess
    Process = DummyProcess

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._ackqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get
        self._quick_get_ack = self._ackqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
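# A minimal ThreadPool sketch (assuming the module is importable as
# `pool`; ThreadPool takes no time limit arguments, as defined above):
#
#     >>> from pool import ThreadPool
#     >>> tp = ThreadPool(processes=4)
#     >>> tp.map(len, [[1], [1, 2], [1, 2, 3]])
#     [1, 2, 3]
#     >>> tp.close(); tp.join()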