|
@@ -8,12 +8,418 @@
|
|
|
:local:
|
|
|
:depth: 2
|
|
|
|
|
|
+.. _extending-custom-consumers:
|
|
|
+
|
|
|
+Custom Message Consumers
|
|
|
+========================
|
|
|
+
|
|
|
+You may want to embed custom Kombu consumers to manually process your messages.
|
|
|
+
|
|
|
+For that purpose a special :class:`~celery.bootstep.ConsumerStep` bootstep class
|
|
|
+exists, where you only need to define the ``get_consumers`` method, which must
|
|
|
+return a list of :class:`kombu.Consumer` objects to start
|
|
|
+whenever the connection is established:
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ from celery import Celery
|
|
|
+ from celery import bootsteps
|
|
|
+ from kombu import Consumer, Exchange, Queue
|
|
|
+
|
|
|
+ my_queue = Queue('custom', Exchange('custom'), 'routing_key')
|
|
|
+
|
|
|
+ app = Celery(broker='amqp://')
|
|
|
+
|
|
|
+
|
|
|
+ class MyConsumerStep(bootsteps.ConsumerStep):
|
|
|
+
|
|
|
+ def get_consumers(self, channel):
|
|
|
+ return [Consumer(channel,
|
|
|
+ queues=[my_queue],
|
|
|
+ callbacks=[self.handle_message],
|
|
|
+ accept=['json'])]
|
|
|
+
|
|
|
+ def handle_message(self, body, message):
|
|
|
+ print('Received message: {0!r}'.format(body))
|
|
|
+ message.ack()
|
|
|
+ app.steps['consumer'].add(MyConsumerStep)
|
|
|
+
|
|
|
+ def send_me_a_message(self, who='world!', producer=None):
|
|
|
+ with app.producer_or_acquire(producer) as producer:
|
|
|
+ producer.send(
|
|
|
+ {'hello': who},
|
|
|
+ serializer='json',
|
|
|
+ exchange=my_queue.exchange,
|
|
|
+ routing_key='routing_key',
|
|
|
+ declare=[my_queue],
|
|
|
+ retry=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ if __name__ == '__main__':
|
|
|
+ send_me_a_message('celery')
|
|
|
+
|
|
|
+
|
|
|
+.. note::
|
|
|
+
|
|
|
+ Kombu Consumers can take use of two different message callback dispatching
|
|
|
+ mechanisms. The first one is the ``callbacks`` argument which accepts
|
|
|
+ a list of callbacks with a ``(body, message)`` signature,
|
|
|
+ the second one is the ``on_message`` argument which takes a single
|
|
|
+ callback with a ``(message, )`` signature. The latter will not
|
|
|
+ automatically decode and deserialize the payload which is useful
|
|
|
+ in many cases:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ def get_consumers(self, channel):
|
|
|
+ return [Consumer(channel, queues=[my_queue],
|
|
|
+ on_message=self.on_message)]
|
|
|
+
|
|
|
+
|
|
|
+ def on_message(self, message):
|
|
|
+ payload = message.decode()
|
|
|
+ print(
|
|
|
+ 'Received message: {0!r} {props!r} rawlen={s}'.format(
|
|
|
+ payload, props=message.properties, s=len(message.body),
|
|
|
+ ))
|
|
|
+ message.ack()
|
|
|
+
|
|
|
+.. _extending-blueprints:
|
|
|
+
|
|
|
+Blueprints
|
|
|
+==========
|
|
|
+
|
|
|
+Bootsteps is a technique to add functionality to the workers.
|
|
|
+A bootstep is a custom class that defines hooks to do custom actions
|
|
|
+at different stages in the worker. Every bootstep belongs to a blueprint,
|
|
|
+and the worker currently defines two blueprints: **Worker**, and **Consumer**
|
|
|
+
|
|
|
+----------------------------------------------------------
|
|
|
+
|
|
|
+**Figure A:** Bootsteps in the Worker and Consumer blueprints. Starting
|
|
|
+ from the bottom up the first step in the worker blueprint
|
|
|
+ is the Timer, and the last step is to start the Consumer blueprint,
|
|
|
+ which then establishes the broker connection and starts
|
|
|
+ consuming messages.
|
|
|
+
|
|
|
+.. figure:: ../images/worker_graph_full.png
|
|
|
+
|
|
|
+----------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+Worker
|
|
|
+======
|
|
|
+
|
|
|
+The Worker is the first blueprint to start, and with it starts major components like
|
|
|
+the event loop, processing pool, the timer, and also optional components
|
|
|
+like the autoscaler. When the worker is fully started it will continue
|
|
|
+to the Consumer blueprint.
|
|
|
+
|
|
|
+The :class:`~celery.worker.WorkController` is the core worker implementation,
|
|
|
+and contains several methods and attributes that you can use in your bootstep.
|
|
|
+
|
|
|
+Attributes
|
|
|
+----------
|
|
|
+
|
|
|
+.. attribute:: app
|
|
|
+
|
|
|
+ The current app instance.
|
|
|
+
|
|
|
+.. attribute:: hostname
|
|
|
+
|
|
|
+ The workers node name (e.g. `worker1@example.com`)
|
|
|
+
|
|
|
+.. attribute:: blueprint
|
|
|
+
|
|
|
+ This is the worker :class:`~celery.bootsteps.Blueprint`.
|
|
|
+
|
|
|
+.. attribute:: hub
|
|
|
+
|
|
|
+ Event loop object (:class:`~celery.worker.hub.Hub`). You can use
|
|
|
+ this to register callbacks in the event loop.
|
|
|
+
|
|
|
+ This is only supported by async I/O enabled transports (amqp, redis),
|
|
|
+ in which case the `worker.use_eventloop` attribute should be set.
|
|
|
+
|
|
|
+ Your bootstep must require the Hub bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: pool
|
|
|
+
|
|
|
+ The current process/eventlet/gevent/thread pool.
|
|
|
+ See :class:`celery.concurrency.base.BasePool`.
|
|
|
+
|
|
|
+ Your bootstep must require the Pool bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: timer
|
|
|
+
|
|
|
+ :class:`Timer <celery.utils.timer2.Schedule` used to schedule functions.
|
|
|
+
|
|
|
+ Your bootstep must require the Timer bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: statedb
|
|
|
+
|
|
|
+ :class:`Database <celery.worker.state.Persistent>`` to persist state between
|
|
|
+ worker restarts.
|
|
|
+
|
|
|
+ This only exists if the ``statedb`` argument is enabled.
|
|
|
+ Your bootstep must require the Statedb bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: autoscaler
|
|
|
+
|
|
|
+ :class:`~celery.worker.autoscaler.Autoscaler` used to automatically grow
|
|
|
+ and shrink the number of processes in the pool.
|
|
|
+
|
|
|
+ This only exists if the ``autoscale`` argument is enabled.
|
|
|
+ Your bootstep must require the Autoscaler bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: autoreloader
|
|
|
+
|
|
|
+ :class:`~celery.worker.autoreloder.Autoreloader` used to automatically
|
|
|
+ reload use code when the filesystem changes.
|
|
|
+
|
|
|
+ This only exists if the ``autoreload`` argument is enabled.
|
|
|
+ Your bootstep must require the Autoreloader bootstep to use this.
|
|
|
+
|
|
|
+
|
|
|
+An example Worker bootstep could be:
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ from celery import bootsteps
|
|
|
+
|
|
|
+ class ExampleWorkerStep(bootsteps.StartStopStep):
|
|
|
+ requires = ('Pool', )
|
|
|
+
|
|
|
+ def __init__(self, worker, **kwargs):
|
|
|
+ print('Called when the WorkController instance is constructed')
|
|
|
+ print('Arguments to WorkController: {0!r}'.format(kwargs))
|
|
|
+
|
|
|
+ def create(self, worker):
|
|
|
+ # this method can be used to delegate the action methods
|
|
|
+ # to another object that implements ``start`` and ``stop``.
|
|
|
+ return self
|
|
|
+
|
|
|
+ def start(self, worker):
|
|
|
+ print('Called when the worker is started.')
|
|
|
+
|
|
|
+ def stop(self, worker):
|
|
|
+ print("Called when the worker shuts down.")
|
|
|
+
|
|
|
+ def terminate(self, worker):
|
|
|
+ print("Called when the worker terminates")
|
|
|
+
|
|
|
+
|
|
|
+Every method is passed the current ``WorkController`` instance as the first
|
|
|
+argument.
|
|
|
+
|
|
|
+
|
|
|
+Another example could use the timer to wake up at regular intervals:
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ from celery import bootsteps
|
|
|
+
|
|
|
+
|
|
|
+ class DeadlockDetection(bootsteps.StartStopStep):
|
|
|
+ requires = ('Timer', )
|
|
|
+
|
|
|
+ def __init__(self, worker, deadlock_timeout=3600):
|
|
|
+ self.timeout = deadlock_timeout
|
|
|
+ self.requests = []
|
|
|
+ self.tref = None
|
|
|
+
|
|
|
+ def start(self, worker):
|
|
|
+ # run every 30 seconds.
|
|
|
+ self.tref = worker.timer.apply_interval(
|
|
|
+ 30000.0, self.detect, (worker, ),
|
|
|
+ )
|
|
|
+
|
|
|
+ def stop(self, worker):
|
|
|
+ if self.tref:
|
|
|
+ self.tref.cancel()
|
|
|
+ self.tref = None
|
|
|
+
|
|
|
+ def detect(self, worker):
|
|
|
+ # update active requests
|
|
|
+ for req in self.worker.active_requests:
|
|
|
+ if req.time_start and time() - req.time_start > self.timeout:
|
|
|
+ raise SystemExit()
|
|
|
+
|
|
|
+Consumer
|
|
|
+========
|
|
|
+
|
|
|
+The Consumer blueprint establishes a connection to the broker, and
|
|
|
+is restarted every time this connection is lost. Consumer bootsteps
|
|
|
+include the worker heartbeat, the remote control command consumer, and
|
|
|
+importantly, the task consumer.
|
|
|
+
|
|
|
+When you create consumer bootsteps you must take into account that it must
|
|
|
+be possible to restart your blueprint. An additional 'shutdown' method is
|
|
|
+defined for consumer bootsteps, this method is called when the worker is
|
|
|
+shutdown.
|
|
|
+
|
|
|
+Attributes
|
|
|
+----------
|
|
|
+
|
|
|
+.. attribute:: app
|
|
|
+
|
|
|
+ The current app instance.
|
|
|
+
|
|
|
+.. attribute:: controller
|
|
|
+
|
|
|
+ The parent :class:`~@WorkController` object that created this consumer.
|
|
|
+
|
|
|
+.. attribute:: hostname
|
|
|
+
|
|
|
+ The workers node name (e.g. `worker1@example.com`)
|
|
|
+
|
|
|
+.. attribute:: blueprint
|
|
|
+
|
|
|
+ This is the worker :class:`~celery.bootsteps.Blueprint`.
|
|
|
+
|
|
|
+.. attribute:: hub
|
|
|
+
|
|
|
+ Event loop object (:class:`~celery.worker.hub.Hub`). You can use
|
|
|
+ this to register callbacks in the event loop.
|
|
|
+
|
|
|
+ This is only supported by async I/O enabled transports (amqp, redis),
|
|
|
+ in which case the `worker.use_eventloop` attribute should be set.
|
|
|
+
|
|
|
+ Your bootstep must require the Hub bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: connection
|
|
|
+
|
|
|
+ The current broker connection (:class:`kombu.Connection`).
|
|
|
+
|
|
|
+ Your bootstep must require the 'Connection' bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: event_dispatcher
|
|
|
+
|
|
|
+ A :class:`@events.Dispatcher` object that can be used to send events.
|
|
|
+
|
|
|
+ Your bootstep must require the `Events` bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: gossip
|
|
|
+
|
|
|
+ Worker to worker broadcast communication
|
|
|
+ (class:`~celery.worker.consumer.Gossip`).
|
|
|
+
|
|
|
+.. attribute:: pool
|
|
|
+
|
|
|
+ The current process/eventlet/gevent/thread pool.
|
|
|
+ See :class:`celery.concurrency.base.BasePool`.
|
|
|
+
|
|
|
+.. attribute:: timer
|
|
|
+
|
|
|
+ :class:`Timer <celery.utils.timer2.Schedule` used to schedule functions.
|
|
|
+
|
|
|
+.. attribute:: heart
|
|
|
+
|
|
|
+ Responsible for sending worker event heartbeats
|
|
|
+ (:class:`~celery.worker.heartbeat.Heart`).
|
|
|
+
|
|
|
+ Your bootstep must require the `Heart` bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: task_consumer
|
|
|
+
|
|
|
+ The :class:`kombu.Consumer` object used to consume task messages.
|
|
|
+
|
|
|
+ Your bootstep must require the `Tasks` bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: strategies
|
|
|
+
|
|
|
+ Every registered task type has an entry in this mapping,
|
|
|
+ where the value is used to execute an incoming message of this task type
|
|
|
+ (the task execution strategy). This mapping is generated by the Tasks
|
|
|
+ bootstep when the consumer starts::
|
|
|
+
|
|
|
+ for name, task in app.tasks.items():
|
|
|
+ strategies[name] = task.start_strategy(app, consumer)
|
|
|
+ task.__trace__ = celery.app.trace.build_tracer(
|
|
|
+ name, task, loader, hostname
|
|
|
+ )
|
|
|
+
|
|
|
+ Your bootstep must require the `Tasks` bootstep to use this.
|
|
|
+
|
|
|
+.. attribute:: task_buckets
|
|
|
+
|
|
|
+ A :class:`~collections.defaultdict` used to lookup the rate limit for
|
|
|
+ a task by type.
|
|
|
+ Entries in this dict may be None (for no limit) or a
|
|
|
+ :class:`~kombu.utils.limits.TokenBucket` instance implementing
|
|
|
+ ``consume(tokens)`` and ``expected_time(tokens)``.
|
|
|
+
|
|
|
+ TokenBucket implements the `token bucket algorithm`_, but any algorithm
|
|
|
+ may be used as long as it conforms to the same interface and defines the
|
|
|
+ two methods above.
|
|
|
+
|
|
|
+ .. _`token bucket algorithm`: http://en.wikipedia.org/wiki/Token_bucket
|
|
|
+
|
|
|
+.. attribute:: qos
|
|
|
+
|
|
|
+ The :class:`~kombu.common.QoS` object can be used to change the
|
|
|
+ task channels current prefetch_count value, e.g::
|
|
|
+
|
|
|
+ # increment at next cycle
|
|
|
+ consumer.qos.increment_eventually(1)
|
|
|
+ # decrement at next cycle
|
|
|
+ consumer.qos.decrement_eventually(1)
|
|
|
+ consumer.qos.set(10)
|
|
|
+
|
|
|
+
|
|
|
+Methods
|
|
|
+-------
|
|
|
+
|
|
|
+.. method:: consumer.reset_rate_limits()
|
|
|
+
|
|
|
+ Updates the ``task_buckets`` mapping for all registered task types.
|
|
|
+
|
|
|
+.. method:: consumer.bucket_for_task(type, Bucket=TokenBucket)
|
|
|
+
|
|
|
+ Creates rate limit bucket for a task using its ``task.rate_limit``
|
|
|
+ attribute.
|
|
|
+
|
|
|
+.. method:: consumer.add_task_queue(name, exchange=None, exchange_type=None,
|
|
|
+ routing_key=None, \*\*options):
|
|
|
+
|
|
|
+ Adds new queue to consume from. This will persist on connection restart.
|
|
|
+
|
|
|
+.. method:: consumer.cancel_task_queue(name)
|
|
|
+
|
|
|
+ Stop consuming from queue by name. This will persist on connection
|
|
|
+ restart.
|
|
|
+
|
|
|
+.. method:: apply_eta_task(request)
|
|
|
+
|
|
|
+ Schedule eta task to execute based on the ``request.eta`` attribute.
|
|
|
+ (:class:`~celery.worker.job.Request`)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
.. _extending-bootsteps:
|
|
|
|
|
|
-Bootsteps
|
|
|
-=========
|
|
|
+Installing Bootsteps
|
|
|
+====================
|
|
|
+
|
|
|
+``app.steps['worker']`` and ``app.steps['consumer']`` can be modified
|
|
|
+to add new bootsteps::
|
|
|
+
|
|
|
+ >>> app = Celery()
|
|
|
+ >>> app.steps['worker'].add(MyWorkerStep) # < add class, do not instantiate
|
|
|
+ >>> app.steps['consumer'].add(MyConsumerStep)
|
|
|
+
|
|
|
+ >>> app.steps['consumer'].update([StepA, StepB])
|
|
|
+
|
|
|
+ >>> app.steps['consumer']
|
|
|
+ {step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}
|
|
|
+
|
|
|
+The order of steps is not important here as the order is decided by the
|
|
|
+resulting dependency graph (``Step.requires``).
|
|
|
+
|
|
|
+To illustrate how you can install bootsteps and how they work, this is an example step that
|
|
|
+prints some useless debugging information.
|
|
|
+It can be added both as a worker and consumer bootstep:
|
|
|
|
|
|
-Blahblah blah, example bootstep:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
@@ -119,22 +525,6 @@ information about the boot process::
|
|
|
[2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
|
|
|
[2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
|
|
|
|
|
|
-.. figure:: ../images/worker_graph_full.png
|
|
|
-
|
|
|
-.. _extending-worker-bootsteps:
|
|
|
-
|
|
|
-Worker bootsteps
|
|
|
-----------------
|
|
|
-
|
|
|
-Blablah
|
|
|
-
|
|
|
-.. _extending-consumer-bootsteps:
|
|
|
-
|
|
|
-Consumer bootsteps
|
|
|
-------------------
|
|
|
-
|
|
|
-blahblah
|
|
|
-
|
|
|
|
|
|
.. _extending-programs:
|
|
|
|