__init__.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by boot-steps
(:mod:`celery.worker.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import atexit
  11. import logging
  12. import socket
  13. import sys
  14. import time
  15. import traceback
  16. from functools import partial
  17. from weakref import WeakValueDictionary
  18. from billiard.exceptions import WorkerLostError
  19. from billiard.util import Finalize
  20. from kombu.syn import detect_environment
  21. from celery import concurrency as _concurrency
  22. from celery import platforms
  23. from celery import signals
  24. from celery.app import app_or_default
  25. from celery.app.abstract import configurated, from_config
  26. from celery.exceptions import SystemTerminate, TaskRevokedError
  27. from celery.utils.functional import noop
  28. from celery.utils.imports import qualname, reload_from_cwd
  29. from celery.utils.log import get_logger
  30. from celery.utils.threads import Event
  31. from celery.utils.timer2 import Schedule
  32. from . import bootsteps
  33. from . import state
  34. from .buckets import TaskBucket, AsyncTaskBucket, FastQueue
  35. from .hub import Hub, BoundedSemaphore
#: Worker states (see :attr:`WorkController._state`).
RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

#: Default socket timeout at shutdown.
#: Applied temporarily in ``WorkController._shutdown`` (Issue 975).
SHUTDOWN_SOCKET_TIMEOUT = 5.0

#: Module-level logger for the worker.
logger = get_logger(__name__)
  43. class Namespace(bootsteps.Namespace):
  44. """This is the boot-step namespace of the :class:`WorkController`.
  45. It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
  46. own set of built-in boot-step modules.
  47. """
  48. name = 'worker'
  49. builtin_boot_steps = ('celery.worker.autoscale',
  50. 'celery.worker.autoreload',
  51. 'celery.worker.consumer',
  52. 'celery.worker.mediator')
  53. def modules(self):
  54. return self.builtin_boot_steps + self.app.conf.CELERYD_BOOT_STEPS
class Pool(bootsteps.StartStopComponent):
    """The pool component.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker startup/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency

    """
    name = 'worker.pool'
    requires = ('queues', 'beat', )

    def __init__(self, w,
                 autoscale=None, autoreload=False, no_execv=False, **kwargs):
        # ``w`` is the WorkController instance being configured.
        w.autoscale = autoscale
        w.pool = None
        w.max_concurrency = None
        w.min_concurrency = w.concurrency
        w.no_execv = no_execv
        if w.autoscale:
            # autoscale is a (max, min) pair when enabled.
            w.max_concurrency, w.min_concurrency = w.autoscale
        self.autoreload_enabled = autoreload

    def on_poll_init(self, pool, w, hub):
        """Wire pool maintenance and time-limit handling into the event
        loop ``hub`` (called when the hub initializes, see :meth:`create`).

        Registers readers for pool file descriptors, periodic pool
        timers, and soft/hard time-limit callbacks that schedule timer
        entries keyed by job id in a :class:`WeakValueDictionary`.
        """
        # Hoist frequently used bound methods to locals.
        apply_after = hub.timer.apply_after
        apply_at = hub.timer.apply_at
        on_soft_timeout = pool.on_soft_timeout
        on_hard_timeout = pool.on_hard_timeout
        maintain_pool = pool.maintain_pool
        add_reader = hub.add_reader
        remove = hub.remove
        now = time.time
        cache = pool._pool._cache

        # did_start_ok will verify that pool processes were able to start,
        # but this will only work the first time we start, as
        # maxtasksperchild will mess up metrics.
        if not w.consumer.restart_count and not pool.did_start_ok():
            raise WorkerLostError('Could not start worker processes')

        # need to handle pool results before every task
        # since multiple tasks can be received in a single poll()
        hub.on_task.append(pool.maybe_handle_result)

        hub.update_readers(pool.readers)
        for handler, interval in pool.timers.iteritems():
            # hub timer intervals are in milliseconds.
            hub.timer.apply_interval(interval * 1000.0, handler)

        # Weak mapping of job id -> pending timer entry; entries vanish
        # automatically once the timer ref is no longer alive.
        trefs = pool._tref_for_id = WeakValueDictionary()

        def _discard_tref(job):
            # Cancel and forget the timer entry for ``job``, if any.
            try:
                tref = trefs.pop(job)
                tref.cancel()
                del(tref)
            except (KeyError, AttributeError):
                pass  # out of scope

        def _on_hard_timeout(job):
            # Hard time limit reached: terminate the job if still running.
            try:
                result = cache[job]
            except KeyError:
                pass  # job ready
            else:
                on_hard_timeout(result)
            finally:
                # remove tref
                _discard_tref(job)

        def _on_soft_timeout(job, soft, hard, hub):
            # Soft limit reached: warn the job, and if a hard limit is
            # also set, schedule the hard-timeout callback for later.
            if hard:
                trefs[job] = apply_at(
                    now() + (hard - soft),
                    _on_hard_timeout, (job, ),
                )
            try:
                result = cache[job]
            except KeyError:
                pass  # job ready
            else:
                on_soft_timeout(result)
            finally:
                if not hard:
                    # remove tref
                    _discard_tref(job)

        def on_timeout_set(R, soft, hard):
            # Called by the pool when a job with time limits starts;
            # R is the async result, R._job its job id.
            if soft:
                trefs[R._job] = apply_after(
                    soft * 1000.0,
                    _on_soft_timeout, (R._job, soft, hard, hub),
                )
            elif hard:
                trefs[R._job] = apply_after(
                    hard * 1000.0,
                    _on_hard_timeout, (R._job, )
                )

        def on_timeout_cancel(R):
            # Called when the job completes before its limits fire.
            _discard_tref(R._job)

        pool.init_callbacks(
            on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
            on_process_down=lambda w: remove(w.sentinel),
            on_timeout_set=on_timeout_set,
            on_timeout_cancel=on_timeout_cancel,
        )

    def create(self, w, semaphore=None, max_restarts=None):
        """Create and return the concurrency pool for worker ``w``."""
        threaded = not w.use_eventloop
        procs = w.min_concurrency
        forking_enable = w.no_execv or not w.force_execv
        if not threaded:
            # Event-loop mode: bound in-flight tasks with a semaphore
            # instead of pool put-locks.
            semaphore = w.semaphore = BoundedSemaphore(procs)
            w._quick_acquire = w.semaphore.acquire
            w._quick_release = w.semaphore.release
            max_restarts = 100
        allow_restart = self.autoreload_enabled or w.pool_restarts
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            timeout=w.task_time_limit,
            soft_timeout=w.task_soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            allow_restart=allow_restart,
            forking_enable=forking_enable,
            semaphore=semaphore,
            callbacks_propagate=(
                w._conninfo.connection_errors + w._conninfo.channel_errors
            ),
        )
        if w.hub:
            # Hook pool event handling into the hub once it starts.
            w.hub.on_init.append(partial(self.on_poll_init, pool, w))
        return pool
  181. class Beat(bootsteps.StartStopComponent):
  182. """Component used to embed a celerybeat process.
  183. This will only be enabled if the ``beat``
  184. argument is set.
  185. """
  186. name = 'worker.beat'
  187. def __init__(self, w, beat=False, **kwargs):
  188. self.enabled = w.beat = beat
  189. w.beat = None
  190. def create(self, w):
  191. from celery.beat import EmbeddedService
  192. b = w.beat = EmbeddedService(app=w.app,
  193. schedule_filename=w.schedule_filename,
  194. scheduler_cls=w.scheduler_cls)
  195. return b
  196. class Queues(bootsteps.Component):
  197. """This component initializes the internal queues
  198. used by the worker."""
  199. name = 'worker.queues'
  200. requires = ('ev', )
  201. def create(self, w):
  202. BucketType = TaskBucket
  203. w.start_mediator = w.pool_cls.requires_mediator
  204. if not w.pool_cls.rlimit_safe:
  205. BucketType = AsyncTaskBucket
  206. process_task = w.process_task
  207. if w.use_eventloop:
  208. BucketType = AsyncTaskBucket
  209. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  210. process_task = w.process_task_sem
  211. if w.disable_rate_limits or not w.start_mediator:
  212. w.ready_queue = FastQueue()
  213. if not w.start_mediator:
  214. w.ready_queue.put = process_task
  215. else:
  216. w.ready_queue = BucketType(
  217. task_registry=w.app.tasks, callback=process_task, worker=w,
  218. )
  219. class EvLoop(bootsteps.StartStopComponent):
  220. name = 'worker.ev'
  221. def __init__(self, w, **kwargs):
  222. w.hub = None
  223. def include_if(self, w):
  224. return w.use_eventloop
  225. def create(self, w):
  226. w.timer = Schedule(max_interval=10)
  227. hub = w.hub = Hub(w.timer)
  228. return hub
  229. class Timers(bootsteps.Component):
  230. """This component initializes the internal timers used by the worker."""
  231. name = 'worker.timers'
  232. requires = ('pool', )
  233. def include_if(self, w):
  234. return not w.use_eventloop
  235. def create(self, w):
  236. if not w.timer_cls:
  237. # Default Timer is set by the pool, as e.g. eventlet
  238. # needs a custom implementation.
  239. w.timer_cls = w.pool.Timer
  240. w.timer = self.instantiate(w.pool.Timer,
  241. max_interval=w.timer_precision,
  242. on_timer_error=self.on_timer_error,
  243. on_timer_tick=self.on_timer_tick)
  244. def on_timer_error(self, exc):
  245. logger.error('Timer error: %r', exc, exc_info=True)
  246. def on_timer_tick(self, delay):
  247. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  248. class StateDB(bootsteps.Component):
  249. """This component sets up the workers state db if enabled."""
  250. name = 'worker.state-db'
  251. def __init__(self, w, **kwargs):
  252. self.enabled = w.state_db
  253. w._persistence = None
  254. def create(self, w):
  255. w._persistence = state.Persistent(w.state_db)
  256. atexit.register(w._persistence.save)
class WorkController(configurated):
    """Unmanaged worker instance.

    Components (pool, consumer, queues, timers, …) are created and wired
    up by the :class:`Namespace` boot-steps applied in ``__init__``;
    :meth:`start` then starts them in order and blocks until shutdown
    completes.
    """
    RUN = RUN
    CLOSE = CLOSE
    TERMINATE = TERMINATE

    # app is bound in __init__ via app_or_default().
    app = None

    # from_config() attributes take their default from the app
    # configuration (setup_defaults() with the 'celeryd' namespace);
    # the string argument overrides the derived setting name.
    concurrency = from_config()
    loglevel = logging.ERROR
    logfile = from_config('log_file')
    send_events = from_config()
    pool_cls = from_config('pool')
    consumer_cls = from_config('consumer')
    mediator_cls = from_config('mediator')
    timer_cls = from_config('timer')
    timer_precision = from_config('timer_precision')
    autoscaler_cls = from_config('autoscaler')
    autoreloader_cls = from_config('autoreloader')
    schedule_filename = from_config()
    scheduler_cls = from_config('celerybeat_scheduler')
    task_time_limit = from_config()
    task_soft_time_limit = from_config()
    max_tasks_per_child = from_config()
    pool_putlocks = from_config()
    pool_restarts = from_config()
    force_execv = from_config()
    prefetch_multiplier = from_config()
    state_db = from_config()
    disable_rate_limits = from_config()
    worker_lost_wait = from_config()

    # Current worker state (one of RUN/CLOSE/TERMINATE) and the number
    # of components successfully started so far.
    _state = None
    _running = 0

    def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
                 queues=None, app=None, pidfile=None, use_eventloop=None,
                 **kwargs):
        self.app = app_or_default(app or self.app)
        self._shutdown_complete = Event()
        self.setup_defaults(kwargs, namespace='celeryd')
        self.app.select_queues(queues)  # select queues subset.

        # Options
        self.loglevel = loglevel or self.loglevel
        self.hostname = hostname or socket.gethostname()
        self.ready_callback = ready_callback
        # Finalizers run at gc/exit: graceful stop first (exitpriority=1),
        # then the worker_shutdown signal.
        self._finalize = [
            Finalize(self, self.stop, exitpriority=1),
            Finalize(self, self._send_worker_shutdown, exitpriority=10),
        ]
        self.pidfile = pidfile
        self.pidlock = None
        # this connection is not established, only used for params
        self._conninfo = self.app.connection()
        self.use_eventloop = (
            self.should_use_eventloop() if use_eventloop is None
            else use_eventloop
        )

        # Update celery_include to have all known task modules, so that we
        # ensure all task modules are imported in case an execv happens.
        task_modules = set(task.__class__.__module__
                           for task in self.app.tasks.itervalues())
        self.app.conf.CELERY_INCLUDE = tuple(
            set(self.app.conf.CELERY_INCLUDE) | task_modules,
        )

        # Initialize boot steps: apply() populates self.components.
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.components = []
        self.namespace = Namespace(app=self.app).apply(self, **kwargs)

    def _send_worker_shutdown(self):
        # Emit the worker_shutdown signal (registered as a finalizer).
        signals.worker_shutdown.send(sender=self)

    def start(self):
        """Starts the workers main loop."""
        self._state = self.RUN

        if self.pidfile:
            self.pidlock = platforms.create_pidlock(self.pidfile)
        try:
            for i, component in enumerate(self.components):
                logger.debug('Starting %s...', qualname(component))
                # _running tracks progress so _shutdown knows whether
                # startup completed.
                self._running = i + 1
                if component:
                    component.start()
                logger.debug('%s OK!', qualname(component))
        except SystemTerminate:
            self.terminate()
        except Exception, exc:
            logger.error('Unrecoverable error: %r', exc,
                         exc_info=True)
            self.stop()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

        # Will only get here if running green,
        # makes sure all greenthreads have exited.
        self._shutdown_complete.wait()

    def process_task_sem(self, req):
        # Variant of process_task that first acquires the pool semaphore
        # (_quick_acquire is installed by the Pool component in
        # event-loop mode).
        return self._quick_acquire(self.process_task, req)

    def process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            try:
                self._quick_release()   # Issue 877
            except AttributeError:
                pass
        except Exception, exc:
            logger.critical('Internal error: %r\n%s',
                            exc, traceback.format_exc(), exc_info=True)
        except SystemTerminate:
            # Hard termination requested: shut down and re-raise.
            self.terminate()
            raise
        except BaseException, exc:
            # e.g. SystemExit/KeyboardInterrupt: stop gracefully, then
            # propagate.
            self.stop()
            raise exc

    def signal_consumer_close(self):
        # Best-effort close; the consumer may not exist yet during
        # early shutdown.
        try:
            self.consumer.close()
        except AttributeError:
            pass

    def should_use_eventloop(self):
        # Event loop requires the default (non-green) environment, an
        # evented-capable transport, and a non-Windows platform.
        return (detect_environment() == 'default' and
                self._conninfo.is_evented and not self.app.IS_WINDOWS)

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        """Stop (``warm=True``) or terminate (``warm=False``) all
        components in reverse start order, then release resources."""
        what = 'Stopping' if warm else 'Terminating'
        socket_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(SHUTDOWN_SOCKET_TIMEOUT)  # Issue 975
        # NOTE(review): the early returns below do not restore the
        # default socket timeout — confirm whether that is intended.
        if self._state in (self.CLOSE, self.TERMINATE):
            return

        self.app.loader.shutdown_worker()

        if self.pool:
            self.pool.close()

        if self._state != self.RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            self._state = self.TERMINATE
            self._shutdown_complete.set()
            return

        self._state = self.CLOSE

        for component in reversed(self.components):
            logger.debug('%s %s...', what, qualname(component))
            if component:
                stop = component.stop
                if not warm:
                    # prefer terminate() when available for cold shutdown.
                    stop = getattr(component, 'terminate', None) or stop
                stop()

        self.timer.stop()
        self.consumer.close_connection()

        if self.pidlock:
            self.pidlock.release()
        self._state = self.TERMINATE
        socket.setdefaulttimeout(socket_timeout)
        self._shutdown_complete.set()

    def reload(self, modules=None, reload=False, reloader=None):
        """Import (or, with ``reload=True``, reload) the given task
        modules and restart the pool.

        NOTE(review): the ``reload`` parameter shadows the builtin of
        the same name inside this method.
        """
        modules = self.app.loader.task_modules if modules is None else modules
        imp = self.app.loader.import_from_cwd

        for module in set(modules or ()):
            if module not in sys.modules:
                logger.debug('importing module %s', module)
                imp(module)
            elif reload:
                logger.debug('reloading module %s', module)
                reload_from_cwd(sys.modules[module], reloader)
        self.pool.restart()

    @property
    def state(self):
        # Exposes the celery.worker.state module.
        return state