@@ -3,7 +3,7 @@
 celery.worker.consumer
 ~~~~~~~~~~~~~~~~~~~~~~

-This module contains the component responsible for consuming messages
+This module contains the components responsible for consuming messages
 from the broker, processing the messages and keeping the broker connections
 up and running.

@@ -12,33 +12,45 @@ from __future__ import absolute_import

 import logging
 import socket
-import threading

-from time import sleep
-from Queue import Empty
-
-from kombu.common import QoS
+from kombu.common import QoS, ignore_errors
 from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr
-from kombu.utils.eventio import READ, WRITE, ERR

+from celery import bootsteps
 from celery.app import app_or_default
-from celery.datastructures import AttributeDict
-from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.task.trace import build_tracer
-from celery.utils import text
-from celery.utils import timer2
+from celery.utils.timer2 import default_timer, to_timestamp
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
+from celery.utils.text import truncate
 from celery.utils.timeutils import humanize_seconds, timezone

-from . import state
-from .bootsteps import StartStopComponent, RUN, CLOSE
-from .control import Panel
-from .heartbeat import Heart
+from . import heartbeat, loops, pidbox
+from .state import task_reserved, maybe_shutdown
+
+CLOSE = bootsteps.CLOSE
+logger = get_logger(__name__)
+debug, info, warn, error, crit = (logger.debug, logger.info, logger.warn,
+                                  logger.error, logger.critical)
+
+CONNECTION_RETRY = """\
+consumer: Connection to broker lost. \
+Trying to re-establish the connection...\
+"""
+
+CONNECTION_RETRY_STEP = """\
+Trying again {when}...\
+"""

-#: Heartbeat check is called every heartbeat_seconds' / rate'.
-AMQHEARTBEAT_RATE = 2.0
+CONNECTION_ERROR = """\
+consumer: Cannot connect to %s: %s.
+%s
+"""
+
+CONNECTION_FAILOVER = """\
+Will retry using next failover.\
+"""

 UNKNOWN_FORMAT = """\
 Received and deleted unknown message. Wrong destination?!?
@@ -53,7 +65,7 @@ The message has been ignored and discarded.

 Did you remember to import the module containing this task?
 Or maybe you are using relative imports?
-More: http://docs.celeryq.org/en/latest/userguide/tasks.html#names
+Please see http://bit.ly/gLye1c for more information.

 The full contents of the message body was:
 %s
@@ -64,8 +76,8 @@ INVALID_TASK_ERROR = """\
 Received invalid task message: %s
 The message has been ignored and discarded.

-Please ensure your message conforms to the task message format:
-http://docs.celeryq.org/en/latest/internals/protocol.html
+Please ensure your message conforms to the task
+message protocol as described here: http://bit.ly/hYj41y

 The full contents of the message body was:
 %s
@@ -76,112 +88,20 @@ body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
 """


-RETRY_CONNECTION = """\
-consumer: Connection to broker lost. \
-Trying to re-establish the connection...\
-"""
-
-CONNECTION_ERROR = """\
-consumer: Cannot connect to %s: %s.
-%s
-"""
-
-CONNECTION_RETRY = """\
-Trying again {when}...\
-"""
-
-CONNECTION_FAILOVER = """\
-Will retry using next failover.\
-"""
-
-task_reserved = state.task_reserved
-
-logger = get_logger(__name__)
-info, warn, error, crit = (logger.info, logger.warn,
-                           logger.error, logger.critical)
-
-
-def debug(msg, *args, **kwargs):
-    logger.debug('consumer: {0}'.format(msg), *args, **kwargs)
-
-
 def dump_body(m, body):
-    return '{0} ({1}b)'.format(text.truncate(safe_repr(body), 1024),
+    return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
                                len(m.body))


-class Component(StartStopComponent):
-    name = 'worker.consumer'
-    last = True
-
-    def Consumer(self, w):
-        return (w.consumer_cls or
-                Consumer if w.hub else BlockingConsumer)
-
-    def create(self, w):
-        prefetch_count = w.concurrency * w.prefetch_multiplier
-        c = w.consumer = self.instantiate(self.Consumer(w),
-                w.ready_queue,
-                hostname=w.hostname,
-                send_events=w.send_events,
-                init_callback=w.ready_callback,
-                initial_prefetch_count=prefetch_count,
-                pool=w.pool,
-                timer=w.timer,
-                app=w.app,
-                controller=w,
-                hub=w.hub)
-        return c
-
-
 class Consumer(object):
-    """Listen for messages received from the broker and
-    move them to the ready queue for task processing.
-
-    :param ready_queue: See :attr:`ready_queue`.
-    :param timer: See :attr:`timer`.
-
-    """

-    #: The queue that holds tasks ready for immediate processing.
+    #: Intra-queue for tasks ready to be handled
     ready_queue = None

-    #: Enable/disable events.
-    send_events = False
-
-    #: Optional callback to be called when the connection is established.
-    #: Will only be called once, even if the connection is lost and
-    #: re-established.
+    #: Optional callback called the first time the worker
+    #: is ready to receive tasks.
     init_callback = None

-    #: The current hostname. Defaults to the system hostname.
-    hostname = None
-
-    #: Initial QoS prefetch count for the task channel.
-    initial_prefetch_count = 0
-
-    #: A :class:`celery.events.EventDispatcher` for sending events.
-    event_dispatcher = None
-
-    #: The thread that sends event heartbeats at regular intervals.
-    #: The heartbeats are used by monitors to detect that a worker
-    #: went offline/disappeared.
-    heart = None
-
-    #: The broker connection.
-    connection = None
-
-    #: The consumer used to consume task messages.
-    task_consumer = None
-
-    #: The consumer used to consume broadcast commands.
-    broadcast_consumer = None
-
-    #: The process mailbox (kombu pidbox node).
-    pidbox_node = None
-    _pidbox_node_shutdown = None   # used for greenlets
-    _pidbox_node_stopped = None    # used for greenlets
-
     #: The current worker pool instance.
     pool = None

@@ -189,187 +109,188 @@ class Consumer(object):
     #: as sending heartbeats.
     timer = None

-    # Consumer state, can be RUN or CLOSE.
-    _state = None
+    class Namespace(bootsteps.Namespace):
+        name = 'Consumer'
+        default_steps = [
+            'celery.worker.consumer:Connection',
+            'celery.worker.consumer:Events',
+            'celery.worker.consumer:Heart',
+            'celery.worker.consumer:Control',
+            'celery.worker.consumer:Tasks',
+            'celery.worker.consumer:Evloop',
+        ]
+
+        def shutdown(self, parent):
+            self.restart(parent, 'Shutdown', 'shutdown')

     def __init__(self, ready_queue,
-                 init_callback=noop, send_events=False, hostname=None,
-                 initial_prefetch_count=2, pool=None, app=None,
+                 init_callback=noop, hostname=None,
+                 pool=None, app=None,
                  timer=None, controller=None, hub=None, amqheartbeat=None,
-                 **kwargs):
+                 worker_options=None, **kwargs):
         self.app = app_or_default(app)
-        self.connection = None
-        self.task_consumer = None
         self.controller = controller
-        self.broadcast_consumer = None
         self.ready_queue = ready_queue
-        self.send_events = send_events
         self.init_callback = init_callback
         self.hostname = hostname or socket.gethostname()
-        self.initial_prefetch_count = initial_prefetch_count
-        self.event_dispatcher = None
-        self.heart = None
         self.pool = pool
-        self.timer = timer or timer2.default_timer
-        pidbox_state = AttributeDict(app=self.app,
-                                     hostname=self.hostname,
-                                     listener=self,  # pre 2.2
-                                     consumer=self)
-        self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
-                                                         state=pidbox_state,
-                                                         handlers=Panel.data)
+        self.timer = timer or default_timer
+        self.strategies = {}
         conninfo = self.app.connection()
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors

         self._does_info = logger.isEnabledFor(logging.INFO)
-        self.strategies = {}
-        if hub:
-            hub.on_init.append(self.on_poll_init)
-        self.hub = hub
         self._quick_put = self.ready_queue.put
-        self.amqheartbeat = amqheartbeat
-        if self.amqheartbeat is None:
-            self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
-        if not hub:
+
+        if hub:
+            self.amqheartbeat = amqheartbeat
+            if self.amqheartbeat is None:
+                self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
+            self.hub = hub
+            self.hub.on_init.append(self.on_poll_init)
+        else:
+            self.hub = None
             self.amqheartbeat = 0

+        if not hasattr(self, 'loop'):
+            self.loop = loops.asynloop if hub else loops.synloop
+
         if _detect_environment() == 'gevent':
             # there's a gevent bug that causes timeouts to not be reset,
             # so if the connection timeout is exceeded once, it can NEVER
             # connect again.
             self.app.conf.BROKER_CONNECTION_TIMEOUT = None

-    def update_strategies(self):
-        S = self.strategies
-        app = self.app
-        loader = app.loader
-        hostname = self.hostname
-        for name, task in self.app.tasks.iteritems():
-            S[name] = task.start_strategy(app, self)
-            task.__trace__ = build_tracer(name, task, loader, hostname)
+        self.steps = []
+        self.namespace = self.Namespace(
+            app=self.app, on_start=self.on_start, on_close=self.on_close,
+        )
+        self.namespace.apply(self, **worker_options or {})

     def start(self):
-        """Start the consumer.
+        ns, loop = self.namespace, self.loop
+        while ns.state != CLOSE:
+            maybe_shutdown()
+            try:
+                ns.start(self)
+            except self.connection_errors + self.channel_errors:
+                maybe_shutdown()
+                if ns.state != CLOSE and self.connection:
+                    error(CONNECTION_RETRY, exc_info=True)
+                    ns.restart(self)

-        Automatically survives intermittent connection failure,
-        and will retry establishing the connection and restart
-        consuming messages.
+    def shutdown(self):
+        self.namespace.shutdown(self)

-        """
+    def stop(self):
+        self.namespace.stop(self)

-        self.init_callback(self)
+    def on_start(self):
+        self.update_strategies()

-        while self._state != CLOSE:
-            self.maybe_shutdown()
-            try:
-                self.reset_connection()
-                self.consume_messages()
-            except self.connection_errors + self.channel_errors:
-                error(RETRY_CONNECTION, exc_info=True)
+    def on_ready(self):
+        callback, self.init_callback = self.init_callback, None
+        if callback:
+            callback(self)
+
+    def loop_args(self):
+        return (self, self.connection, self.task_consumer,
+                self.strategies, self.namespace, self.hub, self.qos,
+                self.amqheartbeat, self.handle_unknown_message,
+                self.handle_unknown_task, self.handle_invalid_task)

     def on_poll_init(self, hub):
         hub.update_readers(self.connection.eventmap)
         self.connection.transport.on_poll_init(hub.poller)

-    def consume_messages(self, sleep=sleep, min=min, Empty=Empty,
-                         hbrate=AMQHEARTBEAT_RATE):
-        """Consume messages forever (or until an exception is raised)."""
-
-        with self.hub as hub:
-            qos = self.qos
-            update_qos = qos.update
-            update_readers = hub.update_readers
-            readers, writers = hub.readers, hub.writers
-            poll = hub.poller.poll
-            fire_timers = hub.fire_timers
-            scheduled = hub.timer._queue
-            connection = self.connection
-            hb = self.amqheartbeat
-            hbtick = connection.heartbeat_check
-            on_poll_start = connection.transport.on_poll_start
-            on_poll_empty = connection.transport.on_poll_empty
-            strategies = self.strategies
-            drain_nowait = connection.drain_nowait
-            on_task_callbacks = hub.on_task
-            keep_draining = connection.transport.nb_keep_draining
-
-            if hb and connection.supports_heartbeats:
-                hub.timer.apply_interval(
-                    hb * 1000.0 / hbrate, hbtick, (hbrate, ))
-
-            def on_task_received(body, message):
-                if on_task_callbacks:
-                    [callback() for callback in on_task_callbacks]
-                try:
-                    name = body['task']
-                except (KeyError, TypeError):
-                    return self.handle_unknown_message(body, message)
-                try:
-                    strategies[name](message, body, message.ack_log_error)
-                except KeyError as exc:
-                    self.handle_unknown_task(body, message, exc)
-                except InvalidTaskError as exc:
-                    self.handle_invalid_task(body, message, exc)
-                #fire_timers()
-
-            self.task_consumer.callbacks = [on_task_received]
-            self.task_consumer.consume()
-
-            debug('Ready to accept tasks!')
-
-            while self._state != CLOSE and self.connection:
-                # shutdown if signal handlers told us to.
-                if state.should_stop:
-                    raise SystemExit()
-                elif state.should_terminate:
-                    raise SystemTerminate()
-
-                # fire any ready timers, this also returns
-                # the number of seconds until we need to fire timers again.
-                poll_timeout = fire_timers() if scheduled else 1
-
-                # We only update QoS when there is no more messages to read.
-                # This groups together qos calls, and makes sure that remote
-                # control commands will be prioritized over task messages.
-                if qos.prev != qos.value:
-                    update_qos()
-
-                update_readers(on_poll_start())
-                if readers or writers:
-                    connection.more_to_read = True
-                    while connection.more_to_read:
-                        try:
-                            events = poll(poll_timeout)
-                        except ValueError:  # Issue 882
-                            return
-                        if not events:
-                            on_poll_empty()
-                        for fileno, event in events or ():
-                            try:
-                                if event & READ:
-                                    readers[fileno](fileno, event)
-                                if event & WRITE:
-                                    writers[fileno](fileno, event)
-                                if event & ERR:
-                                    for handlermap in readers, writers:
-                                        try:
-                                            handlermap[fileno](fileno, event)
-                                        except KeyError:
-                                            pass
-                            except (KeyError, Empty):
-                                continue
-                            except socket.error:
-                                if self._state != CLOSE:  # pragma: no cover
-                                    raise
-                        if keep_draining:
-                            drain_nowait()
-                            poll_timeout = 0
-                        else:
-                            connection.more_to_read = False
-                else:
-                    # no sockets yet, startup is probably not done.
-                    sleep(min(poll_timeout, 0.1))
+    def on_decode_error(self, message, exc):
+        """Callback called if an error occurs while decoding
+        a message received.
+
+        Simply logs the error and acknowledges the message so it
+        doesn't enter a loop.
+
+        :param message: The message with errors.
+        :param exc: The original exception instance.
+
+        """
+        crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
+             exc, message.content_type, message.content_encoding,
+             dump_body(message, message.body))
+        message.ack()
+
+    def on_close(self):
+        # Clear internal queues to get rid of old messages.
+        # They can't be acked anyway, as a delivery tag is specific
+        # to the current channel.
+        self.ready_queue.clear()
+        self.timer.clear()
+
+    def connect(self):
+        """Establish the broker connection.
+
+        Will retry establishing the connection if the
+        :setting:`BROKER_CONNECTION_RETRY` setting is enabled
+
+        """
+        conn = self.app.connection(heartbeat=self.amqheartbeat)
+
+        # Callback called for each retry while the connection
+        # can't be established.
+        def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
+            if getattr(conn, 'alt', None) and interval == 0:
+                next_step = CONNECTION_FAILOVER
+            error(CONNECTION_ERROR, conn.as_uri(), exc,
+                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))
+
+        # remember that the connection is lazy, it won't establish
+        # until it's needed.
+        if not self.app.conf.BROKER_CONNECTION_RETRY:
+            # retry disabled, just call connect directly.
+            conn.connect()
+            return conn
+
+        return conn.ensure_connection(_error_handler,
+                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
+                    callback=maybe_shutdown)
+
+    def add_task_queue(self, queue, exchange=None, exchange_type=None,
+                       routing_key=None, **options):
+        cset = self.task_consumer
+        try:
+            q = self.app.amqp.queues[queue]
+        except KeyError:
+            exchange = queue if exchange is None else exchange
+            exchange_type = 'direct' if exchange_type is None \
+                else exchange_type
+            q = self.app.amqp.queues.select_add(queue,
+                                                exchange=exchange,
+                                                exchange_type=exchange_type,
+                                                routing_key=routing_key, **options)
+        if not cset.consuming_from(queue):
+            cset.add_queue(q)
+            cset.consume()
+            info('Started consuming from %r', queue)
+
+    def cancel_task_queue(self, queue):
+        self.app.amqp.queues.select_remove(queue)
+        self.task_consumer.cancel_by_queue(queue)
+
+    @property
+    def info(self):
+        """Returns information about this consumer instance
+        as a dict.
+
+        This is also the consumer related info returned by
+        ``celeryctl stats``.
+
+        """
+        conninfo = {}
+        if self.connection:
+            conninfo = self.connection.info()
+            conninfo.pop('password', None)  # don't send password.
+        return {'broker': conninfo, 'prefetch_count': self.qos.value}

     def on_task(self, task, task_reserved=task_reserved):
         """Handle received task.
@@ -395,7 +316,7 @@
         if task.eta:
             eta = timezone.to_system(task.eta) if task.utc else task.eta
             try:
-                eta = timer2.to_timestamp(eta)
+                eta = to_timestamp(eta)
             except OverflowError as exc:
                 error("Couldn't convert eta %s to timestamp: %r. Task: %r",
                       task.eta, exc, task.info(safe=True), exc_info=True)
@@ -409,16 +330,6 @@
             task_reserved(task)
             self._quick_put(task)

-    def on_control(self, body, message):
-        """Process remote control command message."""
-        try:
-            self.pidbox_node.handle_message(body, message)
-        except KeyError as exc:
-            error('No such control command: %s', exc)
-        except Exception as exc:
-            error('Control command error: %r', exc, exc_info=True)
-            self.reset_pidbox_node()
-
     def apply_eta_task(self, task):
         """Method called by the timer to apply a task with an
         ETA/countdown."""
@@ -444,307 +355,109 @@ class Consumer(object):
         error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
         message.reject_log_error(logger, self.connection_errors)

-    def receive_message(self, body, message):
-        """Handles incoming messages.
+    def update_strategies(self):
+        loader = self.app.loader
+        for name, task in self.app.tasks.iteritems():
+            self.strategies[name] = task.start_strategy(self.app, self)
+            task.__trace__ = build_tracer(name, task, loader, self.hostname)

-        :param body: The message body.
-        :param message: The kombu message object.

-        """
-        try:
-            name = body['task']
-        except (KeyError, TypeError):
-            return self.handle_unknown_message(body, message)
+class Connection(bootsteps.StartStopStep):

-        try:
-            self.strategies[name](message, body, message.ack_log_error)
-        except KeyError as exc:
-            self.handle_unknown_task(body, message, exc)
-        except InvalidTaskError as exc:
-            self.handle_invalid_task(body, message, exc)
-
-    def maybe_conn_error(self, fun):
-        """Applies function but ignores any connection or channel
-        errors raised."""
-        try:
-            fun()
-        except (AttributeError, ) + \
-                self.connection_errors + \
-                self.channel_errors:
-            pass
+    def __init__(self, c, **kwargs):
+        c.connection = None

-    def close_connection(self):
-        """Closes the current broker connection and all open channels."""
+    def start(self, c):
+        c.connection = c.connect()
+        info('Connected to %s', c.connection.as_uri())

+    def shutdown(self, c):
         # We must set self.connection to None here, so
         # that the green pidbox thread exits.
-        connection, self.connection = self.connection, None
-
-        if self.task_consumer:
-            debug('Closing consumer channel...')
-            self.task_consumer = \
-                self.maybe_conn_error(self.task_consumer.close)
-
-        self.stop_pidbox_node()
-
+        connection, c.connection = c.connection, None
         if connection:
-            debug('Closing broker connection...')
-            self.maybe_conn_error(connection.close)
-
-    def stop_consumers(self, close_connection=True, join=True):
-        """Stop consuming tasks and broadcast commands, also stops
-        the heartbeat thread and event dispatcher.
+            ignore_errors(connection, connection.close)

-        :keyword close_connection: Set to False to skip closing the broker
-                                   connection.
-
-        """
-        if not self._state == RUN:
-            return
-
-        if self.heart:
-            # Stop the heartbeat thread if it's running.
-            debug('Heart: Going into cardiac arrest...')
-            self.heart = self.heart.stop()
-
-        debug('Cancelling task consumer...')
-        if join and self.task_consumer:
-            self.maybe_conn_error(self.task_consumer.cancel)
-
-        if self.event_dispatcher:
-            debug('Shutting down event dispatcher...')
-            self.event_dispatcher = \
-                self.maybe_conn_error(self.event_dispatcher.close)
-
-        debug('Cancelling broadcast consumer...')
-        if join and self.broadcast_consumer:
-            self.maybe_conn_error(self.broadcast_consumer.cancel)
-
-        if close_connection:
-            self.close_connection()
-
-    def on_decode_error(self, message, exc):
-        """Callback called if an error occurs while decoding
-        a message received.
-
-        Simply logs the error and acknowledges the message so it
-        doesn't enter a loop.
-
-        :param message: The message with errors.
-        :param exc: The original exception instance.
-
-        """
-        crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
-             exc, message.content_type, message.content_encoding,
-             dump_body(message, message.body))
-        message.ack()
-
-    def reset_pidbox_node(self):
-        """Sets up the process mailbox."""
-        self.stop_pidbox_node()
-        # close previously opened channel if any.
-        if self.pidbox_node.channel:
-            try:
-                self.pidbox_node.channel.close()
-            except self.connection_errors + self.channel_errors:
-                pass
-
-        if self.pool is not None and self.pool.is_green:
-            return self.pool.spawn_n(self._green_pidbox_node)
-        self.pidbox_node.channel = self.connection.channel()
-        self.broadcast_consumer = self.pidbox_node.listen(
-            callback=self.on_control)
-
-    def stop_pidbox_node(self):
-        if self._pidbox_node_stopped:
-            self._pidbox_node_shutdown.set()
-            debug('Waiting for broadcast thread to shutdown...')
-            self._pidbox_node_stopped.wait()
-            self._pidbox_node_stopped = self._pidbox_node_shutdown = None
-        elif self.broadcast_consumer:
-            debug('Closing broadcast channel...')
-            self.broadcast_consumer = \
-                self.maybe_conn_error(self.broadcast_consumer.channel.close)
-
-    def _green_pidbox_node(self):
-        """Sets up the process mailbox when running in a greenlet
-        environment."""
-        # THIS CODE IS TERRIBLE
-        # Luckily work has already started rewriting the Consumer for 4.0.
-        self._pidbox_node_shutdown = threading.Event()
-        self._pidbox_node_stopped = threading.Event()
-        try:
-            with self._open_connection() as conn:
-                info('pidbox: Connected to %s.', conn.as_uri())
-                self.pidbox_node.channel = conn.default_channel
-                self.broadcast_consumer = self.pidbox_node.listen(
-                    callback=self.on_control)
-                with self.broadcast_consumer:
-                    while not self._pidbox_node_shutdown.isSet():
-                        try:
-                            conn.drain_events(timeout=1.0)
-                        except socket.timeout:
-                            pass
-        finally:
-            self._pidbox_node_stopped.set()
-
-    def reset_connection(self):
-        """Re-establish the broker connection and set up consumers,
-        heartbeat and the event dispatcher."""
-        debug('Re-establishing connection to the broker...')
-        self.stop_consumers(join=False)
-
-        # Clear internal queues to get rid of old messages.
-        # They can't be acked anyway, as a delivery tag is specific
-        # to the current channel.
-        self.ready_queue.clear()
-        self.timer.clear()

-        # Re-establish the broker connection and setup the task consumer.
-        self.connection = self._open_connection()
-        info('consumer: Connected to %s.', self.connection.as_uri())
-        self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
-                                    on_decode_error=self.on_decode_error)
-        # QoS: Reset prefetch window.
-        self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
-        self.qos.update()
+class Events(bootsteps.StartStopStep):
+    requires = (Connection, )

-        # Setup the process mailbox.
-        self.reset_pidbox_node()
+    def __init__(self, c, send_events=None, **kwargs):
+        self.send_events = send_events
+        c.event_dispatcher = None

+    def start(self, c):
         # Flush events sent while connection was down.
-        prev_event_dispatcher = self.event_dispatcher
-        self.event_dispatcher = self.app.events.Dispatcher(self.connection,
-                                                           hostname=self.hostname,
-                                                           enabled=self.send_events)
-        if prev_event_dispatcher:
-            self.event_dispatcher.copy_buffer(prev_event_dispatcher)
-            self.event_dispatcher.flush()
-
-        # Restart heartbeat thread.
-        self.restart_heartbeat()
-
-        # reload all task's execution strategies.
-        self.update_strategies()
-
-        # We're back!
-        self._state = RUN
-
-    def restart_heartbeat(self):
-        """Restart the heartbeat thread.
-
-        This thread sends heartbeat events at intervals so monitors
-        can tell if the worker is off-line/missing.
-
-        """
-        self.heart = Heart(self.timer, self.event_dispatcher)
-        self.heart.start()
-
-    def _open_connection(self):
-        """Establish the broker connection.
-
-        Will retry establishing the connection if the
-        :setting:`BROKER_CONNECTION_RETRY` setting is enabled
-
-        """
-        conn = self.app.connection(heartbeat=self.amqheartbeat)
-
-        # Callback called for each retry while the connection
-        # can't be established.
-        def _error_handler(exc, interval, next_step=CONNECTION_RETRY):
-            if getattr(conn, 'alt', None) and interval == 0:
-                next_step = CONNECTION_FAILOVER
-            error(CONNECTION_ERROR, conn.as_uri(), exc,
-                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))
+        prev = c.event_dispatcher
+        dis = c.event_dispatcher = c.app.events.Dispatcher(
+            c.connection, hostname=c.hostname, enabled=self.send_events,
+        )
+        if prev:
+            dis.copy_buffer(prev)
+            dis.flush()

-        # remember that the connection is lazy, it won't establish
-        # until it's needed.
-        if not self.app.conf.BROKER_CONNECTION_RETRY:
-            # retry disabled, just call connect directly.
-            conn.connect()
-            return conn
-
-        return conn.ensure_connection(_error_handler,
-                    self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
-                    callback=self.maybe_shutdown)
+    def stop(self, c):
+        if c.event_dispatcher:
+            ignore_errors(c, c.event_dispatcher.close)
+            c.event_dispatcher = None
+    shutdown = stop

-    def stop(self):
-        """Stop consuming.

-        Does not close the broker connection, so be sure to call
-        :meth:`close_connection` when you are finished with it.
+class Heart(bootsteps.StartStopStep):
+    requires = (Events, )

-        """
-        # Notifies other threads that this instance can't be used
-        # anymore.
-        self.close()
-        debug('Stopping consumers...')
-        self.stop_consumers(close_connection=False, join=True)
+    def __init__(self, c, **kwargs):
+        c.heart = None

-    def close(self):
-        self._state = CLOSE
+    def start(self, c):
+        c.heart = heartbeat.Heart(c.timer, c.event_dispatcher)
+        c.heart.start()

-    def maybe_shutdown(self):
-        if state.should_stop:
-            raise SystemExit()
-        elif state.should_terminate:
-            raise SystemTerminate()
+    def stop(self, c):
+        c.heart = c.heart and c.heart.stop()
+    shutdown = stop

-    def add_task_queue(self, queue, exchange=None, exchange_type=None,
-                       routing_key=None, **options):
-        cset = self.task_consumer
-        try:
-            q = self.app.amqp.queues[queue]
-        except KeyError:
-            exchange = queue if exchange is None else exchange
-            exchange_type = 'direct' if exchange_type is None \
-                else exchange_type
-            q = self.app.amqp.queues.select_add(queue,
-                                                exchange=exchange,
-                                                exchange_type=exchange_type,
-                                                routing_key=routing_key, **options)
-        if not cset.consuming_from(queue):
-            cset.add_queue(q)
-            cset.consume()
-            logger.info('Started consuming from %r', queue)

-    def cancel_task_queue(self, queue):
-        self.app.amqp.queues.select_remove(queue)
-        self.task_consumer.cancel_by_queue(queue)
+class Control(bootsteps.StartStopStep):
+    requires = (Events, )

-    @property
-    def info(self):
-        """Returns information about this consumer instance
-        as a dict.
+    def __init__(self, c, **kwargs):
+        self.is_green = c.pool is not None and c.pool.is_green
+        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
+        self.start = self.box.start
+        self.stop = self.box.stop
+        self.shutdown = self.box.shutdown

-        This is also the consumer related info returned by
-        ``celeryctl stats``.

-        """
-        conninfo = {}
-        if self.connection:
-            conninfo = self.connection.info()
-            conninfo.pop('password', None)  # don't send password.
-        return {'broker': conninfo, 'prefetch_count': self.qos.value}
+class Tasks(bootsteps.StartStopStep):
+    requires = (Control, )

+    def __init__(self, c, initial_prefetch_count=2, **kwargs):
+        c.task_consumer = c.qos = None
+        self.initial_prefetch_count = initial_prefetch_count

-class BlockingConsumer(Consumer):
+    def start(self, c):
+        c.task_consumer = c.app.amqp.TaskConsumer(
+            c.connection, on_decode_error=c.on_decode_error,
+        )
+        c.qos = QoS(c.task_consumer, self.initial_prefetch_count)
+        c.qos.update()  # set initial prefetch count
+
+    def stop(self, c):
+        if c.task_consumer:
+            debug('Cancelling task consumer...')
+            ignore_errors(c, c.task_consumer.cancel)
+
+    def shutdown(self, c):
+        if c.task_consumer:
+            self.stop(c)
+            debug('Closing consumer channel...')
+            ignore_errors(c, c.task_consumer.close)
+            c.task_consumer = None

-    def consume_messages(self):
-        # receive_message handles incoming messages.
-        self.task_consumer.register_callback(self.receive_message)
-        self.task_consumer.consume()

-        debug('Ready to accept tasks!')
+class Evloop(bootsteps.StartStopStep):
+    last = True

-        while self._state != CLOSE and self.connection:
-            self.maybe_shutdown()
-            if self.qos.prev != self.qos.value:  # pragma: no cover
-                self.qos.update()
-            try:
-                self.connection.drain_events(timeout=10.0)
-            except socket.timeout:
-                pass
-            except socket.error:
-                if self._state != CLOSE:  # pragma: no cover
-                    raise
+    def start(self, c):
+        c.loop(*c.loop_args())
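
Illustrative sketch (not part of the patch above): after this change each consumer feature is a bootsteps.StartStopStep that receives the Consumer instance and is ordered through `requires`, as the Connection, Events, Heart, Control, Tasks and Evloop steps show. A custom step written against the same pattern could look like the following; the class name DumpConninfo and the `dump_conninfo` option are hypothetical, and how such a step would be registered with the Consumer namespace is not covered by this diff.

import logging

from celery import bootsteps
from celery.worker.consumer import Connection  # step defined in this patch

logger = logging.getLogger(__name__)


class DumpConninfo(bootsteps.StartStopStep):
    """Hypothetical step: log broker info once the Connection step has run."""
    requires = (Connection, )

    def __init__(self, c, **kwargs):
        # Steps receive the Consumer instance `c` plus the worker options.
        self.enabled = kwargs.get('dump_conninfo', True)  # assumed option name

    def start(self, c):
        if self.enabled and c.connection:
            logger.info('consumer connected to %s', c.connection.as_uri())

    def stop(self, c):
        pass

The `requires = (Connection, )` tuple is what places the step after the broker connection has been established, mirroring how Events depends on Connection in the patch.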