@@ -3,114 +3,61 @@
 celery.worker.consumer
 ~~~~~~~~~~~~~~~~~~~~~~

-This module contains the component responsible for consuming messages
+This module contains the components responsible for consuming messages
 from the broker, processing the messages and keeping the broker connections
 up and running.

-
-* :meth:`~Consumer.start` is an infinite loop, which only iterates
-  again if the connection is lost. For each iteration (at start, or if the
-  connection is lost) it calls :meth:`~Consumer.reset_connection`,
-  and starts the consumer by calling :meth:`~Consumer.consume_messages`.
-
-* :meth:`~Consumer.reset_connection`, clears the internal queues,
-  establishes a new connection to the broker, sets up the task
-  consumer (+ QoS), and the broadcast remote control command consumer.
-
-  Also if events are enabled it configures the event dispatcher and starts
-  up the heartbeat thread.
-
-* Finally it can consume messages. :meth:`~Consumer.consume_messages`
-  is simply an infinite loop waiting for events on the AMQP channels.
-
-  Both the task consumer and the broadcast consumer uses the same
-  callback: :meth:`~Consumer.receive_message`.
-
-* So for each message received the :meth:`~Consumer.receive_message`
-  method is called, this checks the payload of the message for either
-  a `task` key or a `control` key.
-
-  If the message is a task, it verifies the validity of the message
-  converts it to a :class:`celery.worker.job.Request`, and sends
-  it to :meth:`~Consumer.on_task`.
-
-  If the message is a control command the message is passed to
-  :meth:`~Consumer.on_control`, which in turn dispatches
-  the control command using the control dispatcher.
-
-  It also tries to handle malformed or invalid messages properly,
-  so the worker doesn't choke on them and die. Any invalid messages
-  are acknowledged immediately and logged, so the message is not resent
-  again, and again.
-
-* If the task has an ETA/countdown, the task is moved to the `timer`
-  so the :class:`timer2.Timer` can schedule it at its
-  deadline. Tasks without an eta are moved immediately to the `ready_queue`,
-  so they can be picked up by the :class:`~celery.worker.mediator.Mediator`
-  to be sent to the pool.
-
-* When a task with an ETA is received the QoS prefetch count is also
-  incremented, so another message can be reserved. When the ETA is met
-  the prefetch count is decremented again, though this cannot happen
-  immediately because amqplib doesn't support doing broker requests
-  across threads. Instead the current prefetch count is kept as a
-  shared counter, so as soon as :meth:`~Consumer.consume_messages`
-  detects that the value has changed it will send out the actual
-  QoS event to the broker.
-
-* Notice that when the connection is lost all internal queues are cleared
-  because we can no longer ack the messages reserved in memory.
-  However, this is not dangerous as the broker will resend them
-  to another worker when the channel is closed.
-
-* **WARNING**: :meth:`~Consumer.stop` does not close the connection!
-  This is because some pre-acked messages may be in processing,
-  and they need to be finished before the channel is closed.
-  For celeryd this means the pool must finish the tasks it has acked
-  early, *then* close the connection.
-
 """
 from __future__ import absolute_import

 import logging
 import socket
-import threading
-
-from time import sleep
-from Queue import Empty

+from kombu.common import QoS, ignore_errors
+from kombu.syn import _detect_environment
 from kombu.utils.encoding import safe_repr
-from kombu.utils.eventio import READ, WRITE, ERR

+from celery import bootsteps
 from celery.app import app_or_default
-from celery.datastructures import AttributeDict
-from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.task.trace import build_tracer
-from celery.utils import timer2
+from celery.utils.timer2 import default_timer, to_timestamp
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
-from celery.utils.imports import instantiate
-from celery.utils import text
+from celery.utils.text import truncate
+from celery.utils.timeutils import humanize_seconds, timezone

-from . import state
-from .bootsteps import StartStopComponent
-from .control import Panel
-from .heartbeat import Heart
+from . import heartbeat, loops, pidbox
+from .state import task_reserved, maybe_shutdown

-RUN = 0x1
-CLOSE = 0x2
+CLOSE = bootsteps.CLOSE
+logger = get_logger(__name__)
+debug, info, warn, error, crit = (logger.debug, logger.info, logger.warn,
+                                  logger.error, logger.critical)

-#: Heartbeat check is called every heartbeat_seconds' / rate'.
-AMQHEARTBEAT_RATE = 2.0
+CONNECTION_RETRY = """\
+consumer: Connection to broker lost. \
+Trying to re-establish the connection...\
+"""
+
+CONNECTION_RETRY_STEP = """\
+Trying again {when}...\
+"""
+
+CONNECTION_ERROR = """\
+consumer: Cannot connect to %s: %s.
+%s
+"""

-#: Prefetch count can't exceed short.
-PREFETCH_COUNT_MAX = 0xFFFF
+CONNECTION_FAILOVER = """\
+Will retry using next failover.\
+"""

 UNKNOWN_FORMAT = """\
 Received and deleted unknown message. Wrong destination?!?

 The full contents of the message body was: %s
 """
+
 #: Error message for when an unregistered task is received.
 UNKNOWN_TASK_ERROR = """\
 Received unregistered task of type %s.
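The new `CONNECTION_*` templates above are consumed by the `_error_handler` closure in `connect()` further down in this diff: the `%s` placeholders take the broker URI and the exception, while `{when}` is pre-filled via `humanize_seconds`. A standalone sketch of how they compose; the `humanize_seconds` stand-in here is simplified (the real helper is `celery.utils.timeutils.humanize_seconds`):

```python
CONNECTION_ERROR = """\
consumer: Cannot connect to %s: %s.
%s
"""

CONNECTION_RETRY_STEP = """\
Trying again {when}...\
"""


def humanize_seconds(secs, prefix='', sep=''):
    # simplified stand-in: the real helper also scales to minutes/hours
    return '{0}{1}{2:.2f} seconds'.format(prefix, sep, secs) if secs else 'now'


interval = 4.0
next_step = CONNECTION_RETRY_STEP.format(
    when=humanize_seconds(interval, 'in', ' '))
# -> "consumer: Cannot connect to amqp://...: Connection refused. / Trying again in 4.00 seconds..."
print(CONNECTION_ERROR % ('amqp://guest@localhost:5672//',
                          IOError('Connection refused'), next_step))
```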
@@ -136,173 +83,24 @@ The full contents of the message body was:
 %s
 """

-MESSAGE_REPORT_FMT = """\
+MESSAGE_REPORT = """\
 body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
 """


-RETRY_CONNECTION = """\
-Consumer: Connection to broker lost. \
-Trying to re-establish the connection...\
-"""
-
-task_reserved = state.task_reserved
-
-logger = get_logger(__name__)
-info, warn, error, crit = (logger.info, logger.warn,
-                           logger.error, logger.critical)
-
-
-def debug(msg, *args, **kwargs):
-    logger.debug('Consumer: {0}'.format(msg), *args, **kwargs)
-
-
 def dump_body(m, body):
-    return '{0} ({1}b)'.format(text.truncate(safe_repr(body), 1024),
+    return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
                                len(m.body))


-class Component(StartStopComponent):
-    name = 'worker.consumer'
-    last = True
-
-    def Consumer(self, w):
-        return (w.consumer_cls or
-                Consumer if w.hub else BlockingConsumer)
-
-    def create(self, w):
-        prefetch_count = w.concurrency * w.prefetch_multiplier
-        c = w.consumer = self.instantiate(self.Consumer(w),
-                                          w.ready_queue,
-                                          hostname=w.hostname,
-                                          send_events=w.send_events,
-                                          init_callback=w.ready_callback,
-                                          initial_prefetch_count=prefetch_count,
-                                          pool=w.pool,
-                                          timer=w.timer,
-                                          app=w.app,
-                                          controller=w,
-                                          hub=w.hub)
-        return c
-
-
-class QoS(object):
-    """Thread safe increment/decrement of a channels prefetch_count.
-
-    :param consumer: A :class:`kombu.messaging.Consumer` instance.
-    :param initial_value: Initial prefetch count value.
-
-    """
-    prev = None
-
-    def __init__(self, consumer, initial_value):
-        self.consumer = consumer
-        self._mutex = threading.RLock()
-        self.value = initial_value or 0
-
-    def increment_eventually(self, n=1):
-        """Increment the value, but do not update the channels QoS.
-
-        The MainThread will be responsible for calling :meth:`update`
-        when necessary.
-
-        """
-        with self._mutex:
-            if self.value:
-                self.value = self.value + max(n, 0)
-            return self.value
-
-    def decrement_eventually(self, n=1):
-        """Decrement the value, but do not update the channels QoS.
-
-        The MainThread will be responsible for calling :meth:`update`
-        when necessary.
-
-        """
-        with self._mutex:
-            if self.value:
-                self.value -= n
-            return self.value
-
-    def set(self, pcount):
-        """Set channel prefetch_count setting."""
-        if pcount != self.prev:
-            new_value = pcount
-            if pcount > PREFETCH_COUNT_MAX:
-                warn('QoS: Disabled: prefetch_count exceeds %r',
-                     PREFETCH_COUNT_MAX)
-                new_value = 0
-            debug('basic.qos: prefetch_count->%s', new_value)
-            self.consumer.qos(prefetch_count=new_value)
-            self.prev = pcount
-        return pcount
-
-    def update(self):
-        """Update prefetch count with current value."""
-        with self._mutex:
-            return self.set(self.value)
-
-
 class Consumer(object):
-    """Listen for messages received from the broker and
-    move them to the ready queue for task processing.
-
-    :param ready_queue: See :attr:`ready_queue`.
-    :param timer: See :attr:`timer`.
-
-    """
-
-    #: The queue that holds tasks ready for immediate processing.
+    #: Intra-queue for tasks ready to be handled
     ready_queue = None

-    #: Enable/disable events.
-    send_events = False
-
-    #: Optional callback to be called when the connection is established.
-    #: Will only be called once, even if the connection is lost and
-    #: re-established.
+    #: Optional callback called the first time the worker
+    #: is ready to receive tasks.
    init_callback = None
-
-
-    #: List of callbacks to be called when the connection is started/reset,
-    #: applied with the connection instance as sole argument.
-    on_reset_conncetion = None
-
-
-    #: List of callbacks to be called before the connection is closed,
-    #: applied with the connection instance as sole argument.
-    on_close_connection = None
-
-    #: The current hostname. Defaults to the system hostname.
-    hostname = None
-
-    #: Initial QoS prefetch count for the task channel.
-    initial_prefetch_count = 0
-
-    #: A :class:`celery.events.EventDispatcher` for sending events.
-    event_dispatcher = None
-
-    #: The thread that sends event heartbeats at regular intervals.
-    #: The heartbeats are used by monitors to detect that a worker
-    #: went offline/disappeared.
-    heart = None
-
-    #: The broker connection.
-    connection = None
-
-    #: The consumer used to consume task messages.
-    task_consumer = None
-
-    #: The consumer used to consume broadcast commands.
-    broadcast_consumer = None
-
-    #: Dictionary holding all active actors.
-    actor_registry = {}
-
-    #: The process mailbox (kombu pidbox node).
-    pidbox_node = None
-    _pidbox_node_shutdown = None   # used for greenlets
-    _pidbox_node_stopped = None    # used for greenlets

     #: The current worker pool instance.
     pool = None
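The `QoS` class deleted in the hunk above is not gone: it moves into kombu as `kombu.common.QoS` (see the new import in the first hunk). Since its bookkeeping contract matters for the ETA handling later in this diff, here is a compact restatement of the removed logic; treat it as a sketch of the semantics, not as the kombu implementation:

```python
import threading

PREFETCH_COUNT_MAX = 0xFFFF  # basic.qos prefetch_count is an AMQP short


class QoS(object):
    """Deferred, thread-safe prefetch_count bookkeeping (sketch)."""

    def __init__(self, consumer, initial_value):
        self.consumer = consumer
        self._mutex = threading.RLock()
        self.value = initial_value or 0
        self.prev = None

    def increment_eventually(self, n=1):
        # only bump the shared counter; the event loop applies it later
        with self._mutex:
            if self.value:
                self.value += max(n, 0)
            return self.value

    def decrement_eventually(self, n=1):
        with self._mutex:
            if self.value:
                self.value -= n
            return self.value

    def set(self, pcount):
        if pcount != self.prev:
            # values over the protocol maximum disable prefetch limits
            new_value = 0 if pcount > PREFETCH_COUNT_MAX else pcount
            self.consumer.qos(prefetch_count=new_value)  # basic.qos
            self.prev = pcount
        return pcount

    def update(self):
        # called from the event loop whenever qos.prev != qos.value
        with self._mutex:
            return self.set(self.value)
```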
@@ -311,234 +109,81 @@ class Consumer(object):
     #: as sending heartbeats.
     timer = None

-    # Consumer state, can be RUN or CLOSE.
-    _state = None
+    class Namespace(bootsteps.Namespace):
+        name = 'Consumer'
+        default_steps = [
+            'celery.worker.consumer:Connection',
+            'celery.worker.consumer:Events',
+            'celery.worker.consumer:Heart',
+            'celery.worker.consumer:Control',
+            'celery.worker.consumer:Tasks',
+            'celery.worker.consumer:Evloop',
+        ]
+
+        def shutdown(self, parent):
+            self.restart(parent, 'Shutdown', 'shutdown')

     def __init__(self, ready_queue,
-                 init_callback=noop, send_events=False, hostname=None,
-                 initial_prefetch_count=2, pool=None, app=None,
+                 init_callback=noop, hostname=None,
+                 pool=None, app=None,
                  timer=None, controller=None, hub=None, amqheartbeat=None,
-                 **kwargs):
+                 worker_options=None, **kwargs):
         self.app = app_or_default(app)
-        self.connection = None
-        self.task_consumer = None
         self.controller = controller
-        self.on_reset_connection = []
-        self.on_close_connection = []
-        self.broadcast_consumer = None
-        self.actor_registry = {}
         self.ready_queue = ready_queue
-        self.send_events = send_events
         self.init_callback = init_callback
         self.hostname = hostname or socket.gethostname()
-        self.initial_prefetch_count = initial_prefetch_count
-        self.event_dispatcher = None
-        self.heart = None
         self.pool = pool
-        self.timer = timer or timer2.default_timer
-        pidbox_state = AttributeDict(app=self.app,
-                                     hostname=self.hostname,
-                                     listener=self,  # pre 2.2
-                                     consumer=self)
-        self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
-                                                         state=pidbox_state,
-                                                         handlers=Panel.data)
-
+        self.timer = timer or default_timer
+        self.strategies = {}
         conninfo = self.app.connection()
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors

         self._does_info = logger.isEnabledFor(logging.INFO)
-        self.strategies = {}
-        if hub:
-            hub.on_init.append(self.on_poll_init)
-        self.hub = hub
         self._quick_put = self.ready_queue.put
-        self.amqheartbeat = amqheartbeat
-        if self.amqheartbeat is None:
-            self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
-        if not hub:
-            self.amqheartbeat = 0

-    def update_strategies(self):
-        S = self.strategies
-        app = self.app
-        loader = app.loader
-        hostname = self.hostname
-        for name, task in self.app.tasks.iteritems():
-            S[name] = task.start_strategy(app, self)
-            task.__trace__ = build_tracer(name, task, loader, hostname)
+        if hub:
+            self.amqheartbeat = amqheartbeat
+            if self.amqheartbeat is None:
+                self.amqheartbeat = self.app.conf.BROKER_HEARTBEAT
+            self.hub = hub
+            self.hub.on_init.append(self.on_poll_init)
+        else:
+            self.hub = None
+            self.amqheartbeat = 0

-    def start(self):
-        """Start the consumer.
+        if not hasattr(self, 'loop'):
+            self.loop = loops.asynloop if hub else loops.synloop

-        Automatically survives intermittent connection failure,
-        and will retry establishing the connection and restart
-        consuming messages.
+        if _detect_environment() == 'gevent':
+            # there's a gevent bug that causes timeouts to not be reset,
+            # so if the connection timeout is exceeded once, it can NEVER
+            # connect again.
+            self.app.conf.BROKER_CONNECTION_TIMEOUT = None

-        """
+        self.steps = []
+        self.namespace = self.Namespace(
+            app=self.app, on_start=self.on_start, on_close=self.on_close,
+        )
+        self.namespace.apply(self, **worker_options or {})

-        self.init_callback(self)
-        while self._state != CLOSE:
-            self.maybe_shutdown()
+    def start(self):
+        ns, loop = self.namespace, self.loop
+        while ns.state != CLOSE:
+            maybe_shutdown()
             try:
-                self.reset_connection()
-                self.consume_messages()
+                ns.start(self)
             except self.connection_errors + self.channel_errors:
-                error(RETRY_CONNECTION, exc_info=True)
-    def on_poll_init(self, hub):
-        hub.update_readers(self.connection.eventmap)
-        self.connection.transport.on_poll_init(hub.poller)
-
-    def consume_messages(self, sleep=sleep, min=min, Empty=Empty,
-                         hbrate=AMQHEARTBEAT_RATE):
-        """Consume messages forever (or until an exception is raised)."""
-
-        with self.hub as hub:
-            qos = self.qos
-            update_qos = qos.update
-            update_readers = hub.update_readers
-            readers, writers = hub.readers, hub.writers
-            poll = hub.poller.poll
-            fire_timers = hub.fire_timers
-            scheduled = hub.timer._queue
-            connection = self.connection
-            hb = self.amqheartbeat
-            hbtick = connection.heartbeat_check
-            on_poll_start = connection.transport.on_poll_start
-            on_poll_empty = connection.transport.on_poll_empty
-            strategies = self.strategies
-            drain_nowait = connection.drain_nowait
-            on_task_callbacks = hub.on_task
-            keep_draining = connection.transport.nb_keep_draining
-
-            if hb and connection.supports_heartbeats:
-                hub.timer.apply_interval(
-                    hb * 1000.0 / hbrate, hbtick, (hbrate, ))
-
-            def on_task_received(body, message):
-                if on_task_callbacks:
-                    [callback() for callback in on_task_callbacks]
-                try:
-                    name = body['task']
-                except (KeyError, TypeError):
-                    return self.handle_unknown_message(body, message)
-                try:
-                    strategies[name](message, body, message.ack_log_error)
-                except KeyError as exc:
-                    self.handle_unknown_task(body, message, exc)
-                except InvalidTaskError as exc:
-                    self.handle_invalid_task(body, message, exc)
-                #fire_timers()
-
-            self.task_consumer.callbacks = [on_task_received]
-            self.task_consumer.consume()
-
-            debug('Ready to accept tasks!')
-
-            while self._state != CLOSE and self.connection:
-                # shutdown if signal handlers told us to.
-                if state.should_stop:
-                    raise SystemExit()
-                elif state.should_terminate:
-                    raise SystemTerminate()
-
-                # fire any ready timers, this also returns
-                # the number of seconds until we need to fire timers again.
-                poll_timeout = fire_timers() if scheduled else 1
-
-                # We only update QoS when there is no more messages to read.
-                # This groups together qos calls, and makes sure that remote
-                # control commands will be prioritized over task messages.
-                if qos.prev != qos.value:
-                    update_qos()
-
-                update_readers(on_poll_start())
-                if readers or writers:
-                    connection.more_to_read = True
-                    while connection.more_to_read:
-                        try:
-                            events = poll(poll_timeout)
-                        except ValueError:  # Issue 882
-                            return
-                        if not events:
-                            on_poll_empty()
-                        for fileno, event in events or ():
-                            try:
-                                if event & READ:
-                                    readers[fileno](fileno, event)
-                                if event & WRITE:
-                                    writers[fileno](fileno, event)
-                                if event & ERR:
-                                    for handlermap in readers, writers:
-                                        try:
-                                            handlermap[fileno](fileno, event)
-                                        except KeyError:
-                                            pass
-                            except (KeyError, Empty):
-                                continue
-                            except socket.error:
-                                if self._state != CLOSE:  # pragma: no cover
-                                    raise
-                        if keep_draining:
-                            drain_nowait()
-                            poll_timeout = 0
-                        else:
-                            connection.more_to_read = False
-                else:
-                    # no sockets yet, startup is probably not done.
-                    sleep(min(poll_timeout, 0.1))
-
-    def on_task(self, task, task_reserved=task_reserved):
-        """Handle received task.
-
-        If the task has an `eta` we enter it into the ETA schedule,
-        otherwise we move it the ready queue for immediate processing.
-
-        """
-        if task.revoked():
-            return
-
-        if self._does_info:
-            info('Got task from broker: %s', task)
-
-        if self.event_dispatcher.enabled:
-            self.event_dispatcher.send('task-received', uuid=task.id,
-                    name=task.name, args=safe_repr(task.args),
-                    kwargs=safe_repr(task.kwargs),
-                    retries=task.request_dict.get('retries', 0),
-                    eta=task.eta and task.eta.isoformat(),
-                    expires=task.expires and task.expires.isoformat())
-
-        if task.eta:
-            try:
-                eta = timer2.to_timestamp(task.eta)
-            except OverflowError as exc:
-                error("Couldn't convert eta %s to timestamp: %r. Task: %r",
-                      task.eta, exc, task.info(safe=True), exc_info=True)
-                task.acknowledge()
-            else:
-                self.qos.increment_eventually()
-                self.timer.apply_at(eta, self.apply_eta_task, (task, ),
-                                    priority=6)
-        else:
-            task_reserved(task)
-            self._quick_put(task)
-
-    def on_control(self, body, message):
-        """Process remote control command message."""
-        try:
-            self.pidbox_node.handle_message(body, message)
-        except KeyError as exc:
-            error('No such control command: %s', exc)
-        except Exception as exc:
-            error('Control command error: %r', exc, exc_info=True)
-            self.reset_pidbox_node()
+                maybe_shutdown()
+                if ns.state != CLOSE and self.connection:
+                    error(CONNECTION_RETRY, exc_info=True)
+                    ns.restart(self)

     def add_actor(self, actor_name, actor_id):
         """Add actor to the actor registry and start the actor main method"""
         try:
-            actor = instantiate(actor_name, connection = self.connection,
+            actor = instantiate(actor_name, connection = self.connection,
                                 id = actor_id)
             consumer = actor.Consumer(self.connection.channel())
             consumer.consume()
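The `reset_connection()`/`consume_messages()` pair is replaced by the bootsteps `Namespace` above: each step owns one concern (connection, events, heart, control, tasks, loop), and a lost connection now restarts the whole step graph instead of the consumer re-wiring itself. A toy model of that control flow, with the dependency sorting and error types simplified:

```python
class Namespace(object):
    """Toy stand-in for celery.bootsteps.Namespace (no dependency graph)."""

    def __init__(self, steps):
        self.steps = steps          # assumed already in dependency order
        self.state = None

    def start(self, c):
        for step in self.steps:     # Connection first, Evloop (last=True) last
            step.start(c)

    def restart(self, c):
        for step in reversed(self.steps):
            step.stop(c)
        self.start(c)


def consumer_start(c, ns, CLOSE='close'):
    # mirrors Consumer.start() in this diff: loop until closed,
    # restarting every step after a connection/channel error
    while ns.state != CLOSE:
        try:
            ns.start(c)
        except c.connection_errors + c.channel_errors:
            if ns.state != CLOSE and c.connection:
                ns.restart(c)
```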
@@ -547,132 +192,46 @@ class Consumer(object):
             return actor.id
         except Exception as exc:
             error('Start actor error: %r', exc, exc_info=True)
-
+
     def stop_all_actors(self):
         for _, consumer in self.actor_registry.items():
             self.maybe_conn_error(consumer.cancel)
         self.actor_registry.clear()
-
-
+
+
     def reset_actor_nodes(self):
         for _, consumer in self.actor_registry.items():
             self.maybe_conn_error(consumer.cancel)
             consumer.consume()
-
+
     def stop_actor(self, actor_id):
         if actor_id in self.actor_registry:
             consumer = self.actor_registry.pop(actor_id)
             self.maybe_conn_error(consumer.cancel)
-
-    def apply_eta_task(self, task):
-        """Method called by the timer to apply a task with an
-        ETA/countdown."""
-        task_reserved(task)
-        self._quick_put(task)
-        self.qos.decrement_eventually()
-
-    def _message_report(self, body, message):
-        return MESSAGE_REPORT_FMT.format(dump_body(message, body),
-                                         safe_repr(message.content_type),
-                                         safe_repr(message.content_encoding),
-                                         safe_repr(message.delivery_info))
-
-    def handle_unknown_message(self, body, message):
-        warn(UNKNOWN_FORMAT, self._message_report(body, message))
-        message.reject_log_error(logger, self.connection_errors)
-
-    def handle_unknown_task(self, body, message, exc):
-        error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
-        message.reject_log_error(logger, self.connection_errors)
-
-    def handle_invalid_task(self, body, message, exc):
-        error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
-        message.reject_log_error(logger, self.connection_errors)
-
-    def receive_message(self, body, message):
-        """Handles incoming messages.
-
-        :param body: The message body.
-        :param message: The kombu message object.
+    def shutdown(self):
+        self.namespace.shutdown(self)

-        """
-        try:
-            name = body['task']
-        except (KeyError, TypeError):
-            return self.handle_unknown_message(body, message)
-
-        try:
-            self.strategies[name](message, body, message.ack_log_error)
-        except KeyError as exc:
-            self.handle_unknown_task(body, message, exc)
-        except InvalidTaskError as exc:
-            self.handle_invalid_task(body, message, exc)
-
-    def maybe_conn_error(self, fun):
-        """Applies function but ignores any connection or channel
-        errors raised."""
-        try:
-            fun()
-        except (AttributeError, ) + \
-                self.connection_errors + \
-                self.channel_errors:
-            pass
-
-    def close_connection(self):
-        """Closes the current broker connection and all open channels."""
-
-
-        # We must set self.connection to None here, so
-        # that the green pidbox thread exits.
-        connection, self.connection = self.connection, None
-
-        if self.task_consumer:
-            debug('Closing consumer channel...')
-            self.task_consumer = \
-                self.maybe_conn_error(self.task_consumer.close)
-
-        self.stop_pidbox_node()
-        self.stop_all_actors()
-
-        [callback() for callback in self.on_close_connection]
-
-        if connection:
-            debug('Closing broker connection...')
-            self.maybe_conn_error(connection.close)
+    def stop(self):
+        self.namespace.stop(self)

-    def stop_consumers(self, close_connection=True):
-        """Stop consuming tasks and broadcast commands, also stops
-        the heartbeat thread and event dispatcher.
+    def on_start(self):
+        self.update_strategies()

-        :keyword close_connection: Set to False to skip closing the broker
-                                    connection.
+    def on_ready(self):
+        callback, self.init_callback = self.init_callback, None
+        if callback:
+            callback(self)

-        """
-        if not self._state == RUN:
-            return
+    def loop_args(self):
+        return (self, self.connection, self.task_consumer,
+                self.strategies, self.namespace, self.hub, self.qos,
+                self.amqheartbeat, self.handle_unknown_message,
+                self.handle_unknown_task, self.handle_invalid_task)

-        if self.heart:
-            # Stop the heartbeat thread if it's running.
-            debug('Heart: Going into cardiac arrest...')
-            self.heart = self.heart.stop()
-
-        debug('Cancelling task consumer...')
-        if self.task_consumer:
-            self.maybe_conn_error(self.task_consumer.cancel)
-
-        if self.event_dispatcher:
-            debug('Shutting down event dispatcher...')
-            self.event_dispatcher = \
-                self.maybe_conn_error(self.event_dispatcher.close)
-
-        self.stop_all_actors()
-
-        debug('Cancelling broadcast consumer...')
-        if self.broadcast_consumer:
-            self.maybe_conn_error(self.broadcast_consumer.cancel)
-
-        if close_connection:
-            self.close_connection()
+    def on_poll_init(self, hub):
+        hub.update_readers(self.connection.eventmap)
+        self.connection.transport.on_poll_init(hub.poller)

     def on_decode_error(self, message, exc):
         """Callback called if an error occurs while decoding
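`loop_args()` above fixes the calling convention for the event loops that now live in `celery.worker.loops`: `asynloop` when an event hub is available, `synloop` otherwise. A rough sketch of the synchronous variant, modelled on the `BlockingConsumer.consume_messages` code this diff removes further down (the real `loops.synloop` will differ in details):

```python
import socket


def synloop(obj, connection, consumer, strategies, ns, hub, qos,
            heartbeat, handle_unknown_message, handle_unknown_task,
            handle_invalid_task, **kwargs):
    # argument order follows Consumer.loop_args() above
    consumer.consume()
    while ns.state != 'close' and obj.connection:
        if qos.prev != qos.value:
            qos.update()  # flush deferred basic.qos changes
        try:
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            pass
        except socket.error:
            if ns.state != 'close':
                raise
```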
@@ -690,127 +249,32 @@
              dump_body(message, message.body))
         message.ack()

-    def reset_pidbox_node(self):
-        """Sets up the process mailbox."""
-        self.stop_pidbox_node()
-        # close previously opened channel if any.
-        if self.pidbox_node.channel:
-            try:
-                self.pidbox_node.channel.close()
-            except self.connection_errors + self.channel_errors:
-                pass
-
-        if self.pool is not None and self.pool.is_green:
-            return self.pool.spawn_n(self._green_pidbox_node)
-        self.pidbox_node.channel = self.connection.channel()
-        self.broadcast_consumer = self.pidbox_node.listen(
-                                        callback=self.on_control)
-
-    def stop_pidbox_node(self):
-        if self._pidbox_node_stopped:
-            self._pidbox_node_shutdown.set()
-            debug('Waiting for broadcast thread to shutdown...')
-            self._pidbox_node_stopped.wait()
-            self._pidbox_node_stopped = self._pidbox_node_shutdown = None
-        elif self.broadcast_consumer:
-            debug('Closing broadcast channel...')
-            self.broadcast_consumer = \
-                self.maybe_conn_error(self.broadcast_consumer.channel.close)
-
-    def _green_pidbox_node(self):
-        """Sets up the process mailbox when running in a greenlet
-        environment."""
-        # THIS CODE IS TERRIBLE
-        # Luckily work has already started rewriting the Consumer for 4.0.
-        self._pidbox_node_shutdown = threading.Event()
-        self._pidbox_node_stopped = threading.Event()
-        try:
-            with self._open_connection() as conn:
-                self.pidbox_node.channel = conn.default_channel
-                self.broadcast_consumer = self.pidbox_node.listen(
-                                            callback=self.on_control)
-                with self.broadcast_consumer:
-                    while not self._pidbox_node_shutdown.isSet():
-                        try:
-                            conn.drain_events(timeout=1.0)
-                        except socket.timeout:
-                            pass
-        finally:
-            self._pidbox_node_stopped.set()
-
-    def reset_connection(self):
-        """Re-establish the broker connection and set up consumers,
-        heartbeat and the event dispatcher."""
-        debug('Re-establishing connection to the broker...')
-        self.stop_consumers()
-
+    def on_close(self):
         # Clear internal queues to get rid of old messages.
         # They can't be acked anyway, as a delivery tag is specific
         # to the current channel.
         self.ready_queue.clear()
         self.timer.clear()

-        # Re-establish the broker connection and setup the task consumer.
-        self.connection = self._open_connection()
-        debug('Connection established.')
-        self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
-                                    on_decode_error=self.on_decode_error)
-        self.reset_actor_nodes()
-        # QoS: Reset prefetch window.
-        self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
-        self.qos.update()
-
-        # Setup the process mailbox.
-        self.reset_pidbox_node()
-
-        # Flush events sent while connection was down.
-        prev_event_dispatcher = self.event_dispatcher
-        self.event_dispatcher = self.app.events.Dispatcher(self.connection,
-                                                hostname=self.hostname,
-                                                enabled=self.send_events)
-        if prev_event_dispatcher:
-            self.event_dispatcher.copy_buffer(prev_event_dispatcher)
-            self.event_dispatcher.flush()
-
-        # Restart heartbeat thread.
-        self.restart_heartbeat()
-
-        # reload all task's execution strategies.
-        self.update_strategies()
-
-        # We're back!
-        self._state = RUN
-
-        for callback in self.on_reset_connection:
-            callback(self.connection)
-
-    def restart_heartbeat(self):
-        """Restart the heartbeat thread.
-
-        This thread sends heartbeat events at intervals so monitors
-        can tell if the worker is off-line/missing.
-
-        """
-        self.heart = Heart(self.timer, self.event_dispatcher)
-        self.heart.start()
-
-    def _open_connection(self):
+    def connect(self):
         """Establish the broker connection.

         Will retry establishing the connection if the
         :setting:`BROKER_CONNECTION_RETRY` setting is enabled

         """
+        conn = self.app.connection(heartbeat=self.amqheartbeat)

         # Callback called for each retry while the connection
         # can't be established.
-        def _error_handler(exc, interval):
-            error('Consumer: Connection Error: %s. '
-                  'Trying again in %d seconds...', exc, interval)
+        def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
+            if getattr(conn, 'alt', None) and interval == 0:
+                next_step = CONNECTION_FAILOVER
+            error(CONNECTION_ERROR, conn.as_uri(), exc,
+                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))

         # remember that the connection is lazy, it won't establish
         # until it's needed.
-        conn = self.app.connection(heartbeat=self.amqheartbeat)
         if not self.app.conf.BROKER_CONNECTION_RETRY:
             # retry disabled, just call connect directly.
             conn.connect()
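For reference, `ensure_connection` is plain kombu API: it retries `connect()` on a backoff schedule and calls the error callback before every sleep, which is exactly what `_error_handler` above hooks into. Minimal standalone usage (the broker URL is a placeholder):

```python
from kombu import Connection

conn = Connection('amqp://guest:guest@localhost:5672//')  # placeholder


def errback(exc, interval):
    print('broker unavailable: %r, retrying in %.1fs' % (exc, interval))


# retries up to 3 times with a growing interval, invoking errback
# before each wait; max_retries=None would retry forever, which is
# what the worker effectively does by default
conn.ensure_connection(errback, max_retries=3)
```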
@@ -818,30 +282,7 @@ class Consumer(object):

         return conn.ensure_connection(_error_handler,
                                       self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
-                                      callback=self.maybe_shutdown)
-
-    def stop(self):
-        """Stop consuming.
-
-        Does not close the broker connection, so be sure to call
-        :meth:`close_connection` when you are finished with it.
-
-        """
-        # Notifies other threads that this instance can't be used
-        # anymore.
-        self.close()
-        debug('Stopping consumers...')
-
-        self.stop_consumers(close_connection=False)
-
-    def close(self):
-        self._state = CLOSE
-
-    def maybe_shutdown(self):
-        if state.should_stop:
-            raise SystemExit()
-        elif state.should_terminate:
-            raise SystemTerminate()
+                                      callback=maybe_shutdown)

     def add_task_queue(self, queue, exchange=None, exchange_type=None,
                        routing_key=None, **options):
@@ -859,7 +300,7 @@
         if not cset.consuming_from(queue):
             cset.add_queue(q)
             cset.consume()
-            logger.info('Started consuming from %r', queue)
+            info('Started consuming from %r', queue)

     def cancel_task_queue(self, queue):
         self.app.amqp.queues.select_remove(queue)
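For context on how `add_task_queue`/`cancel_task_queue` get exercised: they back the `add_consumer`/`cancel_consumer` remote-control commands, so adding a queue to running workers looks roughly like this from the client side (broker URL and queue name are placeholders):

```python
from celery import Celery

app = Celery(broker='amqp://guest@localhost//')  # placeholder

# asks all running workers to also consume from 'priority.high';
# on the worker side this ends up in Consumer.add_task_queue() above
app.control.add_consumer('priority.high', reply=True)

# ...and to stop again, reaching Consumer.cancel_task_queue()
app.control.cancel_consumer('priority.high', reply=True)
```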
@@ -880,24 +321,172 @@
         conninfo.pop('password', None)  # don't send password.
         return {'broker': conninfo, 'prefetch_count': self.qos.value}

+    def on_task(self, task, task_reserved=task_reserved):
+        """Handle received task.
+
+        If the task has an `eta` we enter it into the ETA schedule,
+        otherwise we move it the ready queue for immediate processing.

-class BlockingConsumer(Consumer):
+        """
+        if task.revoked():
+            return

-    def consume_messages(self):
-        # receive_message handles incoming messages.
-        self.task_consumer.register_callback(self.receive_message)
-        self.task_consumer.consume()
+        if self._does_info:
+            info('Got task from broker: %s', task)

-        debug('Ready to accept tasks!')
+        if self.event_dispatcher.enabled:
+            self.event_dispatcher.send('task-received', uuid=task.id,
+                    name=task.name, args=safe_repr(task.args),
+                    kwargs=safe_repr(task.kwargs),
+                    retries=task.request_dict.get('retries', 0),
+                    eta=task.eta and task.eta.isoformat(),
+                    expires=task.expires and task.expires.isoformat())

-        while self._state != CLOSE and self.connection:
-            self.maybe_shutdown()
-            if self.qos.prev != self.qos.value:  # pragma: no cover
-                self.qos.update()
+        if task.eta:
+            eta = timezone.to_system(task.eta) if task.utc else task.eta
             try:
-                self.connection.drain_events(timeout=10.0)
-            except socket.timeout:
-                pass
-            except socket.error:
-                if self._state != CLOSE:  # pragma: no cover
-                    raise
+                eta = to_timestamp(eta)
+            except OverflowError as exc:
+                error("Couldn't convert eta %s to timestamp: %r. Task: %r",
+                      task.eta, exc, task.info(safe=True), exc_info=True)
+                task.acknowledge()
+            else:
+                self.qos.increment_eventually()
+                self.timer.apply_at(
+                    eta, self.apply_eta_task, (task, ), priority=6,
+                )
+        else:
+            task_reserved(task)
+            self._quick_put(task)
+
+    def apply_eta_task(self, task):
+        """Method called by the timer to apply a task with an
+        ETA/countdown."""
+        task_reserved(task)
+        self._quick_put(task)
+        self.qos.decrement_eventually()
+
+    def _message_report(self, body, message):
+        return MESSAGE_REPORT.format(dump_body(message, body),
+                                     safe_repr(message.content_type),
+                                     safe_repr(message.content_encoding),
+                                     safe_repr(message.delivery_info))
+
+    def handle_unknown_message(self, body, message):
+        warn(UNKNOWN_FORMAT, self._message_report(body, message))
+        message.reject_log_error(logger, self.connection_errors)
+
+    def handle_unknown_task(self, body, message, exc):
+        error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
+        message.reject_log_error(logger, self.connection_errors)
+
+    def handle_invalid_task(self, body, message, exc):
+        error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
+        message.reject_log_error(logger, self.connection_errors)
+
+    def update_strategies(self):
+        loader = self.app.loader
+        for name, task in self.app.tasks.iteritems():
+            self.strategies[name] = task.start_strategy(self.app, self)
+            task.__trace__ = build_tracer(name, task, loader, self.hostname)
+
+
+class Connection(bootsteps.StartStopStep):
+
+    def __init__(self, c, **kwargs):
+        c.connection = None
+
+    def start(self, c):
+        c.connection = c.connect()
+        info('Connected to %s', c.connection.as_uri())
+
+    def shutdown(self, c):
+        # We must set self.connection to None here, so
+        # that the green pidbox thread exits.
+        connection, c.connection = c.connection, None
+        if connection:
+            ignore_errors(connection, connection.close)
+
+
+class Events(bootsteps.StartStopStep):
+    requires = (Connection, )
+
+    def __init__(self, c, send_events=None, **kwargs):
+        self.send_events = send_events
+        c.event_dispatcher = None
+
+    def start(self, c):
+        # Flush events sent while connection was down.
+        prev = c.event_dispatcher
+        dis = c.event_dispatcher = c.app.events.Dispatcher(
+            c.connection, hostname=c.hostname, enabled=self.send_events,
+        )
+        if prev:
+            dis.copy_buffer(prev)
+            dis.flush()
+
+    def stop(self, c):
+        if c.event_dispatcher:
+            ignore_errors(c, c.event_dispatcher.close)
+            c.event_dispatcher = None
+    shutdown = stop
+
+
+class Heart(bootsteps.StartStopStep):
+    requires = (Events, )
+
+    def __init__(self, c, **kwargs):
+        c.heart = None
+
+    def start(self, c):
+        c.heart = heartbeat.Heart(c.timer, c.event_dispatcher)
+        c.heart.start()
+
+    def stop(self, c):
+        c.heart = c.heart and c.heart.stop()
+    shutdown = stop
+
+
+class Control(bootsteps.StartStopStep):
+    requires = (Events, )
+
+    def __init__(self, c, **kwargs):
+        self.is_green = c.pool is not None and c.pool.is_green
+        self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
+        self.start = self.box.start
+        self.stop = self.box.stop
+        self.shutdown = self.box.shutdown
+
+
+class Tasks(bootsteps.StartStopStep):
+    requires = (Control, )
+
+    def __init__(self, c, initial_prefetch_count=2, **kwargs):
+        c.task_consumer = c.qos = None
+        self.initial_prefetch_count = initial_prefetch_count
+
+    def start(self, c):
+        c.task_consumer = c.app.amqp.TaskConsumer(
+            c.connection, on_decode_error=c.on_decode_error,
+        )
+        c.qos = QoS(c.task_consumer, self.initial_prefetch_count)
+        c.qos.update()  # set initial prefetch count
+
+    def stop(self, c):
+        if c.task_consumer:
+            debug('Cancelling task consumer...')
+            ignore_errors(c, c.task_consumer.cancel)
+
+    def shutdown(self, c):
+        if c.task_consumer:
+            self.stop(c)
+            debug('Closing consumer channel...')
+            ignore_errors(c, c.task_consumer.close)
+            c.task_consumer = None
+
+
+class Evloop(bootsteps.StartStopStep):
+    last = True
+
+    def start(self, c):
+        c.loop(*c.loop_args())
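The step classes above all share the same shape, which is the point of the rewrite: extending the consumer now means writing another `StartStopStep` rather than subclassing `Consumer`. A hypothetical user-defined step, assuming this module's `Connection` step as its dependency (names and the declared queue are illustrative only):

```python
from celery import bootsteps
from celery.worker.consumer import Connection


class DeclareDeadLetterQueue(bootsteps.StartStopStep):
    """Hypothetical step: declare an extra queue once the broker is up."""
    requires = (Connection, )

    def start(self, c):
        # `c` is the Consumer instance; Connection.start() has already
        # set c.connection by the time this step runs
        channel = c.connection.channel()
        channel.queue_declare(queue='dead-letter', durable=True)
        channel.close()

    def stop(self, c):
        pass  # nothing to undo on connection loss
```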