  1. #
  2. # Module providing the `Pool` class for managing a process pool
  3. #
  4. # multiprocessing/pool.py
  5. #
  6. # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
  7. #
  8. __all__ = ['Pool']
  9. #
  10. # Imports
  11. #
  12. import os
  13. import errno
  14. import threading
  15. import Queue
  16. import itertools
  17. import collections
  18. import time
  19. import signal
  20. from multiprocessing import Process, cpu_count, TimeoutError
  21. from multiprocessing.util import Finalize, debug
  22. from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
  23. from celery.exceptions import WorkerLostError
#
# Constants representing the state of a pool
#
RUN = 0         # accepting and executing tasks
CLOSE = 1       # no new tasks accepted; outstanding work finishes
TERMINATE = 2   # abort outstanding work and shut down
#
# Constants representing the state of a job
#
ACK = 0         # worker acknowledged receipt of the job
READY = 1       # job finished; result message follows
# Signal used for soft time limits.
SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)  # None where SIGUSR1 is unavailable
#
# Miscellaneous
#
# Shared source of unique job ids, consumed by ApplyResult/IMapIterator.
job_counter = itertools.count()
  41. def mapstar(args):
  42. return map(*args)
  43. #
  44. # Code run by worker processes
  45. #
  46. class MaybeEncodingError(Exception):
  47. """Wraps unpickleable object."""
  48. def __init__(self, exc, value):
  49. self.exc = str(exc)
  50. self.value = repr(value)
  51. Exception.__init__(self, self.exc, self.value)
  52. def __repr__(self):
  53. return "<MaybeEncodingError: %s>" % str(self)
  54. def __str__(self):
  55. return "Error sending result: '%s'. Reason: '%s'." % (
  56. self.value, self.exc)
def soft_timeout_sighandler(signum, frame):
    """Signal handler installed in workers for SIG_SOFT_TIMEOUT: turns
    the signal into a SoftTimeLimitExceeded raised in the running task."""
    raise SoftTimeLimitExceeded()
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    """Main loop run inside each pool worker process.

    Polls `inqueue` for tasks, sends an ``ACK`` message for each task
    received, executes it, and sends a ``READY`` message with the
    ``(success, value)`` outcome.  Exits on the ``None`` sentinel, on a
    broken queue, or after `maxtasks` completed tasks (the
    ``maxtasksperchild`` feature).
    """
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    pid = os.getpid()
    put = outqueue.put
    get = inqueue.get
    # Prefer a timed poll on the underlying pipe so the loop can wake up
    # periodically; fall back to Queue.get(timeout=...) otherwise.
    if hasattr(inqueue, '_reader'):
        def poll(timeout):
            if inqueue._reader.poll(timeout):
                return True, get()
            return False, None
    else:
        def poll(timeout):
            try:
                return True, get(timeout=timeout)
            except Queue.Empty:
                return False, None
    # Close the queue ends this process does not use.
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()
    if initializer is not None:
        initializer(*initargs)
    # Install the soft time-limit handler where the platform supports it.
    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            ready, task = poll(1.0)
            if not ready:
                continue
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break
        if task is None:
            debug('worker got sentinel -- exiting')
            break
        job, i, func, args, kwds = task
        # Acknowledge before executing so the parent can track the
        # accept time (used for time limits) and the worker pid.
        put((ACK, (job, i, time.time(), pid)))
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((READY, (job, i, result)))
        except Exception, exc:
            # Result not picklable: send a picklable wrapper instead.
            wrapped = MaybeEncodingError(exc, result[1])
            put((READY, (job, i, (False, wrapped))))
        completed += 1
    debug('worker exiting after %d tasks' % completed)
  107. #
  108. # Class representing a process pool
  109. #
  110. class PoolThread(threading.Thread):
  111. def __init__(self, *args, **kwargs):
  112. threading.Thread.__init__(self)
  113. self._state = RUN
  114. self.daemon = True
  115. def terminate(self):
  116. self._state = TERMINATE
  117. def close(self):
  118. self._state = CLOSE
  119. class Supervisor(PoolThread):
  120. def __init__(self, pool):
  121. self.pool = pool
  122. super(Supervisor, self).__init__()
  123. def run(self):
  124. debug('worker handler starting')
  125. while self._state == RUN and self.pool._state == RUN:
  126. self.pool._maintain_pool()
  127. time.sleep(0.1)
  128. debug('worker handler exiting')
class TaskHandler(PoolThread):
    """Thread feeding tasks from the pool's task queue into the shared
    worker inqueue, and sending shutdown sentinels when done."""

    def __init__(self, taskqueue, put, outqueue, pool):
        self.taskqueue = taskqueue        # Queue of (taskseq, set_length) pairs
        self.put = put                    # fast путь: _quick_put of the inqueue
        self.outqueue = outqueue
        self.pool = pool                  # list of worker processes
        super(TaskHandler, self).__init__()

    def run(self):
        taskqueue = self.taskqueue
        outqueue = self.outqueue
        put = self.put
        pool = self.pool
        # Outer loop: each queue item is a sequence of tasks plus an
        # optional callback that records how many tasks were emitted.
        # A None item is the shutdown sentinel.
        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self._state:
                    # State changed away from RUN: abandon this sequence.
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                # Whole sequence written: report its length and continue
                # with the next queue item.
                if set_length:
                    debug('doing set_length()')
                    set_length(i + 1)
                continue
            # Inner loop broke (shutdown or IOError): stop consuming.
            break
        else:
            debug('task handler got sentinel')
        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)
            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')
        debug('task handler exiting')
class TimeoutHandler(PoolThread):
    """Thread enforcing soft and hard time limits on accepted jobs.

    Soft limit: sends SIG_SOFT_TIMEOUT to the worker so the task can
    clean up.  Hard limit: sets a TimeLimitExceeded result and
    terminates the worker process.
    """

    def __init__(self, processes, cache, t_soft, t_hard):
        self.processes = processes        # the pool's worker list
        self.cache = cache                # job id -> result object
        self.t_soft = t_soft              # soft limit in seconds (or falsy)
        self.t_hard = t_hard              # hard limit in seconds (or falsy)
        super(TimeoutHandler, self).__init__()

    def run(self):
        processes = self.processes
        cache = self.cache
        t_hard, t_soft = self.t_hard, self.t_soft
        # Job indices already sent a soft-timeout signal, so each job is
        # only signalled once.
        dirty = set()

        def _process_by_pid(pid):
            # Find the worker process (and its pool index) by pid.
            for index, process in enumerate(processes):
                if process.pid == pid:
                    return process, index
            return None, None

        def _timed_out(start, timeout):
            # True when `timeout` seconds have elapsed since `start`.
            # Falsy start/timeout means "no limit applies".
            if not start or not timeout:
                return False
            if time.time() >= start + timeout:
                return True

        def _on_soft_timeout(job, i):
            debug('soft time limit exceeded for %i' % i)
            process, _index = _process_by_pid(job._worker_pid)
            if not process:
                return
            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=True)
            try:
                os.kill(job._worker_pid, SIG_SOFT_TIMEOUT)
            except OSError, exc:
                # ESRCH: the worker is already gone — nothing to signal.
                if exc.errno == errno.ESRCH:
                    pass
                else:
                    raise
            dirty.add(i)

        def _on_hard_timeout(job, i):
            debug('hard time limit exceeded for %i', i)
            # Remove from _pool
            process, _index = _process_by_pid(job._worker_pid)
            # Remove from cache and set return value to an exception
            job._set(i, (False, TimeLimitExceeded()))
            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=False)
            if not process:
                return
            # Terminate the process
            process.terminate()

        # Inner-loop
        while self._state == RUN:
            # Remove dirty items not in cache anymore
            if dirty:
                dirty = set(k for k in dirty if k in cache)
            # cache.items() is a snapshot here (Python 2), so callbacks
            # may safely remove entries while we iterate.
            for i, job in cache.items():
                ack_time = job._time_accepted
                if _timed_out(ack_time, t_hard):
                    _on_hard_timeout(job, i)
                elif i not in dirty and _timed_out(ack_time, t_soft):
                    _on_soft_timeout(job, i)
            time.sleep(0.5)                     # Don't waste CPU cycles.
        debug('timeout handler exiting')
class ResultHandler(PoolThread):
    """Thread consuming (ACK/READY) messages from the outqueue and
    dispatching them to the result objects waiting in the cache."""

    def __init__(self, outqueue, get, cache, poll,
            join_exited_workers, putlock):
        self.outqueue = outqueue
        self.get = get                    # fast reader of the outqueue
        self.cache = cache                # job id -> result object
        self.poll = poll                  # timed poll on the outqueue
        self.join_exited_workers = join_exited_workers
        self.putlock = putlock            # BoundedSemaphore gating apply_async
        super(ResultHandler, self).__init__()

    def run(self):
        get = self.get
        outqueue = self.outqueue
        cache = self.cache
        poll = self.poll
        join_exited_workers = self.join_exited_workers
        putlock = self.putlock

        def on_ack(job, i, time_accepted, pid):
            # Worker acknowledged the job: record accept time and pid.
            try:
                cache[job]._ack(i, time_accepted, pid)
            except (KeyError, AttributeError):
                # Object gone or doesn't support _ack (e.g. IMAPIterator).
                pass

        def on_ready(job, i, obj):
            # A result arrived: free a slot for waiting producers first.
            if putlock is not None:
                try:
                    putlock.release()
                except ValueError:
                    # Semaphore already at its bound — harmless.
                    pass
            try:
                cache[job]._set(i, obj)
            except KeyError:
                # Job already removed (e.g. timed out).
                pass

        state_handlers = {ACK: on_ack, READY: on_ready}

        def on_state_change(task):
            state, args = task
            try:
                state_handlers[state](*args)
            except KeyError:
                debug("Unknown job state: %s (args=%s)" % (state, args))

        debug('result handler starting')
        # Main loop: runs until terminated or the sentinel arrives.
        while 1:
            try:
                ready, task = poll(0.2)
            except (IOError, EOFError), exc:
                debug('result handler got %r -- exiting' % (exc, ))
                return
            if self._state:
                assert self._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break
            if ready:
                if task is None:
                    debug('result handler got sentinel')
                    break
                on_state_change(task)
        # Notify waiting threads
        if putlock is not None:
            try:
                putlock.release()
            except ValueError:
                pass
        # Drain loop: keep processing until the cache is empty so no
        # caller is left blocked on a result that already arrived.
        while cache and self._state != TERMINATE:
            try:
                ready, task = poll(0.2)
            except (IOError, EOFError), exc:
                debug('result handler got %r -- exiting' % (exc, ))
                return
            if ready:
                if task is None:
                    debug('result handler ignoring extra sentinel')
                    continue
                on_state_change(task)
            join_exited_workers()
        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block. There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass
        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), self._state)
class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin.

    Extends the stdlib multiprocessing pool with per-task time limits
    (`timeout`/`soft_timeout`), accept/timeout/error callbacks, and
    detection of prematurely-exited workers (WorkerLostError).
    '''
    # Overridable component classes (ThreadPool swaps Process, tests may
    # swap the handler threads).
    Process = Process
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
            maxtasksperchild=None, timeout=None, soft_timeout=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}                  # job id -> pending result object
        self._state = RUN
        self.timeout = timeout            # hard time limit (seconds)
        self.soft_timeout = soft_timeout  # soft time limit (seconds)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs
        # Soft timeouts are delivered via SIGUSR1, so they require it.
        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
            raise NotImplementedError("Soft timeouts not supported: "
                    "Your platform does not have the SIGUSR1 signal.")
        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        self._processes = processes
        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')
        # Start the worker processes first, then the helper threads.
        self._pool = []
        for i in range(processes):
            self._create_worker_process()
        self._worker_handler = self.Supervisor(self)
        self._worker_handler.start()
        # Bounds the number of unacknowledged tasks when
        # apply_async(..., waitforslot=True) is used.
        self._putlock = threading.BoundedSemaphore(self._processes)
        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool)
        self._task_handler.start()
        # Thread killing timedout jobs.
        if self.timeout or self.soft_timeout:
            self._timeout_handler = self.TimeoutHandler(
                    self._pool, self._cache,
                    self.soft_timeout, self.timeout)
            self._timeout_handler.start()
        else:
            self._timeout_handler = None
        # Thread processing results in the outqueue.
        self._result_handler = self.ResultHandler(self._outqueue,
                                                  self._quick_get, self._cache,
                                                  self._poll_result,
                                                  self._join_exited_workers,
                                                  self._putlock)
        self._result_handler.start()
        # Registered cleanup run at interpreter exit (or via terminate()).
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler),
            exitpriority=15,
            )

    def _create_worker_process(self):
        # Spawn, rename and start a single daemon worker process.
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
            )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        return w

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime. Returns True if any workers were
        cleaned up.

        Jobs whose worker died before delivering a result are resolved
        with a WorkerLostError so callers do not block forever.
        """
        cleaned = []
        # Iterate in reverse so `del self._pool[i]` keeps indices valid.
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                if self._putlock is not None:
                    try:
                        self._putlock.release()
                    except ValueError:
                        pass
                worker.join()
                cleaned.append(worker.pid)
                del self._pool[i]
        if cleaned:
            for job in self._cache.values():
                for worker_pid in job.worker_pids():
                    if worker_pid in cleaned:
                        err = WorkerLostError("Worker exited prematurely.")
                        job._set(None, (False, err))
                        continue
            return True
        return False

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        debug('repopulating pool')
        for i in range(self._processes - len(self._pool)):
            if self._state != RUN:
                return
            self._create_worker_process()
            debug('added worker')

    def _maintain_pool(self):
        """"Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        # Pipe-backed queues plus direct send/recv shortcuts that skip
        # the Queue-layer overhead.
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

        def _poll_result(timeout):
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower
        than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            # Chunked variant: run mapstar over batches and flatten.
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={},
            callback=None, accept_callback=None, timeout_callback=None,
            waitforslot=False, error_callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin.

        Callback is called when the functions return value is ready.
        The accept callback is called when the job is accepted to be executed.

        Simplified the flow is like this:

            >>> if accept_callback:
            ...     accept_callback()
            >>> retval = func(*args, **kwds)
            >>> if callback:
            ...     callback(retval)

        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback,
                             accept_callback, timeout_callback,
                             error_callback)
        # waitforslot blocks until a worker slot frees up (the semaphore
        # is released when a result arrives or a worker exits).
        if waitforslot:
            self._putlock.acquire()
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)
        # Default chunksize: roughly 4 chunks per worker.
        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                 for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _get_tasks(func, it, size):
        # Yield (func, batch) pairs where each batch has up to `size`
        # items from `it`.
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
                'pool objects cannot be passed between '
                'processes or pickled')

    def close(self):
        """Disallow new tasks; outstanding tasks are allowed to finish."""
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler.close()
            self._worker_handler.join()
            self._taskqueue.put(None)

    def terminate(self):
        """Abort outstanding tasks and shut the pool down immediately."""
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler.terminate()
        self._terminate()

    def join(self):
        """Wait for all helper threads and workers to exit.  Must be
        preceded by close() or terminate()."""
        assert self._state in (CLOSE, TERMINATE)
        debug('joining worker handler')
        self._worker_handler.join()
        debug('joining task handler')
        self._task_handler.join()
        debug('joining result handler')
        self._result_handler.join()
        for i, p in enumerate(self._pool):
            debug('joining worker %s/%s (%r)' % (i, len(self._pool), p, ))
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler,
                        result_handler, cache, timeout_handler):
        # this is guaranteed to only be called once
        debug('finalizing pool')
        worker_handler.terminate()
        task_handler.terminate()
        taskqueue.put(None)                 # sentinel
        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))
        assert result_handler.is_alive() or len(cache) == 0
        result_handler.terminate()
        outqueue.put(None)                  # sentinel
        if timeout_handler is not None:
            timeout_handler.terminate()
        # Terminate workers which haven't already finished
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()
        debug('joining task handler')
        task_handler.join(1e100)
        debug('joining result handler')
        result_handler.join(1e100)
        if timeout_handler is not None:
            debug('joining timeout handler')
            timeout_handler.join(1e100)
        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()
DynamicPool = Pool  # NOTE(review): presumably a backwards-compatibility alias — confirm against callers.
  622. #
  623. # Class whose instances are returned by `Pool.apply_async()`
  624. #
class ApplyResult(object):
    """Pending result returned by `Pool.apply_async()`.

    Registers itself in the pool's cache under a fresh job id; the
    ResultHandler thread resolves it via `_ack` and `_set`.
    """

    def __init__(self, cache, callback, accept_callback=None,
            timeout_callback=None, error_callback=None):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback                  # called on success
        self._accept_callback = accept_callback    # called on worker ACK
        self._errback = error_callback             # called on failure
        self._timeout_callback = timeout_callback  # called on time limit
        self._accepted = False
        self._worker_pid = None
        self._time_accepted = None
        cache[self._job] = self

    def ready(self):
        return self._ready

    def accepted(self):
        return self._accepted

    def successful(self):
        assert self._ready
        return self._success

    def worker_pids(self):
        # At most one pid; empty list until the job is acked.
        return filter(None, [self._worker_pid])

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        """Return the result, raising the task's exception on failure or
        TimeoutError if it is not ready within `timeout` seconds."""
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        # Called by the result handler with obj == (success, value).
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._errback and not self._success:
            self._errback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        # Only drop the cache entry once both ACK and READY have been
        # seen, whichever order they arrive in (see _ack).
        if self._accepted:
            self._cache.pop(self._job, None)

    def _ack(self, i, time_accepted, pid):
        self._accepted = True
        self._time_accepted = time_accepted
        self._worker_pid = pid
        if self._accept_callback:
            self._accept_callback()
        if self._ready:
            self._cache.pop(self._job, None)
  686. #
  687. # Class whose instances are returned by `Pool.map_async()`
  688. #
class MapResult(ApplyResult):
    """Pending result returned by `Pool.map_async()`: collects chunked
    results into a single list of `length` values."""

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._length = length
        self._value = [None] * length
        # Per-item bookkeeping (parallel lists indexed by position).
        self._accepted = [False] * length
        self._worker_pid = [None] * length
        self._time_accepted = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            # Empty input: nothing to wait for.
            self._number_left = 0
            self._ready = True
        else:
            self._number_left = length // chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        # `i` is the chunk index; a successful chunk fills its slice of
        # the value list, a failed chunk fails the whole map.
        success, result = success_result
        if success:
            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                if self._accepted:
                    self._cache.pop(self._job, None)
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()
        else:
            self._success = False
            self._value = result
            if self._accepted:
                self._cache.pop(self._job, None)
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

    def _ack(self, i, time_accepted, pid):
        # Mark every item of the acked chunk as accepted by `pid`.
        start = i * self._chunksize
        stop = (i + 1) * self._chunksize
        for j in range(start, stop):
            self._accepted[j] = True
            self._worker_pid[j] = pid
            self._time_accepted[j] = time_accepted
        if self._ready:
            self._cache.pop(self._job, None)

    def accepted(self):
        return all(self._accepted)

    def worker_pids(self):
        return filter(None, self._worker_pid)
  744. #
  745. # Class whose instances are returned by `Pool.imap()`
  746. #
class IMapIterator(object):
    """Iterator returned by `Pool.imap()`: yields results in input
    order, buffering out-of-order arrivals in `_unsorted`."""

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()   # results ready to be yielded
        self._index = 0                     # next in-order index expected
        self._length = None                 # total count, set by _set_length
        self._unsorted = {}                 # index -> result, arrived early
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                # Nothing buffered: either exhausted, or wait for more.
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()
        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        # Called by the result handler; queues `obj` when it is the next
        # expected index, otherwise parks it in _unsorted.
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                # Flush any buffered items that are now in order.
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        # Called by the task handler once the input size is known.
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()
  807. #
  808. # Class whose instances are returned by `Pool.imap_unordered()`
  809. #
  810. class IMapUnorderedIterator(IMapIterator):
  811. def _set(self, i, obj):
  812. self._cond.acquire()
  813. try:
  814. self._items.append(obj)
  815. self._index += 1
  816. self._cond.notify()
  817. if self._index == self._length:
  818. del self._cache[self._job]
  819. finally:
  820. self._cond.release()
  821. #
  822. #
  823. #
class ThreadPool(Pool):
    """Pool variant backed by threads instead of processes (uses
    `multiprocessing.dummy.Process` and plain Queue.Queue pipes)."""

    from multiprocessing.dummy import Process as DummyProcess
    Process = DummyProcess

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # Plain thread-safe queues; no pipes needed inside one process.
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

        def _poll_result(timeout):
            try:
                return True, self._quick_get(timeout=timeout)
            except Queue.Empty:
                return False, None
        self._poll_result = _poll_result

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()