123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- # -*- coding: utf-8 -*-
- """
- celery.worker.components
- ~~~~~~~~~~~~~~~~~~~~~~~~
- Default worker bootsteps.
- """
- from __future__ import absolute_import
- import atexit
- import warnings
- from kombu.async import Hub as _Hub, get_event_loop, set_event_loop
- from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
- from kombu.async.timer import Timer as _Timer
- from celery import bootsteps
- from celery._state import _set_task_join_will_block
- from celery.exceptions import ImproperlyConfigured
- from celery.five import string_t
- from celery.utils.log import worker_logger as logger
- __all__ = ['Timer', 'Hub', 'Queues', 'Pool', 'Beat', 'StateDB', 'Consumer']
- ERR_B_GREEN = """\
- -B option doesn't work with eventlet/gevent pools: \
- use standalone beat instead.\
- """
- W_POOL_SETTING = """
- The CELERYD_POOL setting should not be used to select the eventlet/gevent
- pools, instead you *must use the -P* argument so that patches are applied
- as early as possible.
- """
- class Timer(bootsteps.Step):
- """This step initializes the internal timer used by the worker."""
- def create(self, w):
- if w.use_eventloop:
- # does not use dedicated timer thread.
- w.timer = _Timer(max_interval=10.0)
- else:
- if not w.timer_cls:
- # Default Timer is set by the pool, as e.g. eventlet
- # needs a custom implementation.
- w.timer_cls = w.pool_cls.Timer
- w.timer = self.instantiate(w.timer_cls,
- max_interval=w.timer_precision,
- on_timer_error=self.on_timer_error,
- on_timer_tick=self.on_timer_tick)
- def on_timer_error(self, exc):
- logger.error('Timer error: %r', exc, exc_info=True)
- def on_timer_tick(self, delay):
- logger.debug('Timer wake-up! Next eta %s secs.', delay)
- class Hub(bootsteps.StartStopStep):
- requires = (Timer, )
- def __init__(self, w, **kwargs):
- w.hub = None
- def include_if(self, w):
- return w.use_eventloop
- def create(self, w):
- w.hub = get_event_loop()
- if w.hub is None:
- w.hub = set_event_loop(_Hub(w.timer))
- self._patch_thread_primitives(w)
- return self
- def start(self, w):
- pass
- def stop(self, w):
- w.hub.close()
- def terminate(self, w):
- w.hub.close()
- def _patch_thread_primitives(self, w):
- # make clock use dummy lock
- w.app.clock.mutex = DummyLock()
- # multiprocessing's ApplyResult uses this lock.
- try:
- from billiard import pool
- except ImportError:
- pass
- else:
- pool.Lock = DummyLock
- class Queues(bootsteps.Step):
- """This bootstep initializes the internal queues
- used by the worker."""
- label = 'Queues (intra)'
- requires = (Hub, )
- def create(self, w):
- w.process_task = w._process_task
- if w.use_eventloop:
- if w.pool_putlocks and w.pool_cls.uses_semaphore:
- w.process_task = w._process_task_sem
- class Pool(bootsteps.StartStopStep):
- """Bootstep managing the worker pool.
- Describes how to initialize the worker pool, and starts and stops
- the pool during worker startup/shutdown.
- Adds attributes:
- * autoscale
- * pool
- * max_concurrency
- * min_concurrency
- """
- requires = (Queues, )
- def __init__(self, w, autoscale=None, autoreload=None,
- no_execv=False, optimization=None, **kwargs):
- if isinstance(autoscale, string_t):
- max_c, _, min_c = autoscale.partition(',')
- autoscale = [int(max_c), min_c and int(min_c) or 0]
- w.autoscale = autoscale
- w.pool = None
- w.max_concurrency = None
- w.min_concurrency = w.concurrency
- w.no_execv = no_execv
- if w.autoscale:
- w.max_concurrency, w.min_concurrency = w.autoscale
- self.autoreload_enabled = autoreload
- self.optimization = optimization
- def close(self, w):
- if w.pool:
- w.pool.close()
- def terminate(self, w):
- if w.pool:
- w.pool.terminate()
- def create(self, w, semaphore=None, max_restarts=None):
- if w.app.conf.CELERYD_POOL in ('eventlet', 'gevent'):
- warnings.warn(UserWarning(W_POOL_SETTING))
- threaded = not w.use_eventloop
- procs = w.min_concurrency
- forking_enable = w.no_execv if w.force_execv else True
- if not threaded:
- semaphore = w.semaphore = LaxBoundedSemaphore(procs)
- w._quick_acquire = w.semaphore.acquire
- w._quick_release = w.semaphore.release
- max_restarts = 100
- allow_restart = self.autoreload_enabled or w.pool_restarts
- pool = w.pool = self.instantiate(
- w.pool_cls, w.min_concurrency,
- initargs=(w.app, w.hostname),
- maxtasksperchild=w.max_tasks_per_child,
- timeout=w.task_time_limit,
- soft_timeout=w.task_soft_time_limit,
- putlocks=w.pool_putlocks and threaded,
- lost_worker_timeout=w.worker_lost_wait,
- threads=threaded,
- max_restarts=max_restarts,
- allow_restart=allow_restart,
- forking_enable=forking_enable,
- semaphore=semaphore,
- sched_strategy=self.optimization,
- )
- _set_task_join_will_block(pool.task_join_will_block)
- return pool
- def info(self, w):
- return {'pool': w.pool.info if w.pool else 'N/A'}
- def register_with_event_loop(self, w, hub):
- w.pool.register_with_event_loop(hub)
- class Beat(bootsteps.StartStopStep):
- """Step used to embed a beat process.
- This will only be enabled if the ``beat``
- argument is set.
- """
- label = 'Beat'
- conditional = True
- def __init__(self, w, beat=False, **kwargs):
- self.enabled = w.beat = beat
- w.beat = None
- def create(self, w):
- from celery.beat import EmbeddedService
- if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
- raise ImproperlyConfigured(ERR_B_GREEN)
- b = w.beat = EmbeddedService(app=w.app,
- schedule_filename=w.schedule_filename,
- scheduler_cls=w.scheduler_cls)
- return b
- class StateDB(bootsteps.Step):
- """This bootstep sets up the workers state db if enabled."""
- def __init__(self, w, **kwargs):
- self.enabled = w.state_db
- w._persistence = None
- def create(self, w):
- w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
- atexit.register(w._persistence.save)
- class Consumer(bootsteps.StartStopStep):
- last = True
- def create(self, w):
- if w.max_concurrency:
- prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
- else:
- prefetch_count = w.concurrency * w.prefetch_multiplier
- c = w.consumer = self.instantiate(
- w.consumer_cls, w.process_task,
- hostname=w.hostname,
- send_events=w.send_events,
- init_callback=w.ready_callback,
- initial_prefetch_count=prefetch_count,
- pool=w.pool,
- timer=w.timer,
- app=w.app,
- controller=w,
- hub=w.hub,
- worker_options=w.options,
- disable_rate_limits=w.disable_rate_limits,
- prefetch_multiplier=w.prefetch_multiplier,
- )
- return c
|