# -*- coding: utf-8 -*-
"""
    celery.worker
    ~~~~~~~~~~~~~

    :class:`WorkController` can be used to instantiate in-process workers.

    The worker consists of several components, all managed by boot-steps
    (:mod:`celery.worker.bootsteps`).

"""

from __future__ import absolute_import

import atexit
import socket
import sys
import time
import traceback

from functools import partial
from threading import Event

from billiard import cpu_count, forking_enable
from billiard.exceptions import WorkerLostError
from kombu.syn import detect_environment
from kombu.utils.finalize import Finalize

from celery import concurrency as _concurrency
from celery import platforms
from celery import signals
from celery.app import app_or_default, set_default_app
from celery.app.abstract import configurated, from_config
from celery.exceptions import (
    ImproperlyConfigured, SystemTerminate, TaskRevokedError,
)
from celery.task import trace
from celery.utils import worker_direct
from celery.utils.imports import qualname, reload_from_cwd
from celery.utils.log import get_logger, mlevel
from celery.utils.timer2 import Schedule

from . import bootsteps
from . import state
from .buckets import TaskBucket, FastQueue
from .hub import Hub, BoundedSemaphore

try:
    from greenlet import GreenletExit
    IGNORE_ERRORS = (GreenletExit, )
except ImportError:  # pragma: no cover
    IGNORE_ERRORS = ()

RUN = 0x1
CLOSE = 0x2
TERMINATE = 0x3

UNKNOWN_QUEUE = """\
Trying to select queue subset of {0!r}, but queue {1} is not
defined in the CELERY_QUEUES setting.
If you want to automatically declare unknown queues you can
enable the CELERY_CREATE_MISSING_QUEUES setting.
"""

logger = get_logger(__name__)


class Namespace(bootsteps.Namespace):
    """This is the boot-step namespace of the :class:`WorkController`.

    It loads modules from :setting:`CELERYD_BOOT_STEPS`, and its
    own set of built-in boot-step modules.

    """
    name = 'worker'
    builtin_boot_steps = ('celery.worker.autoscale',
                          'celery.worker.autoreload',
                          'celery.worker.consumer',
                          'celery.worker.mediator')

    def modules(self):
        return (self.builtin_boot_steps
                + self.app.conf.CELERYD_BOOT_STEPS)
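
# Additional boot-step modules can be supplied through configuration.
# A sketch (``myapp.steps`` is a hypothetical module path):
#
#     CELERYD_BOOT_STEPS = ('myapp.steps', )
#
# These modules are loaded after the built-in boot steps listed above.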


class Pool(bootsteps.StartStopComponent):
    """The pool component.

    Describes how to initialize the worker pool, and starts and stops
    the pool during worker startup/shutdown.

    Adds attributes:

        * autoscale
        * pool
        * max_concurrency
        * min_concurrency

    """
    name = 'worker.pool'
    requires = ('queues', )

    def __init__(self, w, autoscale=None, no_execv=False, **kwargs):
        if isinstance(autoscale, basestring):
            max_c, _, min_c = autoscale.partition(',')
            autoscale = [int(max_c), min_c and int(min_c) or 0]
        w.autoscale = autoscale
        w.pool = None
        w.max_concurrency = None
        w.min_concurrency = w.concurrency
        w.no_execv = no_execv
        if w.autoscale:
            w.max_concurrency, w.min_concurrency = w.autoscale
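
    # Sketch of the parsing above (values illustrative only): the
    # ``--autoscale=10,3`` command-line option arrives here as the
    # string '10,3' and is split into [10, 3], giving
    # max_concurrency=10 and min_concurrency=3; a bare '10' yields
    # [10, 0].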

    def on_poll_init(self, pool, hub):
        apply_after = hub.timer.apply_after
        apply_at = hub.timer.apply_at
        on_soft_timeout = pool.on_soft_timeout
        on_hard_timeout = pool.on_hard_timeout
        maintain_pool = pool.maintain_pool
        add_reader = hub.add_reader
        remove = hub.remove
        now = time.time

        if not pool.did_start_ok():
            raise WorkerLostError('Could not start worker processes')

        # need to handle pool results before every task
        # since multiple tasks can be received in a single poll()
        hub.on_task.append(pool.maybe_handle_result)

        hub.update_readers(pool.readers)
        for handler, interval in pool.timers.iteritems():
            hub.timer.apply_interval(interval * 1000.0, handler)

        def on_timeout_set(R, soft, hard):

            def _on_soft_timeout():
                if hard:
                    R._tref = apply_at(now() + (hard - soft),
                                       on_hard_timeout, (R, ))
                on_soft_timeout(R)

            if soft:
                R._tref = apply_after(soft * 1000.0, _on_soft_timeout)
            elif hard:
                R._tref = apply_after(hard * 1000.0,
                                      on_hard_timeout, (R, ))

        def on_timeout_cancel(result):
            try:
                result._tref.cancel()
                delattr(result, '_tref')
            except AttributeError:
                pass

        pool.init_callbacks(
            on_process_up=lambda w: add_reader(w.sentinel, maintain_pool),
            on_process_down=lambda w: remove(w.sentinel),
            on_timeout_set=on_timeout_set,
            on_timeout_cancel=on_timeout_cancel,
        )

    def create(self, w, semaphore=None, max_restarts=None):
        threaded = not w.use_eventloop
        forking_enable(not threaded or (w.no_execv or not w.force_execv))
        procs = w.min_concurrency
        if not threaded:
            semaphore = w.semaphore = BoundedSemaphore(procs)
            max_restarts = 100
        pool = w.pool = self.instantiate(
            w.pool_cls, w.min_concurrency,
            initargs=(w.app, w.hostname),
            maxtasksperchild=w.max_tasks_per_child,
            timeout=w.task_time_limit,
            soft_timeout=w.task_soft_time_limit,
            putlocks=w.pool_putlocks and threaded,
            lost_worker_timeout=w.worker_lost_wait,
            threads=threaded,
            max_restarts=max_restarts,
            semaphore=semaphore,
        )
        if w.hub:
            w.hub.on_init.append(partial(self.on_poll_init, pool))
        return pool


class Beat(bootsteps.StartStopComponent):
    """Component used to embed a celerybeat process.

    This will only be enabled if the ``beat``
    argument is set.

    """
    name = 'worker.beat'

    def __init__(self, w, beat=False, **kwargs):
        self.enabled = beat
        w.beat = None

    def create(self, w):
        from celery.beat import EmbeddedService
        b = w.beat = EmbeddedService(app=w.app,
                                     schedule_filename=w.schedule_filename,
                                     scheduler_cls=w.scheduler_cls)
        return b
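
# The embedded beat service is requested by passing ``beat=True`` when
# creating the worker, e.g. ``WorkController(app=app, beat=True)``
# (a sketch; ``app`` is assumed to be an existing Celery app).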


class Queues(bootsteps.Component):
    """This component initializes the internal queues
    used by the worker."""
    name = 'worker.queues'
    requires = ('ev', )

    def create(self, w):
        w.start_mediator = True
        if not w.pool_cls.rlimit_safe:
            w.disable_rate_limits = True
        if w.disable_rate_limits:
            w.ready_queue = FastQueue()
            if w.use_eventloop:
                w.start_mediator = False
                if w.pool_putlocks and w.pool_cls.uses_semaphore:
                    w.ready_queue.put = w.process_task_sem
                else:
                    w.ready_queue.put = w.process_task
            elif not w.pool_cls.requires_mediator:
                # just send task directly to pool, skip the mediator.
                w.ready_queue.put = w.process_task
                w.start_mediator = False
        else:
            w.ready_queue = TaskBucket(task_registry=w.app.tasks)


class EvLoop(bootsteps.StartStopComponent):
    name = 'worker.ev'

    def __init__(self, w, **kwargs):
        w.hub = None

    def include_if(self, w):
        return w.use_eventloop

    def create(self, w):
        w.timer = Schedule(max_interval=10)
        hub = w.hub = Hub(w.timer)
        return hub


class Timers(bootsteps.Component):
    """This component initializes the internal timers used by the worker."""
    name = 'worker.timers'
    requires = ('pool', )

    def include_if(self, w):
        return not w.use_eventloop

    def create(self, w):
        if not w.timer_cls:
            # Default Timer is set by the pool, as e.g. eventlet
            # needs a custom implementation.
            w.timer_cls = w.pool.Timer
        w.timer = self.instantiate(w.timer_cls,
                                   max_interval=w.timer_precision,
                                   on_timer_error=self.on_timer_error,
                                   on_timer_tick=self.on_timer_tick)

    def on_timer_error(self, exc):
        logger.error('Timer error: %r', exc, exc_info=True)

    def on_timer_tick(self, delay):
        logger.debug('Timer wake-up! Next eta %s secs.', delay)


class StateDB(bootsteps.Component):
    """This component sets up the worker's state db if enabled."""
    name = 'worker.state-db'

    def __init__(self, w, **kwargs):
        self.enabled = w.state_db
        w._persistence = None

    def create(self, w):
        w._persistence = state.Persistent(w.state_db)
        atexit.register(w._persistence.save)
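
# The state db is enabled by pointing the worker at a file, e.g. with
# the ``--statedb=/var/run/celery/worker.state`` option (the path is
# illustrative); revoked-task state then survives worker restarts.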


class WorkController(configurated):
    """Unmanaged worker instance."""
    RUN = RUN
    CLOSE = CLOSE
    TERMINATE = TERMINATE

    app = None

    concurrency = from_config()
    loglevel = from_config('log_level')
    logfile = from_config('log_file')
    send_events = from_config()
    pool_cls = from_config('pool')
    consumer_cls = from_config('consumer')
    mediator_cls = from_config('mediator')
    timer_cls = from_config('timer')
    timer_precision = from_config('timer_precision')
    autoscaler_cls = from_config('autoscaler')
    autoreloader_cls = from_config('autoreloader')
    schedule_filename = from_config()
    scheduler_cls = from_config('celerybeat_scheduler')
    task_time_limit = from_config()
    task_soft_time_limit = from_config()
    max_tasks_per_child = from_config()
    pool_putlocks = from_config()
    force_execv = from_config()
    prefetch_multiplier = from_config()
    state_db = from_config()
    disable_rate_limits = from_config()
    worker_lost_wait = from_config()

    _state = None
    _running = 0

    pidlock = None

    def __init__(self, app=None, hostname=None, **kwargs):
        self.app = app_or_default(app or self.app)
        # all new threads start without a current app, so if an app is not
        # passed on to the thread it will fall back to the "default app",
        # which then could be the wrong app.  So for the worker
        # we set this to always return our app.  This is a hack,
        # and means that only a single app can be used for workers
        # running in the same process.
        set_default_app(self.app)
        self.app.finalize()
        trace._tasks = self.app._tasks   # optimization
        self.hostname = hostname or socket.gethostname()
        self.on_before_init(**kwargs)

        self._finalize = Finalize(self, self.stop, exitpriority=1)
        self._shutdown_complete = Event()
        self.setup_instance(**self.prepare_args(**kwargs))

    def on_before_init(self, **kwargs):
        pass

    def on_start(self):
        pass

    def on_consumer_ready(self, consumer):
        pass

    def setup_instance(self, queues=None, ready_callback=None,
                       pidfile=None, include=None, **kwargs):
        self.pidfile = pidfile
        self.app.loader.init_worker()
        self.setup_defaults(kwargs, namespace='celeryd')
        self.setup_queues(queues)
        self.setup_includes(include)

        # Set default concurrency
        if not self.concurrency:
            try:
                self.concurrency = cpu_count()
            except NotImplementedError:
                self.concurrency = 2

        # Options
        self.loglevel = mlevel(self.loglevel)
        self.ready_callback = ready_callback or self.on_consumer_ready
        self.use_eventloop = self.should_use_eventloop()

        signals.worker_init.send(sender=self)

        # Initialize boot steps
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.components = []
        self.namespace = Namespace(app=self.app).apply(self, **kwargs)

    def setup_queues(self, queues):
        if isinstance(queues, basestring):
            queues = queues.split(',')
        self.queues = queues
        try:
            self.app.select_queues(queues)
        except KeyError as exc:
            raise ImproperlyConfigured(
                UNKNOWN_QUEUE.format(queues, exc))
        if self.app.conf.CELERY_WORKER_DIRECT:
            self.app.amqp.queues.select_add(worker_direct(self.hostname))
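
    # Sketch (queue names illustrative): ``setup_queues('email,video')``
    # restricts the worker to the ``email`` and ``video`` queues.  If a
    # name is missing from CELERY_QUEUES and
    # CELERY_CREATE_MISSING_QUEUES is disabled, ImproperlyConfigured is
    # raised with the UNKNOWN_QUEUE message defined above.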

    def setup_includes(self, includes):
        # Update celery_include to have all known task modules, so that we
        # ensure all task modules are imported in case an execv happens.
        inc = self.app.conf.CELERY_INCLUDE
        if includes:
            if isinstance(includes, basestring):
                includes = includes.split(',')
            inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
        self.include = includes
        task_modules = set(task.__class__.__module__
                           for task in self.app.tasks.itervalues())
        self.app.conf.CELERY_INCLUDE = tuple(set(inc) | task_modules)

    def prepare_args(self, **kwargs):
        return kwargs

    def start(self):
        """Starts the worker's main loop."""
        self.on_start()

        self._state = self.RUN
        if self.pidfile:
            self.pidlock = platforms.create_pidlock(self.pidfile)
        try:
            for i, component in enumerate(self.components):
                logger.debug('Starting %s...', qualname(component))
                self._running = i + 1
                if component:
                    component.start()
                logger.debug('%s OK!', qualname(component))
        except SystemTerminate:
            self.terminate()
        except Exception as exc:
            logger.error('Unrecoverable error: %r', exc, exc_info=True)
            self.stop()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

        try:
            # Will only get here if running green,
            # makes sure all greenthreads have exited.
            self._shutdown_complete.wait()
        except IGNORE_ERRORS:
            pass

    run = start   # XXX Compat

    def process_task_sem(self, req):
        return self.semaphore.acquire(self.process_task, req)

    def process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            if self.semaphore:  # (Issue #877)
                self.semaphore.release()
        except Exception as exc:
            logger.critical('Internal error: %r\n%s',
                            exc, traceback.format_exc(), exc_info=True)
        except SystemTerminate:
            self.terminate()
            raise
        except BaseException as exc:
            self.stop()
            raise exc

    def signal_consumer_close(self):
        try:
            self.consumer.close()
        except AttributeError:
            pass

    def should_use_eventloop(self):
        return (detect_environment() == 'default' and
                self.app.connection().is_evented and
                not self.app.IS_WINDOWS)

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server."""
        self.signal_consumer_close()
        if not in_sighandler or self.pool.signal_safe:
            self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        what = 'Stopping' if warm else 'Terminating'

        if self._state in (self.CLOSE, self.TERMINATE):
            return
        self.app.loader.shutdown_worker()

        if self.pool:
            self.pool.close()

        if self._state != self.RUN or self._running != len(self.components):
            # Not fully started, can safely exit.
            self._state = self.TERMINATE
            self._shutdown_complete.set()
            return

        self._state = self.CLOSE

        for component in reversed(self.components):
            logger.debug('%s %s...', what, qualname(component))
            if component:
                stop = component.stop
                if not warm:
                    stop = getattr(component, 'terminate', None) or stop
                stop()

        self.timer.stop()
        self.consumer.close_connection()

        if self.pidlock:
            self.pidlock.release()
        self._state = self.TERMINATE
        self._shutdown_complete.set()

    def reload(self, modules=None, reload=False, reloader=None):
        modules = self.app.loader.task_modules if modules is None else modules
        imp = self.app.loader.import_from_cwd

        for module in set(modules or ()):
            if module not in sys.modules:
                logger.debug('importing module %s', module)
                imp(module)
            elif reload:
                logger.debug('reloading module %s', module)
                reload_from_cwd(sys.modules[module], reloader)
        self.pool.restart()
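
    # Example (hedged; the module name is illustrative):
    # ``worker.reload(['myapp.tasks'], reload=True)`` re-imports the
    # listed modules from the current directory and then restarts the
    # pool so child processes pick up the new code.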

    @property
    def state(self):
        return state