| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801 | 
							- .. _guide-extending:
 
- ==========================
 
-  Extensions and Bootsteps
 
- ==========================
 
- .. contents::
 
-     :local:
 
-     :depth: 2
 
- .. _extending-custom-consumers:
 
- Custom Message Consumers
 
- ========================
 
- You may want to embed custom Kombu consumers to manually process your messages.
 
- For that purpose a special :class:`~celery.bootstep.ConsumerStep` bootstep class
 
- exists, where you only need to define the ``get_consumers`` method, which must
 
- return a list of :class:`kombu.Consumer` objects to start
 
- whenever the connection is established:
 
- .. code-block:: python
 
-     from celery import Celery
 
-     from celery import bootsteps
 
-     from kombu import Consumer, Exchange, Queue
 
-     my_queue = Queue('custom', Exchange('custom'), 'routing_key')
 
-     app = Celery(broker='amqp://')
 
-     class MyConsumerStep(bootsteps.ConsumerStep):
 
-         def get_consumers(self, channel):
 
-             return [Consumer(channel,
 
-                              queues=[my_queue],
 
-                              callbacks=[self.handle_message],
 
-                              accept=['json'])]
 
-         def handle_message(self, body, message):
 
-             print('Received message: {0!r}'.format(body))
 
-             message.ack()
 
-     app.steps['consumer'].add(MyConsumerStep)
 
-     def send_me_a_message(self, who='world!', producer=None):
 
-         with app.producer_or_acquire(producer) as producer:
 
-             producer.send(
 
-                 {'hello': who},
 
-                 serializer='json',
 
-                 exchange=my_queue.exchange,
 
-                 routing_key='routing_key',
 
-                 declare=[my_queue],
 
-                 retry=True,
 
-             )
 
-     if __name__ == '__main__':
 
-         send_me_a_message('celery')
 
- .. note::
 
-     Kombu Consumers can take use of two different message callback dispatching
 
-     mechanisms.  The first one is the ``callbacks`` argument which accepts
 
-     a list of callbacks with a ``(body, message)`` signature,
 
-     the second one is the ``on_message`` argument which takes a single
 
-     callback with a ``(message, )`` signature.  The latter will not
 
-     automatically decode and deserialize the payload which is useful
 
-     in many cases:
 
-     .. code-block:: python
 
-         def get_consumers(self, channel):
 
-             return [Consumer(channel, queues=[my_queue],
 
-                              on_message=self.on_message)]
 
-         def on_message(self, message):
 
-             payload = message.decode()
 
-             print(
 
-                 'Received message: {0!r} {props!r} rawlen={s}'.format(
 
-                 payload, props=message.properties, s=len(message.body),
 
-             ))
 
-             message.ack()
 
- .. _extending-blueprints:
 
- Blueprints
 
- ==========
 
- Bootsteps is a technique to add functionality to the workers.
 
- A bootstep is a custom class that defines hooks to do custom actions
 
- at different stages in the worker.   Every bootstep belongs to a blueprint,
 
- and the worker currently defines two blueprints: **Worker**, and **Consumer**
 
- ----------------------------------------------------------
 
- **Figure A:** Bootsteps in the Worker and Consumer blueprints.  Starting
 
-               from the bottom up the first step in the worker blueprint
 
-               is the Timer, and the last step is to start the Consumer blueprint,
 
-               which then establishes the broker connection and starts
 
-               consuming messages.
 
- .. figure:: ../images/worker_graph_full.png
 
- ----------------------------------------------------------
 
- Worker
 
- ======
 
- The Worker is the first blueprint to start, and with it starts major components like
 
- the event loop, processing pool, the timer, and also optional components
 
- like the autoscaler.  When the worker is fully started it will continue
 
- to the Consumer blueprint.
 
- The :class:`~celery.worker.WorkController` is the core worker implementation,
 
- and contains several methods and attributes that you can use in your bootstep.
 
- Attributes
 
- ----------
 
- .. attribute:: app
 
-     The current app instance.
 
- .. attribute:: hostname
 
-     The workers node name (e.g. `worker1@example.com`)
 
- .. attribute:: blueprint
 
-     This is the worker :class:`~celery.bootsteps.Blueprint`.
 
- .. attribute:: hub
 
-     Event loop object (:class:`~kombu.async.Hub`).  You can use
 
-     this to register callbacks in the event loop.
 
-     This is only supported by async I/O enabled transports (amqp, redis),
 
-     in which case the `worker.use_eventloop` attribute should be set.
 
-     Your worker bootstep must require the Hub bootstep to use this:
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.components:Hub', )
 
- .. attribute:: pool
 
-     The current process/eventlet/gevent/thread pool.
 
-     See :class:`celery.concurrency.base.BasePool`.
 
-     Your worker bootstep must require the Pool bootstep to use this:
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.components:Pool', )
 
- .. attribute:: timer
 
-     :class:`~kombu.async.timer.Timer` used to schedule functions.
 
-     Your worker bootstep must require the Timer bootstep to use this:
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.components:Timer', )
 
- .. attribute:: statedb
 
-     :class:`Database <celery.worker.state.Persistent>`` to persist state between
 
-     worker restarts.
 
-     This is only defined if the ``statedb`` argument is enabled.
 
-     Your worker bootstep must require the Statedb bootstep to use this:
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.components:Statedb', )
 
- .. attribute:: autoscaler
 
-     :class:`~celery.worker.autoscaler.Autoscaler` used to automatically grow
 
-     and shrink the number of processes in the pool.
 
-     This is only defined if the ``autoscale`` argument is enabled.
 
-     Your worker bootstep must require the `Autoscaler` bootstep to use this:
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.autoscaler:Autoscaler', )
 
- .. attribute:: autoreloader
 
-     :class:`~celery.worker.autoreloder.Autoreloader` used to automatically
 
-     reload use code when the filesystem changes.
 
-     This is only defined if the ``autoreload`` argument is enabled.
 
-     Your worker bootstep must require the `Autoreloader` bootstep to use this;
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.autoreloader:Autoreloader', )
 
- An example Worker bootstep could be:
 
- .. code-block:: python
 
-     from celery import bootsteps
 
-     class ExampleWorkerStep(bootsteps.StartStopStep):
 
-         requires = ('Pool', )
 
-         def __init__(self, worker, **kwargs):
 
-             print('Called when the WorkController instance is constructed')
 
-             print('Arguments to WorkController: {0!r}'.format(kwargs))
 
-         def create(self, worker):
 
-             # this method can be used to delegate the action methods
 
-             # to another object that implements ``start`` and ``stop``.
 
-             return self
 
-         def start(self, worker):
 
-             print('Called when the worker is started.')
 
-         def stop(self, worker):
 
-             print("Called when the worker shuts down.")
 
-         def terminate(self, worker):
 
-             print("Called when the worker terminates")
 
- Every method is passed the current ``WorkController`` instance as the first
 
- argument.
 
- Another example could use the timer to wake up at regular intervals:
 
- .. code-block:: python
 
-     from celery import bootsteps
 
-     class DeadlockDetection(bootsteps.StartStopStep):
 
-         requires = ('Timer', )
 
-         def __init__(self, worker, deadlock_timeout=3600):
 
-             self.timeout = deadlock_timeout
 
-             self.requests = []
 
-             self.tref = None
 
-         def start(self, worker):
 
-             # run every 30 seconds.
 
-             self.tref = worker.timer.call_repeatedly(
 
-                 30.0, self.detect, (worker, ), priority=10,
 
-             )
 
-         def stop(self, worker):
 
-             if self.tref:
 
-                 self.tref.cancel()
 
-                 self.tref = None
 
-         def detect(self, worker):
 
-             # update active requests
 
-             for req in self.worker.active_requests:
 
-                 if req.time_start and time() - req.time_start > self.timeout:
 
-                     raise SystemExit()
 
- Consumer
 
- ========
 
- The Consumer blueprint establishes a connection to the broker, and
 
- is restarted every time this connection is lost.   Consumer bootsteps
 
- include the worker heartbeat, the remote control command consumer, and
 
- importantly, the task consumer.
 
- When you create consumer bootsteps you must take into account that it must
 
- be possible to restart your blueprint.  An additional 'shutdown' method is
 
- defined for consumer bootsteps, this method is called when the worker is
 
- shutdown.
 
- Attributes
 
- ----------
 
- .. attribute:: app
 
-     The current app instance.
 
- .. attribute:: controller
 
-     The parent :class:`~@WorkController` object that created this consumer.
 
- .. attribute:: hostname
 
-     The workers node name (e.g. `worker1@example.com`)
 
- .. attribute:: blueprint
 
-     This is the worker :class:`~celery.bootsteps.Blueprint`.
 
- .. attribute:: hub
 
-     Event loop object (:class:`~kombu.async.Hub`).  You can use
 
-     this to register callbacks in the event loop.
 
-     This is only supported by async I/O enabled transports (amqp, redis),
 
-     in which case the `worker.use_eventloop` attribute should be set.
 
-     Your worker bootstep must require the Hub bootstep to use this:
 
-     .. code-block:: python
 
-         class WorkerStep(bootsteps.StartStopStep):
 
-             requires = ('celery.worker:Hub', )
 
- .. attribute:: connection
 
-     The current broker connection (:class:`kombu.Connection`).
 
-     A consumer bootstep must require the 'Connection' bootstep
 
-     to use this:
 
-     .. code-block:: python
 
-         class Step(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.consumer:Connection', )
 
- .. attribute:: event_dispatcher
 
-     A :class:`@events.Dispatcher` object that can be used to send events.
 
-     A consumer bootstep must require the `Events` bootstep to use this.
 
-     .. code-block:: python
 
-         class Step(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.consumer:Events', )
 
- .. attribute:: gossip
 
-     Worker to worker broadcast communication
 
-     (class:`~celery.worker.consumer.Gossip`).
 
-     A consumer bootstep must require the `Gossip` bootstep to use this.
 
-     .. code-block:: python
 
-         class Step(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.consumer:Events', )
 
- .. attribute:: pool
 
-     The current process/eventlet/gevent/thread pool.
 
-     See :class:`celery.concurrency.base.BasePool`.
 
- .. attribute:: timer
 
-     :class:`Timer <celery.utils.timer2.Schedule` used to schedule functions.
 
- .. attribute:: heart
 
-     Responsible for sending worker event heartbeats
 
-     (:class:`~celery.worker.heartbeat.Heart`).
 
-     Your consumer bootstep must require the `Heart` bootstep to use this:
 
-     .. code-block:: python
 
-         class Step(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.consumer:Heart', )
 
- .. attribute:: task_consumer
 
-     The :class:`kombu.Consumer` object used to consume task messages.
 
-     Your consumer bootstep must require the `Tasks` bootstep to use this:
 
-     .. code-block:: python
 
-         class Step(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.consumer:Heart', )
 
- .. attribute:: strategies
 
-     Every registered task type has an entry in this mapping,
 
-     where the value is used to execute an incoming message of this task type
 
-     (the task execution strategy).  This mapping is generated by the Tasks
 
-     bootstep when the consumer starts::
 
-         for name, task in app.tasks.items():
 
-             strategies[name] = task.start_strategy(app, consumer)
 
-             task.__trace__ = celery.app.trace.build_tracer(
 
-                 name, task, loader, hostname
 
-             )
 
-     Your consumer bootstep must require the `Tasks` bootstep to use this:
 
-     .. code-block:: python
 
-         class Step(bootsteps.StartStopStep):
 
-             requires = ('celery.worker.consumer:Heart', )
 
- .. attribute:: task_buckets
 
-     A :class:`~collections.defaultdict` used to lookup the rate limit for
 
-     a task by type.
 
-     Entries in this dict may be None (for no limit) or a
 
-     :class:`~kombu.utils.limits.TokenBucket` instance implementing
 
-     ``consume(tokens)`` and ``expected_time(tokens)``.
 
-     TokenBucket implements the `token bucket algorithm`_, but any algorithm
 
-     may be used as long as it conforms to the same interface and defines the
 
-     two methods above.
 
-     .. _`token bucket algorithm`: http://en.wikipedia.org/wiki/Token_bucket
 
- .. attribute:: qos
 
-     The :class:`~kombu.common.QoS` object can be used to change the
 
-     task channels current prefetch_count value, e.g::
 
-         # increment at next cycle
 
-         consumer.qos.increment_eventually(1)
 
-         # decrement at next cycle
 
-         consumer.qos.decrement_eventually(1)
 
-         consumer.qos.set(10)
 
- Methods
 
- -------
 
- .. method:: consumer.reset_rate_limits()
 
-     Updates the ``task_buckets`` mapping for all registered task types.
 
- .. method:: consumer.bucket_for_task(type, Bucket=TokenBucket)
 
-     Creates rate limit bucket for a task using its ``task.rate_limit``
 
-     attribute.
 
- .. method:: consumer.add_task_queue(name, exchange=None, exchange_type=None,
 
-                                     routing_key=None, \*\*options):
 
-     Adds new queue to consume from.  This will persist on connection restart.
 
- .. method:: consumer.cancel_task_queue(name)
 
-     Stop consuming from queue by name.  This will persist on connection
 
-     restart.
 
- .. method:: apply_eta_task(request)
 
-     Schedule eta task to execute based on the ``request.eta`` attribute.
 
-     (:class:`~celery.worker.request.Request`)
 
- .. _extending-bootsteps:
 
- Installing Bootsteps
 
- ====================
 
- ``app.steps['worker']`` and ``app.steps['consumer']`` can be modified
 
- to add new bootsteps::
 
-     >>> app = Celery()
 
-     >>> app.steps['worker'].add(MyWorkerStep)  # < add class, do not instantiate
 
-     >>> app.steps['consumer'].add(MyConsumerStep)
 
-     >>> app.steps['consumer'].update([StepA, StepB])
 
-     >>> app.steps['consumer']
 
-     {step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}
 
- The order of steps is not important here as the order is decided by the
 
- resulting dependency graph (``Step.requires``).
 
- To illustrate how you can install bootsteps and how they work, this is an example step that
 
- prints some useless debugging information.
 
- It can be added both as a worker and consumer bootstep:
 
- .. code-block:: python
 
-     from celery import Celery
 
-     from celery import bootsteps
 
-     class InfoStep(bootsteps.Step):
 
-         def __init__(self, parent, **kwargs):
 
-             # here we can prepare the Worker/Consumer object
 
-             # in any way we want, set attribute defaults and so on.
 
-             print('{0!r} is in init'.format(parent))
 
-         def start(self, parent):
 
-             # our step is started together with all other Worker/Consumer
 
-             # bootsteps.
 
-             print('{0!r} is starting'.format(parent))
 
-         def stop(self, parent):
 
-             # the Consumer calls stop every time the consumer is restarted
 
-             # (i.e. connection is lost) and also at shutdown.  The Worker
 
-             # will call stop at shutdown only.
 
-             print('{0!r} is stopping'.format(parent))
 
-         def shutdown(self, parent):
 
-             # shutdown is called by the Consumer at shutdown, it's not
 
-             # called by Worker.
 
-             print('{0!r} is shutting down'.format(parent))
 
-         app = Celery(broker='amqp://')
 
-         app.steps['worker'].add(InfoStep)
 
-         app.steps['consumer'].add(InfoStep)
 
- Starting the worker with this step installed will give us the following
 
- logs::
 
-     <Worker: w@example.com (initializing)> is in init
 
-     <Consumer: w@example.com (initializing)> is in init
 
-     [2013-05-29 16:18:20,544: WARNING/MainProcess]
 
-         <Worker: w@example.com (running)> is starting
 
-     [2013-05-29 16:18:21,577: WARNING/MainProcess]
 
-         <Consumer: w@example.com (running)> is starting
 
-     <Consumer: w@example.com (closing)> is stopping
 
-     <Worker: w@example.com (closing)> is stopping
 
-     <Consumer: w@example.com (terminating)> is shutting down
 
- The ``print`` statements will be redirected to the logging subsystem after
 
- the worker has been initialized, so the "is starting" lines are timestamped.
 
- You may notice that this does no longer happen at shutdown, this is because
 
- the ``stop`` and ``shutdown`` methods are called inside a *signal handler*,
 
- and it's not safe to use logging inside such a handler.
 
- Logging with the Python logging module is not :term:`reentrant`,
 
- which means that you cannot interrupt the function and
 
- call it again later.  It's important that the ``stop`` and ``shutdown`` methods
 
- you write is also :term:`reentrant`.
 
- Starting the worker with ``--loglevel=debug`` will show us more
 
- information about the boot process::
 
-     [2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
 
-     [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
 
-     <celery.apps.worker.Worker object at 0x101ad8410> is in init
 
-     [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
 
-         {Hub, Queues (intra), Pool, Autoreloader, Timer, StateDB,
 
-          Autoscaler, InfoStep, Beat, Consumer}
 
-     [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
 
-     [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
 
-     <celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
 
-     [2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
 
-         {Connection, Mingle, Events, Gossip, InfoStep, Agent,
 
-          Heart, Control, Tasks, event loop}
 
-     [2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
 
-     [2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
 
-     [2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
 
-     [2013-05-29 16:18:20,544: WARNING/MainProcess]
 
-         <celery.apps.worker.Worker object at 0x101ad8410> is starting
 
-     [2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
 
-     [2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
 
-     [2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
 
-     [2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
 
-     [2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
 
-     [2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
 
-     [2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
 
-     [2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
 
-     [2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
 
-     [2013-05-29 16:18:21,577: WARNING/MainProcess]
 
-         <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
 
-     [2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
 
-     [2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
 
-     [2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
 
-     [2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
 
-     [2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
 
-     [2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
 
-     [2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
 
- .. _extending-programs:
 
- Command-line programs
 
- =====================
 
- .. _extending-commandoptions:
 
- Adding new command-line options
 
- -------------------------------
 
- .. _extending-command-options:
 
- Command-specific options
 
- ~~~~~~~~~~~~~~~~~~~~~~~~
 
- You can add additional command-line options to the ``worker``, ``beat`` and
 
- ``events`` commands by modifying the :attr:`~@Celery.user_options` attribute of the
 
- application instance.
 
- Celery commands uses the :mod:`optparse` module to parse command-line
 
- arguments, and so you have to use :mod:`optparse` specific option instances created
 
- using :func:`optparse.make_option`.  Please see the :mod:`optparse`
 
- documentation to read about the fields supported.
 
- Example adding a custom option to the :program:`celery worker` command:
 
- .. code-block:: python
 
-     from celery import Celery
 
-     from celery.bin import Option  # <-- alias to optparse.make_option
 
-     app = Celery(broker='amqp://')
 
-     app.user_options['worker'].add(
 
-         Option('--enable-my-option', action='store_true', default=False,
 
-                help='Enable custom option.'),
 
-     )
 
- All bootsteps will now receive this argument as a keyword argument to
 
- ``Bootstep.__init__``:
 
- .. code-block:: python
 
-     from celery import bootsteps
 
-     class MyBootstep(bootsteps.Step):
 
-         def __init__(self, worker, enable_my_option=False, **options):
 
-             if enable_my_option:
 
-                 party()
 
-     app.steps['worker'].add(MyBootstep)
 
- .. _extending-preload_options:
 
- Preload options
 
- ~~~~~~~~~~~~~~~
 
- The :program:`celery` umbrella command supports the concept of 'preload
 
- options', which are special options passed to all subcommands and parsed
 
- outside of the main parsing step.
 
- The list of default preload options can be found in the API reference:
 
- :mod:`celery.bin.base`.
 
- You can add new preload options too, e.g. to specify a configuration template:
 
- .. code-block:: python
 
-     from celery import Celery
 
-     from celery import signals
 
-     from celery.bin import Option
 
-     app = Celery()
 
-     app.user_options['preload'].add(
 
-         Option('-Z', '--template', default='default',
 
-                help='Configuration template to use.'),
 
-     )
 
-     @signals.user_preload_options.connect
 
-     def on_preload_parsed(options, **kwargs):
 
-         use_template(options['template'])
 
- .. _extending-subcommands:
 
- Adding new :program:`celery` sub-commands
 
- -----------------------------------------
 
- New commands can be added to the :program:`celery` umbrella command by using
 
- `setuptools entry-points`_.
 
- .. _`setuptools entry-points`:
 
-     http://reinout.vanrees.org/weblog/2010/01/06/zest-releaser-entry-points.html
 
- Entry-points is special metadata that can be added to your packages ``setup.py`` program,
 
- and then after installation, read from the system using the :mod:`pkg_resources` module.
 
- Celery recognizes ``celery.commands`` entry-points to install additional
 
- subcommands, where the value of the entry-point must point to a valid subclass
 
- of :class:`celery.bin.base.Command`.  There is limited documentation,
 
- unfortunately, but you can find inspiration from the various commands in the
 
- :mod:`celery.bin` package.
 
- This is how the Flower_ monitoring extension adds the :program:`celery flower` command,
 
- by adding an entry-point in :file:`setup.py`:
 
- .. code-block:: python
 
-     setup(
 
-         name='flower',
 
-         entry_points={
 
-             'celery.commands': [
 
-                'flower = flower.command.FlowerCommand',
 
-             ],
 
-         }
 
-     )
 
- .. _Flower: http://pypi.python.org/pypi/flower
 
- The command definition is in two parts separated by the equal sign, where the
 
- first part is the name of the subcommand (flower), then the fully qualified
 
- module path to the class that implements the command
 
- (``flower.command.FlowerCommand``).
 
- In the module :file:`flower/command.py`, the command class is defined
 
- something like this:
 
- .. code-block:: python
 
-     from celery.bin.base import Command, Option
 
-     class FlowerCommand(Command):
 
-         def get_options(self):
 
-             return (
 
-                 Option('--port', default=8888, type='int',
 
-                     help='Webserver port',
 
-                 ),
 
-                 Option('--debug', action='store_true'),
 
-             )
 
-         def run(self, port=None, debug=False, **kwargs):
 
-             print('Running our command')
 
- Worker API
 
- ==========
 
- :class:`~kombu.async.Hub` - The workers async event loop.
 
- ---------------------------------------------------------
 
- :supported transports: amqp, redis
 
- .. versionadded:: 3.0
 
- The worker uses asynchronous I/O when the amqp or redis broker transports are
 
- used.  The eventual goal is for all transports to use the eventloop, but that
 
- will take some time so other transports still use a threading-based solution.
 
- .. method:: hub.add(fd, callback, flags)
 
- .. method:: hub.add_reader(fd, callback, \*args)
 
-     Add callback to be called when ``fd`` is readable.
 
-     The callback will stay registered until explictly removed using
 
-     :meth:`hub.remove(fd) <hub.remove>`, or the fd is automatically discarded
 
-     because it's no longer valid.
 
-     Note that only one callback can be registered for any given fd at a time,
 
-     so calling ``add`` a second time will remove any callback that
 
-     was previously registered for that fd.
 
-     A file descriptor is any file-like object that supports the ``fileno``
 
-     method, or it can be the file descriptor number (int).
 
- .. method:: hub.add_writer(fd, callback, \*args)
 
-     Add callback to be called when ``fd`` is writable.
 
-     See also notes for :meth:`hub.add_reader` above.
 
- .. method:: hub.remove(fd)
 
-     Remove all callbacks for ``fd`` from the loop.
 
- Timer - Scheduling events
 
- -------------------------
 
- .. method:: timer.call_after(secs, callback, args=(), kwargs=(),
 
-                              priority=0)
 
- .. method:: timer.call_repeatedly(secs, callback, args=(), kwargs=(),
 
-                                   priority=0)
 
- .. method:: timer.call_at(eta, callback, args=(), kwargs=(),
 
-                           priority=0)
 
 
  |