processes.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.processes
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Pool implementation using :mod:`multiprocessing`.
  6. We use the billiard fork of multiprocessing which contains
  7. numerous improvements.
  8. """
  9. from __future__ import absolute_import
  10. import errno
  11. import os
  12. import select
  13. import socket
  14. import struct
  15. from collections import deque
  16. from pickle import HIGHEST_PROTOCOL
  17. from time import sleep, time
  18. from billiard import forking_enable
  19. from billiard import pool as _pool
  20. from billiard.exceptions import WorkerLostError
  21. from billiard.pool import (
  22. RUN, CLOSE, TERMINATE, ACK, NACK, EX_RECYCLE, WorkersJoined, CoroStop,
  23. )
  24. from billiard.queues import _SimpleQueue
  25. from kombu.serialization import pickle as _pickle
  26. from kombu.utils import fxrange
  27. from kombu.utils.compat import get_errno
  28. from kombu.utils.eventio import SELECT_BAD_FD
  29. from celery import platforms
  30. from celery import signals
  31. from celery._state import set_default_app
  32. from celery.concurrency.base import BasePool
  33. from celery.five import items, values
  34. from celery.task import trace
  35. from celery.utils.log import get_logger
  36. from celery.worker.hub import READ, WRITE, ERR
#: List of signals to reset when a child process starts.
WORKER_SIGRESET = frozenset(['SIGTERM',
                             'SIGHUP',
                             'SIGTTIN',
                             'SIGTTOU',
                             'SIGUSR1'])

#: List of signals to ignore when a child process starts.
WORKER_SIGIGNORE = frozenset(['SIGINT'])

#: Error numbers treated as transient by the non-blocking write
#: generators: the write is suspended (``yield``) and retried later.
UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.EBADF])

#: Warning shown when maxtasksperchild is used without the billiard
#: C extension installed (see TaskPool.on_start).
MAXTASKS_NO_BILLIARD = """\
maxtasksperchild enabled but billiard C extension not installed!
This may lead to a deadlock, please install the billiard C extension.
"""

#: Constant sent by child process when started (ready to accept work)
WORKER_UP = 15

logger = get_logger(__name__)
warning, debug = logger.warning, logger.debug
def process_initializer(app, hostname):
    """Pool child process initializer.

    Runs once in every newly started pool worker process, before it
    begins accepting tasks: resets/ignores the relevant signals, sets
    the process title, initializes the app loader and logging (settings
    are inherited from the parent through CELERY_LOG_* environment
    variables), and sets up task tracing.
    """
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork(). Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()
    app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0)),
                  os.environ.get('CELERY_LOG_FILE') or None,
                  bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
        # pool did execv after fork
        trace.setup_worker_optimizations(app)
    else:
        app.set_current()
        set_default_app(app)
        app.finalize()
        trace._tasks = app._tasks  # enables fast_trace_task optimization.
        # build a tracer for every registered task so the worker does
        # not have to do this lazily per task invocation.
        from celery.task.trace import build_tracer
        for name, task in items(app.tasks):
            task.__trace__ = build_tracer(name, task, app.loader, hostname)
    signals.worker_process_init.send(sender=None)
  80. def _select(self, readers=None, writers=None, err=None, timeout=0):
  81. readers = set() if readers is None else readers
  82. writers = set() if writers is None else writers
  83. err = set() if err is None else err
  84. try:
  85. r, w, e = select.select(readers, writers, err, timeout)
  86. if e:
  87. seen = set()
  88. r = r | set(f for f in r + e if f not in seen and not seen.add(f))
  89. return r, w, 0
  90. except (select.error, socket.error) as exc:
  91. if get_errno(exc) == errno.EINTR:
  92. return [], [], 1
  93. elif get_errno(exc) in SELECT_BAD_FD:
  94. for fd in readers | writers | err:
  95. try:
  96. select.select([fd], [], [], 0)
  97. except (select.error, socket.error) as exc:
  98. if get_errno(exc) not in SELECT_BAD_FD:
  99. raise
  100. readers.discard(fd)
  101. writers.discard(fd)
  102. err.discard(fd)
  103. return [], [], 1
  104. else:
  105. raise
  106. class promise(object):
  107. def __init__(self, fun, *partial_args, **partial_kwargs):
  108. self.fun = fun
  109. self.args = partial_args
  110. self.kwargs = partial_kwargs
  111. self.ready = False
  112. def __call__(self, *args, **kwargs):
  113. try:
  114. return self.fun(*tuple(self.args) + tuple(args),
  115. **dict(self.kwargs, **kwargs))
  116. finally:
  117. self.ready = True
  118. class Worker(_pool.Worker):
  119. def on_loop_start(self, pid):
  120. self.outq.put((WORKER_UP, (pid, )))
class ResultHandler(_pool.ResultHandler):
    """Result handler reading results off each worker's dedicated
    outqueue (instead of one shared result pipe), driven by the hub.
    """

    def __init__(self, *args, **kwargs):
        #: maps outqueue read-fd -> worker process.
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        #: callback invoked when a worker announces WORKER_UP.
        self.on_process_alive = kwargs.pop('on_process_alive')
        super(ResultHandler, self).__init__(*args, **kwargs)
        # register handler for the WORKER_UP startup message.
        self.state_handlers[WORKER_UP] = self.on_process_alive

    def _process_result(self):
        """Coroutine: receives an fd via ``send()``, reads at most one
        pending result from that worker's outqueue and dispatches it.

        Raises :exc:`CoroStop` to signal the driver to discard the
        coroutine (EOF, sentinel received, or TERMINATE requested).
        """
        fileno_to_outq = self.fileno_to_outq
        on_state_change = self.on_state_change

        while 1:
            fileno = (yield)
            proc = fileno_to_outq[fileno]
            reader = proc.outq._reader

            try:
                # non-blocking: only read if a message is ready.
                if reader.poll(0):
                    ready, task = True, reader.recv()
                else:
                    ready, task = False, None
            except (IOError, EOFError) as exc:
                debug('result handler got %r -- exiting' % (exc, ))
                raise CoroStop()

            if self._state:
                assert self._state == TERMINATE
                debug('result handler found thread._state==TERMINATE')
                raise CoroStop()

            if ready:
                if task is None:
                    # None is the shutdown sentinel.
                    debug('result handler got sentinel -- exiting')
                    raise CoroStop()
                on_state_change(task)

    def handle_event(self, fileno=None, event=None):
        # Entry point called by the hub when an outqueue fd is readable:
        # lazily (re)creates the coroutine and feeds it the fd.
        if self._state == RUN:
            it = self._it
            if it is None:
                it = self._it = self._process_result()
                next(it)  # advance coroutine to first yield
            try:
                it.send(fileno)
            except (StopIteration, CoroStop):
                # coroutine finished; a new one is created on next event.
                self._it = None

    def on_stop_not_started(self):
        """Flush remaining results at shutdown when the handler thread
        was never started: drain each worker's outqueue until the job
        cache is empty or all workers have terminated.
        """
        cache = self.cache
        check_timeouts = self.check_timeouts
        fileno_to_outq = self.fileno_to_outq
        on_state_change = self.on_state_change
        join_exited_workers = self.join_exited_workers

        outqueues = set(fileno_to_outq)
        while cache and outqueues and self._state != TERMINATE:
            if check_timeouts is not None:
                # make sure tasks with a time limit still time out.
                check_timeouts()
            for fd in outqueues:
                try:
                    proc = fileno_to_outq[fd]
                except KeyError:
                    # process already gone; its queue was handled elsewhere.
                    outqueues.discard(fd)
                    break

                reader = proc.outq._reader
                try:
                    if reader.poll(0):
                        task = reader.recv()
                    else:
                        task = None
                        sleep(0.5)  # nothing ready; don't busy-loop
                except (IOError, EOFError):
                    outqueues.discard(fd)
                    break
                else:
                    if task:
                        on_state_change(task)
                try:
                    # reap any workers that exited in the meantime.
                    join_exited_workers(shutdown=True)
                except WorkersJoined:
                    debug('result handler: all workers terminated')
                    return
class AsynPool(_pool.Pool):
    """Pool version for use with the async hub: every worker process
    gets a dedicated in/out queue pair with non-blocking writers, and
    fd -> process maps are maintained for the event loop.
    """
    ResultHandler = ResultHandler
    Worker = Worker

    def __init__(self, processes=None, synack=False, *args, **kwargs):
        processes = self.cpu_count() if processes is None else processes
        self.synack = synack
        # create queue pairs up-front so they can be reused when
        # processes are recycled (maxtasksperchild).
        self._queues = dict((self.create_process_queues(), None)
                            for _ in range(processes))
        #: inqueue write-fd -> process
        self._fileno_to_inq = {}
        #: outqueue read-fd -> process
        self._fileno_to_outq = {}
        #: synqueue write-fd -> process
        self._fileno_to_synq = {}
        #: all inqueue write-fds currently registered
        self._all_inqueues = set()
        super(AsynPool, self).__init__(processes, *args, **kwargs)

        for proc in self._pool:
            self._fileno_to_inq[proc.inqW_fd] = proc
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc

    def _finalize_args(self):
        orig = super(AsynPool, self)._finalize_args()
        return (self._fileno_to_inq, orig)

    def get_process_queues(self):
        # hand out a queue pair not currently owned by any process.
        return next(q for q, owner in items(self._queues)
                    if owner is None)

    def on_grow(self, n):
        # create queue pairs for the new processes.
        # NOTE(review): assumes self._processes was already increased by
        # the caller before this hook runs -- verify against billiard.
        self._queues.update(
            dict((self.create_process_queues(), None)
                 for _ in range(self._processes - len(self._queues)))
        )

    def on_shrink(self, n):
        # drop up to ``n`` unowned queue pairs.
        queues = self._queues
        for i in range(n):
            if len(queues) > self._processes:
                try:
                    queues.pop(next(
                        q for q, owner in items(queues) if owner is None
                    ), None)
                except StopIteration:
                    break

    def create_process_queues(self):
        """Return a new ``(inq, outq, synq)`` tuple; writers are set
        non-blocking so the event loop never stalls on a full pipe."""
        inq, outq, synq = _SimpleQueue(), _SimpleQueue(), None
        inq._writer.setblocking(0)
        if self.synack:
            synq = _SimpleQueue()
            synq._writer.setblocking(0)
        return inq, outq, synq

    def on_process_alive(self, pid):
        # called when a worker sends WORKER_UP: only now do we register
        # its write-fds, so jobs aren't scheduled to a dead process.
        try:
            proc = next(w for w in self._pool if w.pid == pid)
        except StopIteration:
            # process already gone; nothing to register.
            return
        self._fileno_to_inq[proc.inqW_fd] = proc
        self._fileno_to_synq[proc.synqW_fd] = proc
        self._all_inqueues.add(proc.inqW_fd)

    def on_job_process_down(self, job, pid_gone):
        if job._write_to:
            # writing was already in progress: handle partial message.
            self.on_partial_read(job, job._write_to)
        elif job._scheduled_for:
            # only scheduled, never written: requeue for another process.
            self._put_back(job)

    def on_job_process_lost(self, job, pid, exitcode):
        self.mark_as_worker_lost(job, exitcode)

    def _process_cleanup_queues(self, proc):
        # mark the process' queue pair as unowned again.
        try:
            self._queues[self._find_worker_queues(proc)] = None
        except (KeyError, ValueError):
            pass

    @staticmethod
    def _stop_task_handler(task_handler):
        # restore blocking mode so the shutdown sentinel is delivered.
        for proc in task_handler.pool:
            proc.inq._writer.setblocking(1)
            proc.inq.put(None)

    def create_result_handler(self):
        return super(AsynPool, self).create_result_handler(
            fileno_to_outq=self._fileno_to_outq,
            on_process_alive=self.on_process_alive,
        )

    def _process_register_queues(self, proc, queues):
        self._queues[queues] = proc

    def _find_worker_queues(self, proc):
        """Return the queue pair owned by ``proc`` or raise ValueError."""
        try:
            return next(q for q, owner in items(self._queues)
                        if owner == proc)
        except StopIteration:
            raise ValueError(proc)

    def _setup_queues(self):
        # this pool does not use the shared inqueue/outqueue machinery.
        self._inqueue = self._outqueue = \
            self._quick_put = self._quick_get = self._poll_result = None

    def process_flush_queues(self, proc):
        # drain any results the process wrote before it exited.
        resq = proc.outq._reader
        if not resq.closed:
            while resq.poll(0):
                self.handle_result_event(resq.fileno())

    def on_partial_read(self, job, proc):
        # worker terminated by signal:
        # we cannot reuse the sockets again, because we don't know if
        # the process wrote/read anything from them, and if so we cannot
        # restore the message boundaries.
        if proc.exitcode != EX_RECYCLE:
            # job was not acked, so find another worker to send it to.
            if not job._accepted:
                self._put_back(job)

            for conn in (proc.inq, proc.outq, proc.synq):
                if conn:
                    for sock in (conn._reader, conn._writer):
                        if not sock.closed:
                            sock.close()
                            #os.close(sock.fileno())
            # replace the destroyed queue pair with a fresh one.
            self._queues.pop((proc.inq, proc.outq, proc.synq), None)
            self._queues[self.create_process_queues()] = None
            self.on_inqueue_close(proc.inqW_fd)

    @classmethod
    def _set_result_sentinel(cls, _outqueue, _pool):
        # no shared outqueue, so no sentinel needed.
        pass

    def _help_stuff_finish_args(self):
        return (self._pool, )

    @classmethod
    def _help_stuff_finish(cls, pool):
        # task_handler may be blocked trying to put items on inqueue
        debug(
            'removing tasks from inqueue until task handler finished',
        )
        fileno_to_proc = dict((w.inq._reader.fileno(), w) for w in pool)
        inqR = set(fileno_to_proc)
        while inqR:
            readable, _, again = _select(inqR, timeout=0.5)
            if again:
                continue
            if not readable:
                break
            for fd in readable:
                # NOTE(review): values of fileno_to_proc are worker
                # objects, so this looks like it should read
                # ``fileno_to_proc[fd].inq._reader.recv()`` -- verify.
                fileno_to_proc[fd]._reader.recv()
            sleep(0)
class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""
    Pool = AsynPool
    BlockingPool = _pool.Pool

    uses_semaphore = True

    def on_start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        if self.options.get('maxtasksperchild'):
            try:
                import _billiard  # noqa
                _billiard.Connection.send_offset
            except (ImportError, AttributeError):
                # billiard C extension not installed
                warning(MAXTASKS_NO_BILLIARD)
        forking_enable(self.forking_enable)
        # NOTE(review): 'threads' defaults to True here, selecting the
        # blocking pool unless explicitly disabled -- confirm intended.
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              synack=False,
                              **self.options)
        # proxy the pool's API onto this instance.
        self.on_apply = P.apply_async
        self.on_soft_timeout = P._timeout_handler.on_soft_timeout
        self.on_hard_timeout = P._timeout_handler.on_hard_timeout
        self.maintain_pool = P.maintain_pool
        self.terminate_job = self._pool.terminate_job
        self.grow = self._pool.grow
        self.shrink = self._pool.shrink
        self.restart = self._pool.restart
        self.maybe_handle_result = P._result_handler.handle_event
        #: queue of jobs waiting to be written to a worker.
        self.outbound_buffer = deque()
        self.handle_result_event = P.handle_result_event
        #: fds currently being written to.
        self._active_writes = set()
        #: write generators currently in progress.
        self._active_writers = set()

    def did_start_ok(self):
        return self._pool.did_start_ok()

    def on_stop(self):
        """Gracefully stop the pool."""
        if self._pool is not None and self._pool._state in (RUN, CLOSE):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def on_terminate(self):
        """Force terminate the pool."""
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None

    def on_close(self):
        if self._pool is not None and self._pool._state == RUN:
            self._pool.close()

    def _get_info(self):
        # statistics shown by e.g. `celery inspect stats`.
        return {'max-concurrency': self.limit,
                'processes': [p.pid for p in self._pool._pool],
                'max-tasks-per-child': self._pool._maxtasksperchild,
                'put-guarded-by-semaphore': self.putlocks,
                'timeouts': (self._pool.soft_timeout, self._pool.timeout)}

    def on_poll_init(self, w, hub,
                     now=time, protocol=HIGHEST_PROTOCOL, pack=struct.pack,
                     dumps=_pickle.dumps):
        """Wire the pool into the worker's event hub.

        Registers read handlers for worker outqueues/sentinels, installs
        timeout callbacks, and defines the non-blocking job/ack write
        generators driven by the hub.
        """
        pool = self._pool
        apply_after = hub.timer.apply_after
        apply_at = hub.timer.apply_at
        maintain_pool = self.maintain_pool
        on_soft_timeout = self.on_soft_timeout
        on_hard_timeout = self.on_hard_timeout
        fileno_to_inq = pool._fileno_to_inq
        fileno_to_outq = pool._fileno_to_outq
        fileno_to_synq = pool._fileno_to_synq
        outbound = self.outbound_buffer
        pop_message = outbound.popleft
        put_message = outbound.append
        all_inqueues = pool._all_inqueues
        active_writes = self._active_writes
        diff = all_inqueues.difference
        hub_add, hub_remove = hub.add, hub.remove
        mark_write_fd_as_active = active_writes.add
        mark_write_gen_as_active = self._active_writers.add
        write_generator_gone = self._active_writers.discard
        get_job = pool._cache.__getitem__
        pool._put_back = put_message

        # did_start_ok will verify that pool processes were able to start,
        # but this will only work the first time we start, as
        # maxtasksperchild will mess up metrics.
        if not w.consumer.restart_count and not pool.did_start_ok():
            raise WorkerLostError('Could not start worker processes')

        hub_add(pool.process_sentinels, self.maintain_pool, READ | ERR)
        hub_add(fileno_to_outq, self.handle_result_event, READ | ERR)
        for handler, interval in items(self.timers):
            hub.timer.apply_interval(interval * 1000.0, handler)

        # need to handle pool results before every task
        # since multiple tasks can be received in a single poll()
        # XXX do we need this now?!?
        # hub.on_task.append(pool.maybe_handle_result)

        def on_timeout_set(R, soft, hard):
            # schedule soft/hard timeouts for result R; when both are
            # set, the hard timeout is armed when the soft one fires.
            def _on_soft_timeout():
                if hard:
                    R._tref = apply_at(now() + (hard - soft),
                                       on_hard_timeout, (R, ))
                on_soft_timeout(R)
            if soft:
                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
            elif hard:
                R._tref = apply_after(hard * 1000.0,
                                      on_hard_timeout, (R, ))
        self._pool.on_timeout_set = on_timeout_set

        def on_timeout_cancel(result):
            # cancel a pending timeout timer, if any.
            try:
                result._tref.cancel()
                delattr(result, '_tref')
            except AttributeError:
                pass
        self._pool.on_timeout_cancel = on_timeout_cancel

        def on_process_up(proc):
            # register the new process' outqueue and sentinel with the hub.
            fileno_to_outq[proc.outqR_fd] = proc
            hub_add(proc.sentinel, maintain_pool, READ | ERR)
            hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
        self._pool.on_process_up = on_process_up

        def on_process_down(proc):
            # forget all fds belonging to the terminated process.
            fileno_to_outq.pop(proc.outqR_fd, None)
            fileno_to_inq.pop(proc.inqW_fd, None)
            fileno_to_synq.pop(proc.synqW_fd, None)
            all_inqueues.discard(proc.inqW_fd)
            hub_remove(proc.sentinel)
            hub_remove(proc.outqR_fd)
        self._pool.on_process_down = on_process_down

        class Ack(object):
            # lightweight ack/nack message scheduled for writing.
            __slots__ = ('id', 'fd', '_payload')

            def __init__(self, id, fd, payload):
                self.id = id
                self.fd = fd
                self._payload = payload

            def __eq__(self, other):
                # NOTE(review): there is no attribute ``i`` -- this
                # looks like it should be ``self.id == other.id`` and
                # would raise AttributeError if ever compared. Verify.
                return self.i == other.i

            def __hash__(self):
                # NOTE(review): same issue as __eq__ -- presumably
                # ``self.id``. Verify before relying on hashing Acks.
                return self.i

        def _write_ack(fd, ack, callback=None):
            # generator: writes an ack/nack payload to the process'
            # synqueue without blocking; yields while the pipe is full.
            header, body, body_size = ack._payload
            try:
                try:
                    proc = fileno_to_synq[fd]
                except KeyError:
                    # process gone: stop this coroutine.
                    raise StopIteration()
                send_offset = proc.synq._writer.send_offset

                Hw = Bw = 0
                # 4-byte length header first, then the body.
                while Hw < 4:
                    try:
                        Hw += send_offset(header, Hw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        yield
                while Bw < body_size:
                    try:
                        Bw += send_offset(body, Bw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        yield
            finally:
                if callback:
                    callback()
                active_writes.discard(fd)

        def _write_job(fd, job, callback=None):
            # generator: writes a job payload to the process' inqueue
            # without blocking; yields while the pipe is full.
            header, body, body_size = job._payload
            try:
                try:
                    proc = fileno_to_inq[fd]
                except KeyError:
                    # process gone: requeue and stop this coroutine.
                    put_message(job)
                    raise StopIteration()
                # job result keeps track of what process the job is sent to.
                job._write_to = proc
                send_offset = proc.inq._writer.send_offset

                Hw = Bw = 0
                while Hw < 4:
                    try:
                        Hw += send_offset(header, Hw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        yield
                while Bw < body_size:
                    try:
                        Bw += send_offset(body, Bw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        yield
            finally:
                if callback:
                    callback()
                active_writes.discard(fd)

        def on_inqueue_close(fd):
            active_writes.discard(fd)
            all_inqueues.discard(fd)
        self._pool.on_inqueue_close = on_inqueue_close

        def schedule_writes(ready_fd, events):
            # hub write-callback: pop the next outbound job and start a
            # write generator for it on this fd.
            if ready_fd in active_writes:
                # already writing to this fd.
                return
            try:
                job = pop_message()
            except IndexError:
                # no more messages: unregister idle inqueue fds.
                for inqfd in diff(active_writes):
                    hub_remove(inqfd)
            else:
                if not job._accepted:
                    callback = promise(write_generator_gone)
                    try:
                        job._scheduled_for = fileno_to_inq[ready_fd]
                    except KeyError:
                        # process gone since scheduled, put back
                        return put_message(job)
                    cor = _write_job(ready_fd, job, callback=callback)
                    mark_write_gen_as_active(cor)
                    mark_write_fd_as_active(ready_fd)
                    callback.args = (cor, )  # tricky as we need to pass ref
                    hub_add((ready_fd, ), cor, WRITE | ERR)

        def _create_payload(type_, args):
            # build (header, body, size) for a framed pickled message.
            body = dumps((type_, args), protocol=protocol)
            size = len(body)
            header = pack('>I', size)
            return header, body, size
        # precomputed ACK/NACK payloads (they never change).
        MESSAGES = {ACK: _create_payload(ACK, (0, )),
                    NACK: _create_payload(NACK, (0, ))}

        def send_ack(response, pid, job, fd):
            # schedule writing an ack/nack for ``job`` on the synqueue.
            msg = Ack(job, fd, MESSAGES[response])
            callback = promise(write_generator_gone)
            cor = _write_ack(fd, msg, callback=callback)
            mark_write_gen_as_active(cor)
            mark_write_fd_as_active(fd)
            callback.args = (cor, )
            hub_add((fd, ), cor, WRITE | ERR)
        self._pool.send_ack = send_ack

        def on_poll_start(hub):
            # register write handlers for idle inqueues whenever there
            # are outbound messages waiting.
            if outbound:
                hub_add(diff(active_writes), schedule_writes, WRITE | ERR)
        self.on_poll_start = on_poll_start

        def quick_put(tup):
            # serialize the task message and enqueue it for async write.
            body = dumps(tup, protocol=protocol)
            body_size = len(body)
            header = pack('>I', body_size)
            # index 1,0 is the job ID.
            job = get_job(tup[1][0])
            job._payload = header, buffer(body), body_size
            put_message(job)
        self._pool._quick_put = quick_put

    def handle_timeouts(self):
        if self._pool._timeout_handler:
            self._pool._timeout_handler.handle_event()

    def flush(self):
        # cancel all tasks that have not been accepted so that NACK is sent.
        for job in values(self._pool._cache):
            if not job._accepted:
                job._cancel()

        # clear the outgoing buffer as the tasks will be redelivered by
        # the broker anyway.
        if self.outbound_buffer:
            self.outbound_buffer.clear()

        try:
            # but we must continue to write the payloads we already started
            # otherwise we will not recover the message boundaries.
            # the messages will be NACK'ed later.
            if self._pool._state == RUN:
                # flush outgoing buffers
                intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
                while self._active_writers:
                    writers = list(self._active_writers)
                    for gen in writers:
                        # NOTE(review): ``gi_frame.f_lasti == -1`` means
                        # the generator has NOT started, so this
                        # ``!= -1`` test contradicts the comment below
                        # and seems inverted. Verify.
                        if (gen.__name__ == '_write_job' and
                                gen.gi_frame.f_lasti != -1):
                            # has not started writing the job so can
                            # safely discard
                            self._active_writers.discard(gen)
                        else:
                            try:
                                next(gen)
                            except StopIteration:
                                self._active_writers.discard(gen)
                    # workers may have exited in the meantime.
                    self.maintain_pool()
                    sleep(next(intervals))  # don't busyloop
        finally:
            self.outbound_buffer.clear()
            self._active_writers.clear()

    @property
    def num_processes(self):
        return self._pool._processes

    @property
    def timers(self):
        # periodic hub timers: keep the pool maintained every 5s.
        return {self.maintain_pool: 5.0}