processes.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.processes
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Pool implementation using :mod:`multiprocessing`.
  6. We use the billiard fork of multiprocessing which contains
  7. numerous improvements.
  8. """
  9. from __future__ import absolute_import
  10. import errno
  11. import os
  12. import select
  13. import socket
  14. import struct
  15. from collections import deque
  16. from pickle import HIGHEST_PROTOCOL
  17. from time import sleep, time
  18. from billiard import forking_enable
  19. from billiard import pool as _pool
  20. from billiard.exceptions import WorkerLostError
  21. from billiard.pool import RUN, CLOSE, TERMINATE, WorkersJoined, CoroStop
  22. from billiard.queues import _SimpleQueue
  23. from kombu.serialization import pickle as _pickle
  24. from kombu.utils import fxrange
  25. from kombu.utils.compat import get_errno
  26. from kombu.utils.eventio import SELECT_BAD_FD
  27. from celery import platforms
  28. from celery import signals
  29. from celery._state import set_default_app
  30. from celery.concurrency.base import BasePool
  31. from celery.five import items
  32. from celery.task import trace
  33. from celery.utils.log import get_logger
  34. from celery.worker.hub import READ, WRITE, ERR
  35. #: List of signals to reset when a child process starts.
  36. WORKER_SIGRESET = frozenset(['SIGTERM',
  37. 'SIGHUP',
  38. 'SIGTTIN',
  39. 'SIGTTOU',
  40. 'SIGUSR1'])
  41. #: List of signals to ignore when a child process starts.
  42. WORKER_SIGIGNORE = frozenset(['SIGINT'])
  43. UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.EBADF])
  44. MAXTASKS_NO_BILLIARD = """\
  45. maxtasksperchild enabled but billiard C extension not installed!
  46. This may lead to a deadlock, please install the billiard C extension.
  47. """
  48. logger = get_logger(__name__)
  49. warning, debug = logger.warning, logger.debug
  50. def process_initializer(app, hostname):
  51. """Initializes the process so it can be used to process tasks."""
  52. platforms.signals.reset(*WORKER_SIGRESET)
  53. platforms.signals.ignore(*WORKER_SIGIGNORE)
  54. platforms.set_mp_process_title('celeryd', hostname=hostname)
  55. # This is for Windows and other platforms not supporting
  56. # fork(). Note that init_worker makes sure it's only
  57. # run once per process.
  58. app.loader.init_worker()
  59. app.loader.init_worker_process()
  60. app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0)),
  61. os.environ.get('CELERY_LOG_FILE') or None,
  62. bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
  63. str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
  64. if os.environ.get('FORKED_BY_MULTIPROCESSING'):
  65. # pool did execv after fork
  66. trace.setup_worker_optimizations(app)
  67. else:
  68. app.set_current()
  69. set_default_app(app)
  70. app.finalize()
  71. trace._tasks = app._tasks # enables fast_trace_task optimization.
  72. from celery.task.trace import build_tracer
  73. for name, task in items(app.tasks):
  74. task.__trace__ = build_tracer(name, task, app.loader, hostname)
  75. signals.worker_process_init.send(sender=None)
  76. def _select(self, readers=None, writers=None, err=None, timeout=0):
  77. readers = set() if readers is None else readers
  78. writers = set() if writers is None else writers
  79. err = set() if err is None else err
  80. try:
  81. r, w, e = select.select(readers, writers, err, timeout)
  82. if e:
  83. _seen = set()
  84. r = [f for f in r + e if f not in _seen and not _seen.add(f)]
  85. return r, w, 0
  86. except (select.error, socket.error) as exc:
  87. if get_errno(exc) == errno.EINTR:
  88. return
  89. elif get_errno(exc) in SELECT_BAD_FD:
  90. for fd in readers | writers | err:
  91. try:
  92. select.select([fd], [], [], 0)
  93. except (select.error, socket.error) as exc:
  94. if get_errno(exc) not in SELECT_BAD_FD:
  95. raise
  96. readers.discard(fd)
  97. writers.discard(fd)
  98. err.discard(fd)
  99. return [], [], 1
  100. class promise(object):
  101. def __init__(self, fun, *partial_args, **partial_kwargs):
  102. self.fun = fun
  103. self.args = partial_args
  104. self.kwargs = partial_kwargs
  105. self.ready = False
  106. def __call__(self, *args, **kwargs):
  107. try:
  108. return self.fun(*tuple(self.args) + tuple(args),
  109. **dict(self.kwargs, **kwargs))
  110. finally:
  111. self.ready = True
  112. class ResultHandler(_pool.ResultHandler):
  113. def __init__(self, *args, **kwargs):
  114. self.fileno_to_outq = kwargs.pop('fileno_to_outq')
  115. super(ResultHandler, self).__init__(*args, **kwargs)
  116. def _process_result(self):
  117. fileno_to_outq = self.fileno_to_outq
  118. on_state_change = self.on_state_change
  119. while 1:
  120. fileno = (yield)
  121. proc = fileno_to_outq[fileno]
  122. reader = proc.outq._reader
  123. try:
  124. if reader.poll(0):
  125. ready, task = True, reader.recv()
  126. else:
  127. ready, task = False, None
  128. except (IOError, EOFError) as exc:
  129. debug('result handler got %r -- exiting' % (exc, ))
  130. raise CoroStop()
  131. if self._state:
  132. assert self._state == TERMINATE
  133. debug('result handler found thread._state==TERMINATE')
  134. raise CoroStop()
  135. if ready:
  136. if task is None:
  137. debug('result handler got sentinel -- exiting')
  138. raise CoroStop()
  139. on_state_change(task)
  140. def handle_event(self, fileno=None, event=None):
  141. if self._state == RUN:
  142. it = self._it
  143. if it is None:
  144. it = self._it = self._process_result()
  145. next(it)
  146. try:
  147. it.send(fileno)
  148. except (StopIteration, CoroStop):
  149. self._it = None
  150. def on_stop_not_started(self):
  151. cache = self.cache
  152. check_timeouts = self.check_timeouts
  153. fileno_to_outq = self.fileno_to_outq
  154. on_state_change = self.on_state_change
  155. join_exited_workers = self.join_exited_workers
  156. outqueues = set(fileno_to_outq)
  157. while cache and outqueues and self._state != TERMINATE:
  158. if check_timeouts is not None:
  159. check_timeouts()
  160. for fd in outqueues:
  161. proc = fileno_to_outq[fd]
  162. reader = proc.outq._reader
  163. try:
  164. if reader.poll(0):
  165. task = reader.recv()
  166. else:
  167. task = None
  168. sleep(0.5)
  169. except (IOError, EOFError):
  170. outqueues.discard(fd)
  171. continue
  172. else:
  173. if task:
  174. on_state_change(task)
  175. try:
  176. join_exited_workers(shutdown=True)
  177. except WorkersJoined:
  178. debug('result handler: all workers terminated')
  179. return
  180. class AsynPool(_pool.Pool):
  181. ResultHandler = ResultHandler
  182. def __init__(self, processes=None, *args, **kwargs):
  183. processes = self.cpu_count() if processes is None else processes
  184. self._queuepairs = dict((self.create_process_queuepair(), None)
  185. for _ in range(processes))
  186. self._fileno_to_inq = {}
  187. self._fileno_to_outq = {}
  188. super(AsynPool, self).__init__(processes, *args, **kwargs)
  189. for proc in self._pool:
  190. self._fileno_to_inq[proc.inqW_fd] = proc
  191. self._fileno_to_outq[proc.outqR_fd] = proc
  192. def _finalize_args(self):
  193. orig = super(AsynPool, self)._finalize_args()
  194. return (self._fileno_to_inq, orig)
  195. def get_process_queuepair(self):
  196. return next(pair for pair, owner in items(self._queuepairs)
  197. if owner is None)
  198. def create_process_queuepair(self):
  199. inq, outq = _SimpleQueue(), _SimpleQueue()
  200. inq._writer.setblocking(0)
  201. return inq, outq
  202. def on_job_process_down(self, job):
  203. if not job._accepted and job._write_to:
  204. self.on_partial_reaad(job, job._write_to)
  205. def on_job_process_lost(self, job, pid, exitcode):
  206. self.mark_as_worker_lost(job, exitcode)
  207. def _process_cleanup_queuepair(self, proc):
  208. try:
  209. self._queuepairs[self._find_worker_queuepair(proc)] = None
  210. except (KeyError, ValueError):
  211. pass
  212. @staticmethod
  213. def _stop_task_handler(task_handler):
  214. for worker in task_handler.pool:
  215. # send sentinels
  216. worker.inq.put(None)
  217. def create_result_handler(self):
  218. return super(AsynPool, self).create_result_handler(
  219. fileno_to_outq=self._fileno_to_outq,
  220. )
  221. def _process_register_queuepair(self, proc, pair):
  222. self._queuepairs[pair] = proc
  223. def _find_worker_queuepair(self, proc):
  224. for pair, owner in items(self._queuepairs):
  225. if owner == proc:
  226. return pair
  227. raise ValueError(proc)
  228. def _setup_queues(self):
  229. self._inqueue = self._outqueue = \
  230. self._quick_put = self._quick_get = self._poll_result = None
  231. def on_partial_read(self, job, proc):
  232. resq = proc.outq._reader
  233. # empty result queue buffer
  234. while resq.poll(0):
  235. self.handle_result_event(resq.fileno())
  236. # worker terminated by signal:
  237. # we cannot reuse the sockets again, because we don't know if
  238. # the process wrote/read anything frmo them, and if so we cannot
  239. # restore the message boundaries.
  240. if proc.exitcode < 0:
  241. # job was not acked, so find another worker to send it to.
  242. if not job._accepted:
  243. self._put_back(job)
  244. for conn in (proc.inq, proc.outq):
  245. for sock in (conn._reader, conn._writer):
  246. if not sock.closed:
  247. os.close(sock.fileno())
  248. self._queuepairs[(proc.inq, proc.outq)] = \
  249. self._queuepairs[self.create_process_queuepair()] = None
  250. @classmethod
  251. def _set_result_sentinel(cls, _outqueue, _pool):
  252. pass
  253. def _help_stuff_finish_args(self):
  254. return (self._fileno_to_inq, )
  255. @classmethod
  256. def _help_stuff_finish(cls, fileno_to_inq):
  257. # task_handler may be blocked trying to put items on inqueue
  258. debug(
  259. 'removing tasks from inqueue until task handler finished',
  260. )
  261. inqueues = set(fileno_to_inq)
  262. while inqueues:
  263. readable, _, again = _select(inqueues, timeout=0.5)
  264. if again:
  265. continue
  266. if not readable:
  267. break
  268. for fd in readable:
  269. fileno_to_inq[fd]._reader.recv()
  270. sleep(0)
class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""
    Pool = _pool.Pool

    #: this pool coordinates job dispatch with a semaphore.
    uses_semaphore = True

    def on_start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.
        """
        if self.options.get('maxtasksperchild'):
            try:
                import _billiard  # noqa
                _billiard.Connection.send_offset
            except (ImportError, AttributeError):
                # billiard C extension not installed
                warning(MAXTASKS_NO_BILLIARD)
        forking_enable(self.forking_enable)
        # use the async pool only when the hub/event-loop mode is
        # enabled (threads disabled).
        Pool = self.Pool if self.options.get('threads', True) else AsynPool
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              **self.options)
        # re-export pool callbacks as bound attributes for fast access.
        self.on_apply = P.apply_async
        self.on_soft_timeout = P._timeout_handler.on_soft_timeout
        self.on_hard_timeout = P._timeout_handler.on_hard_timeout
        self.maintain_pool = P.maintain_pool
        self.terminate_job = self._pool.terminate_job
        self.grow = self._pool.grow
        self.shrink = self._pool.shrink
        self.restart = self._pool.restart
        self.maybe_handle_result = P._result_handler.handle_event
        # buffer of jobs waiting to be written to a worker inqueue.
        self.outbound_buffer = deque()
        self.handle_result_event = P.handle_result_event
        # filenos of all worker inqueue writers.
        self._all_inqueues = set(p.inqW_fd for p in P._pool)
        # filenos currently being written to.
        self._active_writes = set()
        # write generators currently in progress.
        self._active_writers = set()

    def did_start_ok(self):
        return self._pool.did_start_ok()

    def on_stop(self):
        """Gracefully stop the pool."""
        if self._pool is not None and self._pool._state in (RUN, CLOSE):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def on_terminate(self):
        """Force terminate the pool."""
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None

    def on_close(self):
        if self._pool is not None and self._pool._state == RUN:
            self._pool.close()

    def _get_info(self):
        """Return a dict of pool statistics for inspection commands."""
        return {'max-concurrency': self.limit,
                'processes': [p.pid for p in self._pool._pool],
                'max-tasks-per-child': self._pool._maxtasksperchild,
                'put-guarded-by-semaphore': self.putlocks,
                'timeouts': (self._pool.soft_timeout, self._pool.timeout)}

    def on_poll_init(self, w, hub,
                     now=time, protocol=HIGHEST_PROTOCOL, pack=struct.pack,
                     dumps=_pickle.dumps):
        """Wire the pool into the worker's event loop (hub).

        Installs timeout handling, process up/down callbacks and the
        non-blocking write machinery used to send jobs to workers.
        """
        pool = self._pool
        apply_after = hub.timer.apply_after
        apply_at = hub.timer.apply_at
        maintain_pool = self.maintain_pool
        on_soft_timeout = self.on_soft_timeout
        on_hard_timeout = self.on_hard_timeout
        outbound = self.outbound_buffer
        pop_message = outbound.popleft
        put_message = outbound.append
        fileno_to_inq = pool._fileno_to_inq
        fileno_to_outq = pool._fileno_to_outq
        all_inqueues = self._all_inqueues
        active_writes = self._active_writes
        add_coro = hub.add_coro
        diff = all_inqueues.difference
        hub_add, hub_remove = hub.add, hub.remove
        mark_write_fd_as_active = active_writes.add
        mark_write_gen_as_active = self._active_writers.add
        write_generator_gone = self._active_writers.discard
        get_job = pool._cache.__getitem__
        pool._put_back = put_message

        # did_start_ok will verify that pool processes were able to start,
        # but this will only work the first time we start, as
        # maxtasksperchild will mess up metrics.
        if not w.consumer.restart_count and not pool.did_start_ok():
            raise WorkerLostError('Could not start worker processes')

        hub_add(pool.process_sentinels, self.maintain_pool, READ | ERR)
        hub_add(fileno_to_outq, self.handle_result_event, READ | ERR)
        for handler, interval in items(self.timers):
            hub.timer.apply_interval(interval * 1000.0, handler)

        # need to handle pool results before every task
        # since multiple tasks can be received in a single poll()
        # XXX do we need this now?!?
        # hub.on_task.append(pool.maybe_handle_result)

        def on_timeout_set(R, soft, hard):
            # schedule soft (and chained hard) timeout timers for
            # result R; timer intervals appear to be in milliseconds
            # for apply_after and absolute time for apply_at.
            def _on_soft_timeout():
                if hard:
                    R._tref = apply_at(now() + (hard - soft),
                                       on_hard_timeout, (R, ))
                on_soft_timeout(R)
            if soft:
                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
            elif hard:
                R._tref = apply_after(hard * 1000.0,
                                      on_hard_timeout, (R, ))
        self._pool.on_timeout_set = on_timeout_set

        def on_timeout_cancel(result):
            # cancel any pending timeout timer for this result.
            try:
                result._tref.cancel()
                delattr(result, '_tref')
            except AttributeError:
                pass
        self._pool.on_timeout_cancel = on_timeout_cancel

        def on_process_up(proc):
            # register a newly started process with the event loop.
            fileno_to_inq[proc.inqW_fd] = proc
            fileno_to_outq[proc.outqR_fd] = proc
            all_inqueues.add(proc.inqW_fd)
            hub_add(proc.sentinel, maintain_pool, READ | ERR)
            hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
        self._pool.on_process_up = on_process_up

        def on_process_down(proc):
            # unregister an exited process from the event loop.
            fileno_to_outq.pop(proc.outqR_fd, None)
            fileno_to_inq.pop(proc.inqW_fd, None)
            all_inqueues.discard(proc.inqW_fd)
            hub_remove(proc.sentinel)
            hub_remove(proc.outqR_fd)
        self._pool.on_process_down = on_process_down

        def _write_to(fd, job, callback=None):
            # coroutine performing a non-blocking write of the job's
            # 4-byte length header and pickled body to fd, yielding
            # whenever the socket would block.
            header, body, body_size = job._payload
            try:
                try:
                    proc = fileno_to_inq[fd]
                except KeyError:
                    # process already gone: requeue the job.
                    put_message(job)
                    raise StopIteration()
                send_offset = proc.inq._writer.send_offset
                # job result keeps track of what process the job is sent to.
                job._write_to = proc
                Hw = Bw = 0
                while Hw < 4:
                    try:
                        Hw += send_offset(header, Hw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        yield
                while Bw < body_size:
                    try:
                        Bw += send_offset(body, Bw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        yield
            finally:
                if callback:
                    callback()
                active_writes.discard(fd)

        def schedule_writes(ready_fd, events):
            # called by the hub when a worker inqueue is writable.
            try:
                job = pop_message()
            except IndexError:
                # no more messages: stop watching idle inqueue fds.
                for inqfd in diff(active_writes):
                    hub_remove(inqfd)
            else:
                if not job._accepted:
                    callback = promise(write_generator_gone)
                    cor = _write_to(ready_fd, job, callback=callback)
                    mark_write_gen_as_active(cor)
                    mark_write_fd_as_active(ready_fd)
                    callback.args = (cor, )  # tricky as we need to pass ref
                    add_coro((ready_fd, ), cor, WRITE)

        def on_poll_start(hub):
            # watch inactive inqueue fds for writability while there
            # are outbound messages.
            if outbound:
                hub_add(diff(active_writes), schedule_writes, hub.WRITE)
        self.on_poll_start = on_poll_start

        def quick_put(tup):
            # serialize the task message once and park it in the
            # outbound buffer as the job's payload.
            body = dumps(tup, protocol=protocol)
            body_size = len(body)
            header = pack('>I', body_size)
            # index 0 is the job ID.
            job = get_job(tup[0])
            job._payload = header, buffer(body), body_size
            put_message(job)
        self._pool._quick_put = quick_put

    def handle_timeouts(self):
        if self._pool._timeout_handler:
            self._pool._timeout_handler.handle_event()

    def flush(self):
        """Drain the outbound buffer and finish in-progress writes."""
        if self.outbound_buffer:
            self.outbound_buffer.clear()
        try:
            if self._pool._state == RUN:
                # flush outgoing buffers
                intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
                while self._active_writers:
                    writers = list(self._active_writers)
                    for gen in writers:
                        if gen.gi_frame.f_lasti != -1:  # generator started?
                            try:
                                next(gen)
                            except StopIteration:
                                self._active_writers.discard(gen)
                    # workers may have exited in the meantime.
                    self.maintain_pool()
                    sleep(next(intervals))  # don't busyloop
        finally:
            self.outbound_buffer.clear()
            self._active_writers.clear()

    @property
    def num_processes(self):
        return self._pool._processes

    @property
    def timers(self):
        # periodic timers installed on the hub: handler -> interval (s).
        return {self.maintain_pool: 5.0}