#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import os
import errno
import threading
import Queue
import itertools
import collections
import time
import signal

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

# Signal used for soft time limits.
SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)

#
# Miscellaneous
#

job_counter = itertools.count()


def mapstar(args):
    return map(*args)

#
# Code run by worker processes
#


def soft_timeout_sighandler(signum, frame):
    raise SoftTimeLimitExceeded()


class MaybeEncodingError(Exception):
    """Wraps unpickleable object."""

    def __init__(self, exc, value):
        self.exc = str(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'." % (
                self.value, self.exc)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)


def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
           maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    pid = os.getpid()
    put = outqueue.put
    get = inqueue.get
    ack = ackqueue.put

    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        ack((job, i, time.time(), pid))
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)

        try:
            put((job, i, result))
        except Exception, exc:
            wrapped = MaybeEncodingError(exc, result[1])
            debug('Got possible encoding error while sending result: %s' % (
                  wrapped))
            put((job, i, (False, wrapped)))

        completed += 1
    debug('worker exiting after %d tasks' % completed)
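
# Illustrative sketch (not part of the original module): the message
# protocol between the pool and a worker.  For a task tuple
#
#     task = (job, i, func, args, kwds)    # e.g. (0, None, pow, (2, 8), {})
#
# the worker first acknowledges on the ackqueue:
#
#     ack((job, i, time.time(), pid))      # consumed by the AckHandler
#
# then runs func(*args, **kwds) and reports the outcome on the outqueue:
#
#     put((job, i, (True, 256)))           # or (job, i, (False, exc))
#
# which the ResultHandler routes to cache[job]._set(i, ...).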

#
# Class representing a process pool
#


class PoolThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self)
        self._state = RUN
        self.daemon = True

    def terminate(self):
        self._state = TERMINATE

    def close(self):
        self._state = CLOSE


class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def run(self):
        debug('worker handler starting')
        while self._state == RUN and self.pool._state == RUN:
            self.pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')


class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        super(TaskHandler, self).__init__()

    def run(self):
        taskqueue = self.taskqueue
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i + 1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')
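
# Illustrative note: TaskHandler consumes (taskseq, set_length) pairs
# from the taskqueue.  map_async() enqueues a generator of
# (job, i, mapstar, (batch,), {}) tuples with set_length=None, while
# imap()/imap_unordered() pass result._set_length so the iterator
# learns its final length once the task sequence is exhausted.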


class AckHandler(PoolThread):

    def __init__(self, ackqueue, get, cache):
        self.ackqueue = ackqueue
        self.get = get
        self.cache = cache
        super(AckHandler, self).__init__()

    def run(self):
        debug('ack handler starting')
        get = self.get
        cache = self.cache

        while 1:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('ack handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if self._state:
                assert self._state == TERMINATE
                debug('ack handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('ack handler got sentinel')
                break

            job, i, time_accepted, pid = task
            try:
                cache[job]._ack(i, time_accepted, pid)
            except (KeyError, AttributeError), exc:
                # Object gone, or doesn't support _ack (e.g. IMapIterator)
                pass

        while cache and self._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('ack handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if task is None:
                debug('ack handler ignoring extra sentinel')
                continue

            job, i, time_accepted, pid = task
            try:
                cache[job]._ack(i, time_accepted, pid)
            except KeyError:
                pass

        debug('ack handler exiting: len(cache)=%s, thread._state=%s',
                len(cache), self._state)


class TimeoutHandler(PoolThread):

    def __init__(self, processes, cache, t_soft, t_hard):
        self.processes = processes
        self.cache = cache
        self.t_soft = t_soft
        self.t_hard = t_hard
        super(TimeoutHandler, self).__init__()

    def run(self):
        processes = self.processes
        cache = self.cache
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()

        def _process_by_pid(pid):
            for index, process in enumerate(processes):
                if process.pid == pid:
                    return process, index
            return None, None

        def _pop_by_pid(pid):
            process, index = _process_by_pid(pid)
            if not process:
                return
            p = processes.pop(index)
            assert p is process
            return process

        def _timed_out(start, timeout):
            if not start or not timeout:
                return False
            if time.time() >= start + timeout:
                return True

        def _on_soft_timeout(job, i):
            debug('soft time limit exceeded for %i' % i)
            process, _index = _process_by_pid(job._accept_pid)
            if not process:
                return

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=True)

            try:
                os.kill(job._accept_pid, SIG_SOFT_TIMEOUT)
            except OSError, exc:
                if exc.errno == errno.ESRCH:
                    pass
                else:
                    raise

            dirty.add(i)

        def _on_hard_timeout(job, i):
            debug('hard time limit exceeded for %i', i)

            # Remove from _pool
            process = _pop_by_pid(job._accept_pid)

            # Remove from cache and set return value to an exception
            job._set(i, (False, TimeLimitExceeded()))

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=False)

            if not process:
                return

            # Terminate the process
            process.terminate()

        # Inner-loop
        while self._state == RUN:

            # Remove dirty items not in cache anymore
            if dirty:
                dirty = set(k for k in dirty if k in cache)

            for i, job in cache.items():
                ack_time = job._time_accepted
                if _timed_out(ack_time, t_hard):
                    _on_hard_timeout(job, i)
                elif i not in dirty and _timed_out(ack_time, t_soft):
                    _on_soft_timeout(job, i)

            time.sleep(0.5)                 # Don't waste CPU cycles.

        debug('timeout handler exiting')
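
# Illustrative example (not from the original module): with
# soft_timeout=3 and timeout=10, a job acknowledged at time T is sent
# SIG_SOFT_TIMEOUT at roughly T + 3, raising SoftTimeLimitExceeded
# inside the worker (see soft_timeout_sighandler above); if it is still
# running at roughly T + 10, its result is set to TimeLimitExceeded()
# and the worker process is terminated.  The 0.5 second sleep in the
# loop means both limits are enforced with about half a second of
# granularity.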


class ResultHandler(PoolThread):

    def __init__(self, outqueue, get, cache, putlock):
        self.outqueue = outqueue
        self.get = get
        self.cache = cache
        self.putlock = putlock
        super(ResultHandler, self).__init__()

    def run(self):
        get = self.get
        outqueue = self.outqueue
        cache = self.cache
        putlock = self.putlock

        debug('result handler starting')
        while 1:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('result handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if putlock is not None:
                try:
                    putlock.release()
                except ValueError:
                    pass

            if self._state:
                assert self._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if putlock is not None:
            try:
                putlock.release()
            except ValueError:
                pass

        while cache and self._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('result handler got %s -- exiting',
                        exc.__class__.__name__)
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block.  There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), self._state)


class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    AckHandler = AckHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
            maxtasksperchild=None, timeout=None, soft_timeout=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
            raise NotImplementedError("Soft timeouts not supported: "
                    "Your platform does not have the SIGUSR1 signal.")

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        self._processes = processes

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            self._create_worker_process()

        self._worker_handler = self.Supervisor(self)
        self._worker_handler.start()

        self._putlock = threading.BoundedSemaphore(self._processes)

        self._task_handler = self.TaskHandler(self._taskqueue,
                self._quick_put, self._outqueue, self._pool)
        self._task_handler.start()

        # Thread processing acknowledgements from the ackqueue.
        self._ack_handler = self.AckHandler(self._ackqueue,
                self._quick_get_ack, self._cache)
        self._ack_handler.start()

        # Thread killing timedout jobs.
        if self.timeout or self.soft_timeout:
            self._timeout_handler = self.TimeoutHandler(
                    self._pool, self._cache,
                    self.soft_timeout, self.timeout)
            self._timeout_handler.start()
        else:
            self._timeout_handler = None

        # Thread processing results in the outqueue.
        self._result_handler = self.ResultHandler(self._outqueue,
                self._quick_get, self._cache, self._putlock)
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._ackqueue, self._pool, self._ack_handler,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler),
            exitpriority=15,
            )

    def _create_worker_process(self):
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue, self._ackqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
            )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        return w

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime.  Returns True if any workers were
        cleaned up.
        """
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                if self._putlock is not None:
                    try:
                        self._putlock.release()
                    except ValueError:
                        pass
                worker.join()
                del self._pool[i]
        return len(self._pool) < self._processes

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        debug('repopulating pool')
        for i in range(self._processes - len(self._pool)):
            self._create_worker_process()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._ackqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv
        self._quick_get_ack = self._ackqueue._reader.recv
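
    # Note (illustrative): _quick_put/_quick_get bind straight to the
    # underlying pipe's send/recv, skipping SimpleQueue's locking.  This
    # is presumably safe because each parent-side endpoint is used from
    # a single dedicated thread (TaskHandler writes the inqueue;
    # ResultHandler and AckHandler each read their own queue).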

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower
        than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                    for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                    for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                    for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                    for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={},
            callback=None, accept_callback=None, timeout_callback=None,
            waitforslot=False, error_callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin.

        Callback is called when the function's return value is ready.
        The accept callback is called when the job is accepted to be
        executed.

        Simplified, the flow is like this:

            >>> if accept_callback:
            ...     accept_callback()
            >>> retval = func(*args, **kwds)
            >>> if callback:
            ...     callback(retval)

        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback,
                             accept_callback, timeout_callback,
                             error_callback)

        if waitforslot:
            self._putlock.acquire()

        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
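
    # Usage sketch (illustrative, assuming an already-created pool ``p``):
    #
    #     acks, outs = [], []
    #     res = p.apply_async(pow, (2, 8),
    #                         callback=outs.append,
    #                         accept_callback=lambda: acks.append(1))
    #     assert res.get() == 256
    #
    # accept_callback fires from the AckHandler thread as soon as a
    # worker picks the job up; callback fires from the ResultHandler
    # thread with the return value.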

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                for i, x in enumerate(task_batches)), None))
        return result
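
    # Worked example (illustrative): for len(iterable) == 100 and a pool
    # of 4 processes, divmod(100, 4 * 4) == (6, 4); the nonzero remainder
    # bumps chunksize to 7, so _get_tasks() yields 15 batches and
    # MapResult sets _number_left = 100 // 7 + bool(100 % 7) == 15.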

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
                'pool objects cannot be passed between '
                'processes or pickled')

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler.close()
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler.terminate()
        self._terminate()

    def join(self):
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()
        debug('after join()')

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, ackqueue, pool,
                        ack_handler, worker_handler, task_handler,
                        result_handler, cache, timeout_handler):

        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler.terminate()

        task_handler.terminate()
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler.terminate()
        outqueue.put(None)                  # sentinel

        ack_handler.terminate()
        ackqueue.put(None)                  # sentinel

        if timeout_handler is not None:
            timeout_handler.terminate()

        # Terminate workers which haven't already finished
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        debug('joining ack handler')
        ack_handler.join(1e100)

        if timeout_handler is not None:
            debug('joining timeout handler')
            timeout_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()


DynamicPool = Pool

#
# Class whose instances are returned by `Pool.apply_async()`
#


class ApplyResult(object):

    def __init__(self, cache, callback, accept_callback=None,
            timeout_callback=None, error_callback=None):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._accepted = False
        self._accept_pid = None
        self._time_accepted = None
        self._ready = False
        self._callback = callback
        self._errback = error_callback
        self._accept_callback = accept_callback
        self._timeout_callback = timeout_callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def accepted(self):
        return self._accepted

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._errback and not self._success:
            self._errback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        if self._accepted:
            del self._cache[self._job]

    def _ack(self, i, time_accepted, pid):
        self._accepted = True
        self._time_accepted = time_accepted
        self._accept_pid = pid
        if self._accept_callback:
            self._accept_callback()
        if self._ready:
            del self._cache[self._job]

#
# Class whose instances are returned by `Pool.map_async()`
#


class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length // chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#


class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                         # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#


class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

#
#
#


class ThreadPool(Pool):

    from multiprocessing.dummy import Process as DummyProcess
    Process = DummyProcess

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._ackqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get
        self._quick_get_ack = self._ackqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
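

if __name__ == '__main__':
    # Minimal usage sketch (illustrative only, not part of the original
    # module).  ``pow`` and ``abs`` are used as tasks because builtins
    # pickle cleanly; the time limit arguments require a platform with
    # SIGUSR1 (any recent Unix).
    def on_ready(value):
        print 'result callback got %r' % (value, )

    p = Pool(processes=2, timeout=60, soft_timeout=30)
    res = p.apply_async(pow, (2, 8), callback=on_ready)
    assert res.get(timeout=10) == 256
    print 'map result: %r' % (p.map(abs, [-1, -2, 3]), )
    p.close()
    p.join()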