# pool.py (31 KB)
  1. #
  2. # Module providing the `Pool` class for managing a process pool
  3. #
  4. # multiprocessing/pool.py
  5. #
  6. # Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
  7. #
  8. __all__ = ['Pool']
  9. #
  10. # Imports
  11. #
  12. import os
  13. import errno
  14. import threading
  15. import Queue
  16. import itertools
  17. import collections
  18. import time
  19. import signal
  20. from multiprocessing import Process, cpu_count, TimeoutError
  21. from multiprocessing.util import Finalize, debug
  22. from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
  23. #
#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Constants representing the state of a job
#

ACK = 0     # worker has accepted the job
READY = 1   # the job's result is available

# Signal used for soft time limits.
# None on platforms without SIGUSR1; soft timeouts are then unsupported.
SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)
  36. #
  37. # Exceptions
  38. #
  39. class WorkerLostError(Exception):
  40. """The worker processing a job has exited prematurely."""
  41. pass
  42. #
  43. # Miscellaneous
  44. #
  45. job_counter = itertools.count()
  46. def mapstar(args):
  47. return map(*args)
  48. #
  49. # Code run by worker processes
  50. #
  51. class MaybeEncodingError(Exception):
  52. """Wraps unpickleable object."""
  53. def __init__(self, exc, value):
  54. self.exc = str(exc)
  55. self.value = repr(value)
  56. Exception.__init__(self, self.exc, self.value)
  57. def __repr__(self):
  58. return "<MaybeEncodingError: %s>" % str(self)
  59. def __str__(self):
  60. return "Error sending result: '%s'. Reason: '%s'." % (
  61. self.value, self.exc)
def soft_timeout_sighandler(signum, frame):
    # Installed for SIG_SOFT_TIMEOUT (SIGUSR1) in each worker: interrupts
    # the currently running task by raising SoftTimeLimitExceeded inside it.
    raise SoftTimeLimitExceeded()
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    """Main loop executed by every pool worker process.

    Pulls ``(job, i, func, args, kwds)`` tuples off `inqueue`, acknowledges
    each with an ACK message (carrying the accept time and worker pid), runs
    the task, and sends the result back on `outqueue` as a READY message.
    Exits on the ``None`` sentinel, on a broken queue, or after `maxtasks`
    completed tasks.
    """
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    pid = os.getpid()
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        # Close the pipe ends this process does not use.
        inqueue._writer.close()
        outqueue._reader.close()
    if initializer is not None:
        initializer(*initargs)
    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break
        if task is None:
            debug('worker got sentinel -- exiting')
            break
        job, i, func, args, kwds = task
        # Ack before running so the parent can start tracking time limits.
        put((ACK, (job, i, time.time(), pid)))
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((READY, (job, i, result)))
        except Exception, exc:
            # Result was unpicklable -- send a picklable wrapper instead.
            wrapped = MaybeEncodingError(exc, result[1])
            put((READY, (job, i, (False, wrapped))))
        completed += 1
    debug('worker exiting after %d tasks' % completed)
  99. #
  100. # Class representing a process pool
  101. #
  102. class PoolThread(threading.Thread):
  103. def __init__(self, *args, **kwargs):
  104. threading.Thread.__init__(self)
  105. self._state = RUN
  106. self.daemon = True
  107. def terminate(self):
  108. self._state = TERMINATE
  109. def close(self):
  110. self._state = CLOSE
  111. class Supervisor(PoolThread):
  112. def __init__(self, pool):
  113. self.pool = pool
  114. super(Supervisor, self).__init__()
  115. def run(self):
  116. debug('worker handler starting')
  117. while self._state == RUN and self.pool._state == RUN:
  118. self.pool._maintain_pool()
  119. time.sleep(0.1)
  120. debug('worker handler exiting')
class TaskHandler(PoolThread):
    """Thread feeding tasks from the taskqueue onto the workers' inqueue."""

    def __init__(self, taskqueue, put, outqueue, pool):
        self.taskqueue = taskqueue
        self.put = put              # bound quick-put onto the inqueue
        self.outqueue = outqueue
        self.pool = pool
        super(TaskHandler, self).__init__()

    def run(self):
        taskqueue = self.taskqueue
        outqueue = self.outqueue
        put = self.put
        pool = self.pool
        # Each taskqueue item is (iterable_of_tasks, set_length_callback);
        # a None item is the sentinel telling this thread to shut down.
        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if self._state:
                    # State moved off RUN (== 0): stop feeding tasks.
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                # Sequence exhausted normally: report its final length
                # (used by IMapIterator._set_length), then take next item.
                if set_length:
                    debug('doing set_length()')
                    set_length(i + 1)
                continue
            # Inner loop broke out -- stop handling tasks altogether.
            break
        else:
            debug('task handler got sentinel')

        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')
class TimeoutHandler(PoolThread):
    """Thread enforcing per-job soft and hard time limits.

    Polls the job cache twice a second.  Jobs exceeding the soft limit get
    SIG_SOFT_TIMEOUT delivered to their worker (once); jobs exceeding the
    hard limit have their result forced to TimeLimitExceeded and their
    worker process terminated.
    """

    def __init__(self, processes, cache, t_soft, t_hard):
        self.processes = processes
        self.cache = cache
        self.t_soft = t_soft
        self.t_hard = t_hard
        super(TimeoutHandler, self).__init__()

    def run(self):
        processes = self.processes
        cache = self.cache
        t_hard, t_soft = self.t_hard, self.t_soft
        dirty = set()       # job ids already sent a soft-timeout signal

        def _process_by_pid(pid):
            # Find (process, index) in the pool by worker pid.
            for index, process in enumerate(processes):
                if process.pid == pid:
                    return process, index
            return None, None

        def _timed_out(start, timeout):
            # NOTE(review): returns None (falsy) rather than False when not
            # timed out; callers only use it in boolean context.
            if not start or not timeout:
                return False
            if time.time() >= start + timeout:
                return True

        def _on_soft_timeout(job, i):
            debug('soft time limit exceeded for %i' % i)
            process, _index = _process_by_pid(job._worker_pid)
            if not process:
                return

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=True)

            try:
                os.kill(job._worker_pid, SIG_SOFT_TIMEOUT)
            except OSError, exc:
                if exc.errno == errno.ESRCH:
                    # Worker already gone; nothing to signal.
                    pass
                else:
                    raise

            # Remember we already signalled this job so it is not
            # signalled again on the next poll.
            dirty.add(i)

        def _on_hard_timeout(job, i):
            debug('hard time limit exceeded for %i', i)
            # Remove from _pool
            process, _index = _process_by_pid(job._worker_pid)

            # Remove from cache and set return value to an exception
            job._set(i, (False, TimeLimitExceeded()))

            # Run timeout callback
            if job._timeout_callback is not None:
                job._timeout_callback(soft=False)
            if not process:
                return

            # Terminate the process
            process.terminate()

        # Inner-loop
        while self._state == RUN:

            # Remove dirty items not in cache anymore
            if dirty:
                dirty = set(k for k in dirty if k in cache)

            for i, job in cache.items():
                ack_time = job._time_accepted
                if _timed_out(ack_time, t_hard):
                    _on_hard_timeout(job, i)
                elif i not in dirty and _timed_out(ack_time, t_soft):
                    _on_soft_timeout(job, i)

            time.sleep(0.5)                     # Don't waste CPU cycles.

        debug('timeout handler exiting')
  227. class ResultHandler(PoolThread):
  228. def __init__(self, outqueue, get, cache, poll,
  229. join_exited_workers, putlock):
  230. self.outqueue = outqueue
  231. self.get = get
  232. self.cache = cache
  233. self.poll = poll
  234. self.join_exited_workers = join_exited_workers
  235. self.putlock = putlock
  236. super(ResultHandler, self).__init__()
  237. def run(self):
  238. get = self.get
  239. outqueue = self.outqueue
  240. cache = self.cache
  241. poll = self.poll
  242. join_exited_workers = self.join_exited_workers
  243. putlock = self.putlock
  244. def on_ack(job, i, time_accepted, pid):
  245. try:
  246. cache[job]._ack(i, time_accepted, pid)
  247. except (KeyError, AttributeError):
  248. # Object gone or doesn't support _ack (e.g. IMAPIterator).
  249. pass
  250. def on_ready(job, i, obj):
  251. try:
  252. cache[job]._set(i, obj)
  253. except KeyError:
  254. pass
  255. state_handlers = {ACK: on_ack, READY: on_ready}
  256. def on_state_change(task):
  257. state, args = task
  258. try:
  259. state_handlers[state](*args)
  260. except KeyError:
  261. debug("Unknown job state: %s (args=%s)" % (state, args))
  262. debug('result handler starting')
  263. while 1:
  264. try:
  265. ready, task = poll(0.2)
  266. except (IOError, EOFError), exc:
  267. debug('result handler got %r -- exiting' % (exc, ))
  268. return
  269. if self._state:
  270. assert self._state == TERMINATE
  271. debug('result handler found thread._state=TERMINATE')
  272. break
  273. if ready:
  274. if task is None:
  275. debug('result handler got sentinel')
  276. break
  277. if putlock is not None:
  278. try:
  279. putlock.release()
  280. except ValueError:
  281. pass
  282. on_state_change(task)
  283. if putlock is not None:
  284. try:
  285. putlock.release()
  286. except ValueError:
  287. pass
  288. while cache and self._state != TERMINATE:
  289. try:
  290. ready, task = poll(0.2)
  291. except (IOError, EOFError), exc:
  292. debug('result handler got %r -- exiting' % (exc, ))
  293. return
  294. if ready:
  295. if task is None:
  296. debug('result handler ignoring extra sentinel')
  297. continue
  298. on_state_change(task)
  299. join_exited_workers()
  300. job, i, obj = task
  301. try:
  302. cache[job]._set(i, obj)
  303. except KeyError:
  304. pass
  305. if hasattr(outqueue, '_reader'):
  306. debug('ensuring that outqueue is not full')
  307. # If we don't make room available in outqueue then
  308. # attempts to add the sentinel (None) to outqueue may
  309. # block. There is guaranteed to be no more than 2 sentinels.
  310. try:
  311. for i in range(10):
  312. if not outqueue._reader.poll():
  313. break
  314. get()
  315. except (IOError, EOFError):
  316. pass
  317. debug('result handler exiting: len(cache)=%s, thread._state=%s',
  318. len(cache), self._state)
class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    # Factory attributes, overridable by subclasses (see ThreadPool).
    Process = Process
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
            maxtasksperchild=None, timeout=None, soft_timeout=None):
        # processes: number of workers (defaults to cpu_count(), or 1).
        # initializer/initargs: run inside each worker before it takes tasks.
        # maxtasksperchild: recycle a worker after this many tasks.
        # timeout/soft_timeout: hard/soft per-task time limits in seconds.
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}        # job id -> ApplyResult/MapResult/IMapIterator
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if self.soft_timeout and SIG_SOFT_TIMEOUT is None:
            raise NotImplementedError("Soft timeouts not supported: "
                    "Your platform does not have the SIGUSR1 signal.")

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        self._processes = processes

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._pool = []
        for i in range(processes):
            self._create_worker_process()

        # Supervisor thread replacing workers that exit.
        self._worker_handler = self.Supervisor(self)
        self._worker_handler.start()

        # Bounds in-flight tasks for apply_async(waitforslot=True); released
        # by the result handler as results arrive.
        self._putlock = threading.BoundedSemaphore(self._processes)

        self._task_handler = self.TaskHandler(self._taskqueue,
                                              self._quick_put,
                                              self._outqueue,
                                              self._pool)
        self._task_handler.start()

        # Thread killing timedout jobs.
        if self.timeout or self.soft_timeout:
            self._timeout_handler = self.TimeoutHandler(
                    self._pool, self._cache,
                    self.soft_timeout, self.timeout)
            self._timeout_handler.start()
        else:
            self._timeout_handler = None

        # Thread processing results in the outqueue.
        self._result_handler = self.ResultHandler(self._outqueue,
                                                  self._quick_get, self._cache,
                                                  self._poll_result,
                                                  self._join_exited_workers,
                                                  self._putlock)
        self._result_handler.start()

        # Finalizer invoked by terminate() or at interpreter exit.
        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue,
                  self._pool, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache,
                  self._timeout_handler),
            exitpriority=15,
            )

    def _create_worker_process(self):
        """Start one new daemonized worker process and add it to the pool."""
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
            )
        self._pool.append(w)
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True
        w.start()
        return w

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to
        reaching their specified lifetime. Returns True if any workers were
        cleaned up.
        """
        cleaned = []
        # Iterate in reverse so deleting by index stays valid.
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                if self._putlock is not None:
                    # Give back the slot the dead worker's task was holding.
                    try:
                        self._putlock.release()
                    except ValueError:
                        pass
                worker.join()
                cleaned.append(worker.pid)
                del self._pool[i]
        if cleaned:
            # Fail any job that was running on a now-dead worker.
            for job in self._cache.values():
                for worker_pid in job.worker_pids():
                    if worker_pid in cleaned:
                        err = WorkerLostError("Worker exited prematurely.")
                        job._set(None, (False, err))
                        # NOTE(review): this `continue` is the last statement
                        # of the loop body, so it has no effect.
                        continue
            return True
        return False

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        debug('repopulating pool')
        for i in range(self._processes - len(self._pool)):
            if self._state != RUN:
                return
            self._create_worker_process()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        # Bypass the queues' own locking: only the handler threads use these.
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

        def _poll_result(timeout):
            # Return (True, result) if a result arrives within `timeout`
            # seconds, else (False, None).
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower
        than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            # Flatten the chunked results back into single items.
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                     for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            # Flatten the chunked results back into single items.
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={},
            callback=None, accept_callback=None, timeout_callback=None,
            waitforslot=False, error_callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin.

        Callback is called when the functions return value is ready.
        The accept callback is called when the job is accepted to be executed.

        Simplified the flow is like this:

            >>> if accept_callback:
            ...     accept_callback()
            >>> retval = func(*args, **kwds)
            >>> if callback:
            ...     callback(retval)

        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback,
                             accept_callback, timeout_callback,
                             error_callback)
        if waitforslot:
            # Block until the result handler frees a slot.
            self._putlock.acquire()
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            # Default: roughly 4 chunks per worker.
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _get_tasks(func, it, size):
        # Yield (func, chunk) pairs, chunking `it` into tuples of `size`.
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
                'pool objects cannot be passed between '
                'processes or pickled')

    def close(self):
        """Stop accepting tasks; workers exit once pending work is done."""
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler.close()
            self._worker_handler.join()
            # Sentinel makes the task handler shut the workers down.
            self._taskqueue.put(None)

    def terminate(self):
        """Stop immediately, terminating outstanding work and workers."""
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler.terminate()
        self._terminate()

    def join(self):
        """Wait for all helper threads and workers to exit.

        Must be preceded by close() or terminate().
        """
        assert self._state in (CLOSE, TERMINATE)
        debug('joining worker handler')
        self._worker_handler.join()
        debug('joining task handler')
        self._task_handler.join()
        debug('joining result handler')
        self._result_handler.join()
        for i, p in enumerate(self._pool):
            debug('joining worker %s/%s (%r)' % (i, len(self._pool), p, ))
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        # NOTE(review): the rlock is deliberately never released; the pool is
        # being torn down and the workers must not read from inqueue again.
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler,
                        result_handler, cache, timeout_handler):
        """Tear everything down; registered as the Finalize callback."""
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler.terminate()

        task_handler.terminate()
        taskqueue.put(None)                 # sentinel

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler.terminate()
        outqueue.put(None)                  # sentinel

        if timeout_handler is not None:
            timeout_handler.terminate()

        # Terminate workers which haven't already finished
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        task_handler.join(1e100)

        debug('joining result handler')
        result_handler.join(1e100)

        if timeout_handler is not None:
            debug('joining timeout handler')
            timeout_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()
DynamicPool = Pool      # Legacy alias; presumably kept for backward compatibility -- TODO confirm with callers.
  618. #
  619. # Class whose instances are returned by `Pool.apply_async()`
  620. #
class ApplyResult(object):
    """Handle for one asynchronous job, returned by `Pool.apply_async()`."""

    def __init__(self, cache, callback, accept_callback=None,
            timeout_callback=None, error_callback=None):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback                   # on success, gets the value
        self._accept_callback = accept_callback     # when a worker ACKs the job
        self._errback = error_callback              # on failure, gets the error
        self._timeout_callback = timeout_callback   # on soft/hard timeout
        self._accepted = False
        self._worker_pid = None                     # pid of accepting worker
        self._time_accepted = None                  # time.time() of the ACK
        # Register so the result handler can find this job by id.
        cache[self._job] = self

    def ready(self):
        """Return True if the job has finished (successfully or not)."""
        return self._ready

    def accepted(self):
        """Return True if a worker has acknowledged the job."""
        return self._accepted

    def successful(self):
        """Return True if the job completed without raising.

        Must only be called once the job is ready.
        """
        assert self._ready
        return self._success

    def worker_pids(self):
        """Return a list holding the accepting worker's pid (empty if
        not yet accepted)."""
        return filter(None, [self._worker_pid])

    def wait(self, timeout=None):
        """Block until the job is ready, or `timeout` seconds elapse."""
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        """Return the job's result, re-raising its exception on failure.

        Raises TimeoutError if not ready within `timeout` seconds.
        """
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        # Called by the result handler with (success, value); fires the
        # callbacks before waking waiters.
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._errback and not self._success:
            self._errback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        if self._accepted:
            # Job both acked and finished: drop it from the cache.
            self._cache.pop(self._job, None)

    def _ack(self, i, time_accepted, pid):
        # Called by the result handler when a worker accepts the job.
        self._accepted = True
        self._time_accepted = time_accepted
        self._worker_pid = pid
        if self._accept_callback:
            self._accept_callback()
        if self._ready:
            # Result arrived before the ack: drop from cache now.
            self._cache.pop(self._job, None)
  682. #
  683. # Class whose instances are returned by `Pool.map_async()`
  684. #
  685. class MapResult(ApplyResult):
  686. def __init__(self, cache, chunksize, length, callback):
  687. ApplyResult.__init__(self, cache, callback)
  688. self._success = True
  689. self._length = length
  690. self._value = [None] * length
  691. self._accepted = [False] * length
  692. self._worker_pid = [None] * length
  693. self._time_accepted = [None] * length
  694. self._chunksize = chunksize
  695. if chunksize <= 0:
  696. self._number_left = 0
  697. self._ready = True
  698. else:
  699. self._number_left = length // chunksize + bool(length % chunksize)
  700. def _set(self, i, success_result):
  701. success, result = success_result
  702. if success:
  703. self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
  704. self._number_left -= 1
  705. if self._number_left == 0:
  706. if self._callback:
  707. self._callback(self._value)
  708. if self._accepted:
  709. self._cache.pop(self._job, None)
  710. self._cond.acquire()
  711. try:
  712. self._ready = True
  713. self._cond.notify()
  714. finally:
  715. self._cond.release()
  716. else:
  717. self._success = False
  718. self._value = result
  719. if self._accepted:
  720. self._cache.pop(self._job, None)
  721. self._cond.acquire()
  722. try:
  723. self._ready = True
  724. self._cond.notify()
  725. finally:
  726. self._cond.release()
  727. def _ack(self, i, time_accepted, pid):
  728. start = i * self._chunksize
  729. stop = (i + 1) * self._chunksize
  730. for j in range(start, stop):
  731. self._accepted[j] = True
  732. self._worker_pid[j] = pid
  733. self._time_accepted[j] = time_accepted
  734. if self._ready:
  735. self._cache.pop(self._job, None)
  736. def accepted(self):
  737. return all(self._accepted)
  738. def worker_pids(self):
  739. return filter(None, self._worker_pid)
  740. #
  741. # Class whose instances are returned by `Pool.imap()`
  742. #
class IMapIterator(object):
    """Iterator handle returned by `Pool.imap()`, yielding results in
    submission order."""

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._items = collections.deque()   # results ready to be yielded
        self._index = 0                     # index of the next item to yield
        self._length = None                 # total count, set via _set_length
        self._unsorted = {}                 # results that arrived out of order
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result, blocking up to `timeout` seconds.

        Raises StopIteration when exhausted, TimeoutError if nothing
        arrives in time, or the task's own exception if it failed.
        """
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                # Retry after waking: either an item arrived, the iterator
                # finished, or the wait timed out.
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX (Python 3 iterator protocol)

    def _set(self, i, obj):
        # Called by the result handler with the result of item `i`.
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                # Flush any consecutive results that arrived early.
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                # Out-of-order arrival: park it until its turn.
                self._unsorted[i] = obj

            if self._index == self._length:
                # All results in: deregister from the job cache.
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        # Called by the task handler once the input size is known.
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                # Already consumed everything: wake waiters and deregister.
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()
  803. #
  804. # Class whose instances are returned by `Pool.imap_unordered()`
  805. #
  806. class IMapUnorderedIterator(IMapIterator):
  807. def _set(self, i, obj):
  808. self._cond.acquire()
  809. try:
  810. self._items.append(obj)
  811. self._index += 1
  812. self._cond.notify()
  813. if self._index == self._length:
  814. del self._cache[self._job]
  815. finally:
  816. self._cond.release()
  817. #
  818. #
  819. #
class ThreadPool(Pool):
    """Pool variant that runs workers as threads (multiprocessing.dummy)
    instead of processes."""

    from multiprocessing.dummy import Process as DummyProcess
    Process = DummyProcess

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        # Plain thread-safe queues; no pipes involved.
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

        def _poll_result(timeout):
            # Return (True, result) if a result arrives within `timeout`
            # seconds, else (False, None).
            try:
                return True, self._quick_get(timeout=timeout)
            except Queue.Empty:
                return False, None
        self._poll_result = _poll_result

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # put sentinels at head of inqueue to make workers finish
        inqueue.not_empty.acquire()
        try:
            inqueue.queue.clear()
            inqueue.queue.extend([None] * size)
            inqueue.not_empty.notify_all()
        finally:
            inqueue.not_empty.release()