#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import os
import errno
import threading
import Queue
import itertools
import collections
import time
import signal

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Constants representing the state of a job
#

ACK = 0
READY = 1

# Signal used for soft time limits.
SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)

#
# Exceptions
#

class WorkerLostError(Exception):
    """The worker processing a job has exited prematurely."""
    pass

#
# Miscellaneous
#

job_counter = itertools.count()

def mapstar(args):
    return map(*args)
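
# Editorial note (not in the original source): mapstar() simply unpacks a
# (func, args_tuple) batch as produced by Pool._get_tasks(), e.g.:
#
#     >>> mapstar((abs, (-1, -2, -3)))
#     [1, 2, 3]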
#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps unpickleable object."""

    def __init__(self, exc, value):
        self.exc = str(exc)
        self.value = repr(value)
        Exception.__init__(self, self.exc, self.value)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'." % (
                    self.value, self.exc)

def soft_timeout_sighandler(signum, frame):
    raise SoftTimeLimitExceeded()

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    pid = os.getpid()
    put = outqueue.put
    get = inqueue.get

    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        put((ACK, (job, i, time.time(), pid)))
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)

        try:
            put((READY, (job, i, result)))
        except Exception, exc:
            wrapped = MaybeEncodingError(exc, result[1])
            put((READY, (job, i, (False, wrapped))))

        completed += 1
    debug('worker exiting after %d tasks' % completed)
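
# A note on the worker protocol (editorial sketch, not in the original
# source): each task arriving on `inqueue` is a 5-tuple, and the worker
# replies on `outqueue` twice per task -- once when the job is accepted
# and once when the result is ready:
#
#     worker receives:  (job, i, func, args, kwds)
#     worker sends:     (ACK, (job, i, time.time(), pid))
#     worker sends:     (READY, (job, i, (success, retval_or_exc)))
#
# The ACK tells the TimeoutHandler which process took the job and when,
# so hard/soft time limits can be enforced against it.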
#
# Class representing a process pool
#

class PoolThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self)
        self._state = RUN
        self.daemon = True

    def terminate(self):
        self._state = TERMINATE

    def close(self):
        self._state = CLOSE

class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def run(self):
        debug('worker handler starting')
        while self._state == RUN and self.pool._state == RUN:
            self.pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')

class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        super(TaskHandler, self).__init__()

    def run(self):
        taskqueue = self.taskqueue
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i + 1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

class TimeoutHandler(PoolThread):

    def __init__(self, processes, cache, t_soft, t_hard):
        self.processes = processes
        self.cache = cache
        self.t_soft = t_soft
        self.t_hard = t_hard
        super(TimeoutHandler, self).__init__()

    def run(self):
        processes = self.processes
        cache = self.cache
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()

        def _process_by_pid(pid):
            for index, process in enumerate(processes):
                if process.pid == pid:
                    return process, index
            return None, None

        def _pop_by_pid(pid):
            process, index = _process_by_pid(pid)
            if not process:
                return
            p = processes.pop(index)
            assert p is process
            return process

        def _timed_out(start, timeout):
            if not start or not timeout:
                return False
            if time.time() >= start + timeout:
                return True

        def _on_soft_timeout(job, i):
            debug('soft time limit exceeded for %i' % i)
            process, _index = _process_by_pid(job._worker_pid)
            if not process:
                return

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=True)

            try:
                os.kill(job._worker_pid, SIG_SOFT_TIMEOUT)
            except OSError, exc:
                if exc.errno == errno.ESRCH:
                    pass
                else:
                    raise

            dirty.add(i)

        def _on_hard_timeout(job, i):
            debug('hard time limit exceeded for %i', i)

            # Remove from _pool
            process = _pop_by_pid(job._worker_pid)

            # Remove from cache and set return value to an exception
            job._set(i, (False, TimeLimitExceeded()))

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=False)

            if not process:
                return

            # Terminate the process
            process.terminate()

        # Inner-loop
        while self._state == RUN:

            # Remove dirty items not in cache anymore
            if dirty:
                dirty = set(k for k in dirty if k in cache)

            for i, job in cache.items():
                ack_time = job._time_accepted
                if _timed_out(ack_time, t_hard):
                    _on_hard_timeout(job, i)
                elif i not in dirty and _timed_out(ack_time, t_soft):
                    _on_soft_timeout(job, i)

            time.sleep(0.5)                     # Don't waste CPU cycles.

        debug('timeout handler exiting')
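
# Timeout semantics (editorial note, not in the original source): once a
# job has been ACKed, `_time_accepted` starts the clock.  If `t_soft`
# elapses first, the worker receives SIG_SOFT_TIMEOUT, which the signal
# handler turns into a SoftTimeLimitExceeded raised inside the running
# task, giving it a chance to catch the exception and clean up.  If
# `t_hard` elapses, the job is failed with TimeLimitExceeded and the
# worker process is terminated outright.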
class ResultHandler(PoolThread):

    def __init__(self, outqueue, get, cache, poll,
            join_exited_workers, putlock):
        self.outqueue = outqueue
        self.get = get
        self.cache = cache
        self.poll = poll
        self.join_exited_workers = join_exited_workers
        self.putlock = putlock
        super(ResultHandler, self).__init__()

    def run(self):
        get = self.get
        outqueue = self.outqueue
        cache = self.cache
        poll = self.poll
        join_exited_workers = self.join_exited_workers
        putlock = self.putlock

        def on_ack(job, i, time_accepted, pid):
            try:
                cache[job]._ack(i, time_accepted, pid)
            except (KeyError, AttributeError):
                # Object gone or doesn't support _ack (e.g. IMAPIterator).
                pass

        def on_ready(job, i, obj):
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        state_handlers = {ACK: on_ack, READY: on_ready}

        def on_state_change(task):
            state, args = task
            try:
                state_handlers[state](*args)
            except KeyError:
                debug("Unknown job state: %s (args=%s)" % (state, args))

        debug('result handler starting')
        while 1:
            try:
                ready, task = poll(0.2)
            except (IOError, EOFError), exc:
                debug('result handler got %r -- exiting' % (exc, ))
                return

            if self._state:
                assert self._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if ready:
                if task is None:
                    debug('result handler got sentinel')
                    break

                if putlock is not None:
                    try:
                        putlock.release()
                    except ValueError:
                        pass

                on_state_change(task)

        if putlock is not None:
            try:
                putlock.release()
            except ValueError:
                pass

        while cache and self._state != TERMINATE:
            try:
                ready, task = poll(0.2)
            except (IOError, EOFError), exc:
                debug('result handler got %r -- exiting' % (exc, ))
                return

            if ready:
                if task is None:
                    debug('result handler ignoring extra sentinel')
                    continue
                # on_state_change() already routes READY messages to
                # cache[job]._set(), so no further unpacking of `task`
                # is needed here.
                on_state_change(task)
            join_exited_workers()

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block. There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
                len(cache), self._state)
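
# Dispatch note (editorial, not in the original source): the result
# handler demultiplexes the two message types produced by worker():
#
#     (ACK, (job, i, time_accepted, pid))    -> cache[job]._ack(...)
#     (READY, (job, i, (success, value)))    -> cache[job]._set(...)
#
# `putlock` is the BoundedSemaphore acquired by
# apply_async(waitforslot=True); it is released here as messages arrive,
# so producers blocked waiting for a free worker slot can continue (the
# surplus releases raise ValueError, which is deliberately ignored).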
class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
            maxtasksperchild=None, timeout=None, soft_timeout=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
            raise NotImplementedError("Soft timeouts not supported: "
                    "Your platform does not have the SIGUSR1 signal.")

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        self._processes = processes

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            self._create_worker_process()

        self._worker_handler = self.Supervisor(self)
        self._worker_handler.start()

        self._putlock = threading.BoundedSemaphore(self._processes)

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool)
        self._task_handler.start()

        # Thread killing timedout jobs.
        if self.timeout or self.soft_timeout:
            self._timeout_handler = self.TimeoutHandler(
                    self._pool, self._cache,
                    self.soft_timeout, self.timeout)
            self._timeout_handler.start()
        else:
            self._timeout_handler = None

        # Thread processing results in the outqueue.
        self._result_handler = self.ResultHandler(self._outqueue,
                                                  self._quick_get,
                                                  self._cache,
                                                  self._poll_result,
                                                  self._join_exited_workers,
                                                  self._putlock)
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler),
            exitpriority=15,
            )

    def _create_worker_process(self):
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
            )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        return w

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime. Returns True if any workers were
        cleaned up.
        """
        cleaned = []
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                if self._putlock is not None:
                    try:
                        self._putlock.release()
                    except ValueError:
                        pass
                worker.join()
                cleaned.append(worker.pid)
                del self._pool[i]
        if cleaned:
            for job in self._cache.values():
                for worker_pid in job.worker_pids():
                    if worker_pid in cleaned:
                        err = WorkerLostError("Worker exited prematurely.")
                        job._set(None, (False, err))
                        continue
            return True
        return False

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        debug('repopulating pool')
        for i in range(self._processes - len(self._pool)):
            if self._state != RUN:
                return
            self._create_worker_process()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

        def _poll_result(timeout):
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower
        than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                    for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                    for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                    for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                    for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={},
            callback=None, accept_callback=None, timeout_callback=None,
            waitforslot=False, error_callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin.

        `callback` is called when the function's return value is ready,
        `accept_callback` when the job is accepted by a worker process.
        Simplified, the flow is like this:

            >>> if accept_callback:
            ...     accept_callback()
            >>> retval = func(*args, **kwds)
            >>> if callback:
            ...     callback(retval)

        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback,
                             accept_callback, timeout_callback,
                             error_callback)
        if waitforslot:
            self._putlock.acquire()
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
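
    # Illustrative example (editorial, not in the original source) of how
    # _get_tasks() batches work for map_async()/imap(chunksize > 1):
    #
    #     >>> list(Pool._get_tasks(abs, [-1, -2, -3, -4, -5], 2))
    #     [(abs, (-1, -2)), (abs, (-3, -4)), (abs, (-5,))]
    #
    # Each batch is later executed by a worker as mapstar((func, x)),
    # producing one chunk of results per batch.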
    def __reduce__(self):
        raise NotImplementedError(
                'pool objects cannot be passed between '
                'processes or pickled')

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler.close()
            self._worker_handler.join()
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler.terminate()
        self._terminate()

    def join(self):
        assert self._state in (CLOSE, TERMINATE)
        debug('joining worker handler')
        self._worker_handler.join()
        debug('joining task handler')
        self._task_handler.join()
        debug('joining result handler')
        self._result_handler.join()
        for i, p in enumerate(self._pool):
            debug('joining worker %s/%s (%r)' % (i, len(self._pool), p, ))
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler,
                        result_handler, cache, timeout_handler):

        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler.terminate()
        task_handler.terminate()
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler.terminate()
        outqueue.put(None)                  # sentinel

        if timeout_handler is not None:
            timeout_handler.terminate()

        # Terminate workers which haven't already finished
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if timeout_handler is not None:
            debug('joining timeout handler')
            timeout_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()

DynamicPool = Pool
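
# Usage sketch (editorial example, not part of the original module).
# Assumes this file is importable as `pool`; the time limits shown are
# arbitrary:
#
#     >>> from pool import Pool
#     >>> def square(x):
#     ...     return x * x
#     >>> p = Pool(processes=2, timeout=60, soft_timeout=30)
#     >>> p.map(square, range(10))
#     [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
#     >>> res = p.apply_async(square, (7, ),
#     ...                     callback=lambda v: None,
#     ...                     accept_callback=lambda: None)
#     >>> res.get(timeout=10)
#     49
#     >>> p.close()
#     >>> p.join()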
#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):

    def __init__(self, cache, callback, accept_callback=None,
            timeout_callback=None, error_callback=None):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        self._accept_callback = accept_callback
        self._errback = error_callback
        self._timeout_callback = timeout_callback
        self._accepted = False
        self._worker_pid = None
        self._time_accepted = None
        cache[self._job] = self

    def ready(self):
        return self._ready

    def accepted(self):
        return self._accepted

    def successful(self):
        assert self._ready
        return self._success

    def worker_pids(self):
        return filter(None, [self._worker_pid])

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._errback and not self._success:
            self._errback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        if self._accepted:
            self._cache.pop(self._job, None)

    def _ack(self, i, time_accepted, pid):
        self._accepted = True
        self._time_accepted = time_accepted
        self._worker_pid = pid
        if self._accept_callback:
            self._accept_callback()
        if self._ready:
            self._cache.pop(self._job, None)
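
# Lifecycle note (editorial, not in the original source): an ApplyResult
# is evicted from the pool's job cache only once it is both accepted and
# ready; whichever of _ack()/_set() happens last does the eviction.
# Continuing the usage sketch above, a minimal consumer looks like:
#
#     >>> res = p.apply_async(square, (3, ))
#     >>> res.wait(5)                    # block up to 5 seconds
#     >>> res.ready() and res.successful()
#     True
#     >>> res.get()
#     9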
#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._length = length
        self._value = [None] * length
        self._accepted = [False] * length
        self._worker_pid = [None] * length
        self._time_accepted = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length // chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                if self._accepted:
                    self._cache.pop(self._job, None)
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            if self._accepted:
                self._cache.pop(self._job, None)
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

    def _ack(self, i, time_accepted, pid):
        start = i * self._chunksize
        # The final chunk may be shorter than chunksize, so clamp the
        # stop index to the result length.
        stop = min((i + 1) * self._chunksize, self._length)
        for j in range(start, stop):
            self._accepted[j] = True
            self._worker_pid[j] = pid
            self._time_accepted[j] = time_accepted
        if self._ready:
            self._cache.pop(self._job, None)

    def accepted(self):
        return all(self._accepted)

    def worker_pids(self):
        return filter(None, self._worker_pid)
#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()
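
# Ordering note (editorial, not in the original source): IMapIterator
# yields results in task order.  A result arriving for position i is
# queued for the consumer only once everything before it has been seen;
# otherwise it is parked in `_unsorted` until the gap closes.  E.g. if
# results arrive as 1 then 0: result 1 waits in _unsorted, then result 0
# arrives and both are appended, in order, to _items.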
#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()
#
#
#

class ThreadPool(Pool):

    from multiprocessing.dummy import Process as DummyProcess
    Process = DummyProcess

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

        def _poll_result(timeout):
            try:
                return True, self._quick_get(timeout=timeout)
            except Queue.Empty:
                return False, None
        self._poll_result = _poll_result

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
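
# Usage sketch (editorial example, not part of the original module):
# ThreadPool exposes the same interface as Pool but runs tasks in
# threads via multiprocessing.dummy, so it is mainly useful for
# I/O-bound work:
#
#     >>> tp = ThreadPool(4)
#     >>> tp.map(len, ['a', 'bb', 'ccc'])
#     [1, 2, 3]
#     >>> tp.close()
#     >>> tp.join()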