#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Pool']

#
# Imports
#

import os
import errno
import threading
import Queue
import itertools
import collections
import time
import signal

from multiprocessing import Process, cpu_count, TimeoutError
from multiprocessing.util import Finalize, debug

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.exceptions import WorkerLostError

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

# Signal used for soft time limits.
SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)

#
# Miscellaneous
#

job_counter = itertools.count()


def mapstar(args):
    return map(*args)

#
# Code run by worker processes
#


class MaybeEncodingError(Exception):
    """Wraps unpickleable object."""

    def __init__(self, exc, value):
        self.exc = str(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'." % (
                self.value, self.exc)


def soft_timeout_sighandler(signum, frame):
    raise SoftTimeLimitExceeded()
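
# Illustrative sketch (not part of the original module): because the soft
# time limit is delivered as an exception raised inside the task, a task
# function can catch SoftTimeLimitExceeded to clean up before the hard
# limit terminates the worker process. `do_work` and `cleanup` are
# hypothetical helpers:
#
#     >>> def process(item):
#     ...     try:
#     ...         return do_work(item)
#     ...     except SoftTimeLimitExceeded:
#     ...         cleanup(item)
#     ...         raise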


def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
           maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    pid = os.getpid()
    put = outqueue.put
    get = inqueue.get
    ack = ackqueue.put

    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        ack((job, i, time.time(), pid))
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        except BaseException, e:
            # Job raised SystemExit or equivalent, so tell the result
            # handler and exit the process so it can be replaced.
            err = WorkerLostError(
                "Worker terminated by user request: %r" % (e, ))
            put((job, i, (False, err)))
            raise

        try:
            put((job, i, result))
        except Exception, exc:
            wrapped = MaybeEncodingError(exc, result[1])
            put((job, i, (False, wrapped)))

        completed += 1
    debug('worker exiting after %d tasks' % completed)

#
# Class representing a process pool
#


class PoolThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self)
        self._state = RUN
        self.daemon = True

    def terminate(self):
        self._state = TERMINATE

    def close(self):
        self._state = CLOSE


class Supervisor(PoolThread):

    def __init__(self, pool):
        self.pool = pool
        super(Supervisor, self).__init__()

    def run(self):
        debug('worker handler starting')
        while self._state == RUN and self.pool._state == RUN:
            self.pool._maintain_pool()
            time.sleep(0.1)
        debug('worker handler exiting')


class TaskHandler(PoolThread):

    def __init__(self, taskqueue, put, outqueue, pool):
        self.taskqueue = taskqueue
        self.put = put
        self.outqueue = outqueue
        self.pool = pool
        super(TaskHandler, self).__init__()

    def run(self):
        taskqueue = self.taskqueue
        outqueue = self.outqueue
        put = self.put
        pool = self.pool

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i + 1)
                continue
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')


class AckHandler(PoolThread):

    def __init__(self, ackqueue, get, cache):
        self.ackqueue = ackqueue
        self.get = get
        self.cache = cache
        super(AckHandler, self).__init__()

    def run(self):
        debug('ack handler starting')
        get = self.get
        cache = self.cache

        while 1:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('ack handler got %s -- exiting',
                      exc.__class__.__name__)
                return

            if self._state:
                assert self._state == TERMINATE
                debug('ack handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('ack handler got sentinel')
                break

            job, i, time_accepted, pid = task
            try:
                cache[job]._ack(i, time_accepted, pid)
            except (KeyError, AttributeError), exc:
                # Object gone, or doesn't support _ack (e.g. IMapIterator)
                pass

        while cache and self._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('ack handler got %s -- exiting',
                      exc.__class__.__name__)
                return

            if task is None:
                debug('ack handler ignoring extra sentinel')
                continue

            job, i, time_accepted, pid = task
            try:
                cache[job]._ack(i, time_accepted, pid)
            except KeyError:
                pass

        debug('ack handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), self._state)


class TimeoutHandler(PoolThread):

    def __init__(self, processes, cache, t_soft, t_hard):
        self.processes = processes
        self.cache = cache
        self.t_soft = t_soft
        self.t_hard = t_hard
        super(TimeoutHandler, self).__init__()

    def run(self):
        processes = self.processes
        cache = self.cache
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()

        def _process_by_pid(pid):
            for index, process in enumerate(processes):
                if process.pid == pid:
                    return process, index
            return None, None

        def _pop_by_pid(pid):
            process, index = _process_by_pid(pid)
            if not process:
                return
            p = processes.pop(index)
            assert p is process
            return process

        def _timed_out(start, timeout):
            if not start or not timeout:
                return False
            if time.time() >= start + timeout:
                return True

        def _on_soft_timeout(job, i):
            debug('soft time limit exceeded for %i', i)
            process, _index = _process_by_pid(job._accept_pid)
            if not process:
                return

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=True)

            try:
                os.kill(job._accept_pid, SIG_SOFT_TIMEOUT)
            except OSError, exc:
                if exc.errno == errno.ESRCH:
                    pass
                else:
                    raise

            dirty.add(i)

        def _on_hard_timeout(job, i):
            debug('hard time limit exceeded for %i', i)

            # Remove from _pool
            process = _pop_by_pid(job._accept_pid)

            # Remove from cache and set return value to an exception
            job._set(i, (False, TimeLimitExceeded()))

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=False)

            if not process:
                return

            # Terminate the process
            process.terminate()

        # Inner-loop
        while self._state == RUN:

            # Remove dirty items not in cache anymore
            if dirty:
                dirty = set(k for k in dirty if k in cache)

            for i, job in cache.items():
                ack_time = job._time_accepted
                if _timed_out(ack_time, t_hard):
                    _on_hard_timeout(job, i)
                elif i not in dirty and _timed_out(ack_time, t_soft):
                    _on_soft_timeout(job, i)

            time.sleep(0.5)  # Don't waste CPU cycles.

        debug('timeout handler exiting')


class ResultHandler(PoolThread):

    def __init__(self, outqueue, get, cache, putlock, poll, workers_gone):
        self.outqueue = outqueue
        self.get = get
        self.cache = cache
        self.putlock = putlock
        self.poll = poll
        self.workers_gone = workers_gone
        super(ResultHandler, self).__init__()

    def run(self):
        get = self.get
        outqueue = self.outqueue
        cache = self.cache
        putlock = self.putlock
        poll = self.poll
        workers_gone = self.workers_gone

        debug('result handler starting')
        while 1:
            try:
                task = get()
            except (IOError, EOFError), exc:
                debug('result handler got %s -- exiting',
                      exc.__class__.__name__)
                return

            if putlock is not None:
                try:
                    putlock.release()
                except ValueError:
                    pass

            if self._state:
                assert self._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if putlock is not None:
            try:
                putlock.release()
            except ValueError:
                pass

        while cache and self._state != TERMINATE:
            try:
                ready, task = poll(0.2)
            except (IOError, EOFError), exc:
                debug('result handler got %s -- exiting',
                      exc.__class__.__name__)
                return

            if ready:
                if task is None:
                    debug('result handler ignoring extra sentinel')
                    continue

                job, i, obj = task
                try:
                    cache[job]._set(i, obj)
                except KeyError:
                    pass
            else:
                if workers_gone():
                    debug("%s active job(s), but no active workers! "
                          "Terminating..." % (len(cache), ))
                    err = WorkerLostError(
                        "The worker processing this job has terminated.")
                    for job in cache.values():
                        job._set(None, (False, err))

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block. There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), self._state)


class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    AckHandler = AckHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
            maxtasksperchild=None, timeout=None, soft_timeout=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
            raise NotImplementedError("Soft timeouts not supported: "
                    "Your platform does not have the SIGUSR1 signal.")

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        self._processes = processes

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            self._create_worker_process()

        self._worker_handler = self.Supervisor(self)
        self._worker_handler.start()

        self._putlock = threading.BoundedSemaphore(self._processes)

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue, self._pool)
        self._task_handler.start()

        # Thread processing acknowledgements from the ackqueue.
        self._ack_handler = self.AckHandler(self._ackqueue,
                self._quick_get_ack, self._cache)
        self._ack_handler.start()

        # Thread killing timedout jobs.
        if self.timeout or self.soft_timeout:
            self._timeout_handler = self.TimeoutHandler(
                    self._pool, self._cache,
                    self.soft_timeout, self.timeout)
            self._timeout_handler.start()
        else:
            self._timeout_handler = None

        # Thread processing results in the outqueue.
        self._result_handler = self.ResultHandler(self._outqueue,
                                        self._quick_get, self._cache,
                                        self._putlock, self._poll_result,
                                        self._workers_gone)
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._ackqueue, self._pool, self._ack_handler,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler),
            exitpriority=15,
            )

    def _create_worker_process(self):
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue, self._ackqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
            )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        return w

    def _workers_gone(self):
        self._join_exited_workers()
        return not self._pool

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime. Returns True if any workers were
        cleaned up.
        """
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                if self._putlock is not None:
                    try:
                        self._putlock.release()
                    except ValueError:
                        pass
                worker.join()
                del self._pool[i]
        return len(self._pool) < self._processes

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        debug('repopulating pool')
        for i in range(self._processes - len(self._pool)):
            self._create_worker_process()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._ackqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv
        self._quick_get_ack = self._ackqueue._reader.recv

        def _poll_result(timeout):
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()
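
    # Usage sketch (illustrative, not part of the original module): `map`
    # blocks until every result has arrived and preserves input order:
    #
    #     >>> pool = Pool(processes=2)
    #     >>> pool.map(abs, [-1, -2, 3])
    #     [1, 2, 3]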

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower
        than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={},
            callback=None, accept_callback=None, timeout_callback=None,
            waitforslot=False, error_callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin.

        The callback is called when the function's return value is ready,
        and the accept callback is called when the job is accepted to be
        executed. Simplified, the flow is like this:

            >>> if accept_callback:
            ...     accept_callback()
            >>> retval = func(*args, **kwds)
            >>> if callback:
            ...     callback(retval)

        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback,
                             accept_callback, timeout_callback,
                             error_callback)
        if waitforslot:
            self._putlock.acquire()
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result
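
    # Usage sketch (illustrative, not part of the original module): the
    # callbacks run in the pool's handler threads, so they should be quick
    # and thread-safe.
    #
    #     >>> def on_accept():
    #     ...     print("job was acked by a worker")
    #     >>> def on_result(retval):
    #     ...     print("result: %r" % (retval, ))
    #     >>> res = pool.apply_async(abs, (-4, ), callback=on_result,
    #     ...                        accept_callback=on_accept)
    #     >>> res.get()
    #     4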

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)
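
    # For example (illustrative, not part of the original module), with
    # size=2 the iterable is cut into (func, chunk) pairs, the final chunk
    # possibly short; `mapstar` later applies func across each chunk:
    #
    #     >>> [chunk for _f, chunk in Pool._get_tasks(abs, [-1, -2, -3], 2)]
    #     [(-1, -2), (-3,)]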

    def __reduce__(self):
        raise NotImplementedError(
                'pool objects cannot be passed between '
                'processes or pickled')

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            # Worker handler can't run while the result
            # handler does its second pass, so wait for it to finish.
            self._worker_handler.close()
            self._worker_handler.join()
            self._state = CLOSE
            self._taskqueue.put(None)

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler.terminate()
        self._terminate()

    def join(self):
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()
        debug('after join()')
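
    # Shutdown sketch (illustrative, not part of the original module): the
    # orderly sequence is close() -- stop accepting tasks and let workers
    # drain -- followed by join(); terminate() instead kills outstanding
    # workers immediately.
    #
    #     >>> pool.close()
    #     >>> pool.join()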

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, ackqueue, pool,
                        ack_handler, worker_handler, task_handler,
                        result_handler, cache, timeout_handler):

        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler.terminate()
        task_handler.terminate()
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler.terminate()
        outqueue.put(None)                  # sentinel

        ack_handler.terminate()
        ackqueue.put(None)                  # sentinel

        if timeout_handler is not None:
            timeout_handler.terminate()

        # Terminate workers which haven't already finished
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        debug('joining ack handler')
        ack_handler.join(1e100)

        if timeout_handler is not None:
            debug('joining timeout handler')
            timeout_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()


DynamicPool = Pool

#
# Class whose instances are returned by `Pool.apply_async()`
#


class ApplyResult(object):

    def __init__(self, cache, callback, accept_callback=None,
                 timeout_callback=None, error_callback=None):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._accepted = False
        self._accept_pid = None
        self._time_accepted = None
        self._ready = False
        self._callback = callback
        self._errback = error_callback
        self._accept_callback = accept_callback
        self._timeout_callback = timeout_callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def accepted(self):
        return self._accepted

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._errback and not self._success:
            self._errback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        if self._accepted:
            del self._cache[self._job]

    def _ack(self, i, time_accepted, pid):
        self._accepted = True
        self._time_accepted = time_accepted
        self._accept_pid = pid
        if self._accept_callback:
            self._accept_callback()
        if self._ready:
            del self._cache[self._job]
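
# Usage sketch (illustrative, not part of the original module): an
# ApplyResult can be polled without blocking, or waited on with a timeout:
#
#     >>> res = pool.apply_async(abs, (-1, ))
#     >>> res.wait(1.0)           # block for at most one second
#     >>> if res.ready() and res.successful():
#     ...     value = res.get()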

#
# Class whose instances are returned by `Pool.map_async()`
#


class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length // chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#


class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                     # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#


class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()
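
# Sketch (illustrative, not part of the original module): the unordered
# variant appends results as they arrive, so iteration follows completion
# order rather than submission order:
#
#     >>> for value in pool.imap_unordered(abs, [-3, -1, -2]):
#     ...     print(value)        # 1, 2, 3 in whatever order workers finish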

#
#
#


class ThreadPool(Pool):

    from multiprocessing.dummy import Process as DummyProcess
    Process = DummyProcess

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._ackqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get
        self._quick_get_ack = self._ackqueue.get

        def _poll_result(timeout):
            try:
                return True, self._quick_get(timeout=timeout)
            except Queue.Empty:
                return False, None
        self._poll_result = _poll_result

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()
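
# Usage sketch (illustrative, not part of the original module): ThreadPool
# exposes the same API but runs jobs in threads via multiprocessing.dummy,
# which suits I/O-bound work and debugging without subprocesses:
#
#     >>> tp = ThreadPool(processes=4)
#     >>> tp.map(len, ["a", "bc", "def"])
#     [1, 2, 3]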