.. _guide-extending:

==========================
 Extensions and Bootsteps
==========================

.. contents::
    :local:
    :depth: 2

.. _extending-custom-consumers:

Custom Message Consumers
========================

You may want to embed custom Kombu consumers to process your messages manually.

For that purpose there is a special :class:`~celery.bootsteps.ConsumerStep`
bootstep class. You only need to define its ``get_consumers`` method, which must
return a list of :class:`kombu.Consumer` objects to start
whenever the connection is established:

.. code-block:: python

    from celery import Celery
    from celery import bootsteps
    from kombu import Consumer, Exchange, Queue

    my_queue = Queue('custom', Exchange('custom'), 'routing_key')

    app = Celery(broker='amqp://')


    class MyConsumerStep(bootsteps.ConsumerStep):

        def get_consumers(self, channel):
            # Called whenever the broker connection is established.
            return [Consumer(channel,
                             queues=[my_queue],
                             callbacks=[self.handle_message],
                             accept=['json'])]

        def handle_message(self, body, message):
            print('Received message: {0!r}'.format(body))
            message.ack()

    app.steps['consumer'].add(MyConsumerStep)


    def send_me_a_message(who='world!', producer=None):
        # Module-level helper, so it takes no ``self`` argument.
        with app.producer_or_acquire(producer) as producer:
            producer.publish(
                {'hello': who},
                serializer='json',
                exchange=my_queue.exchange,
                routing_key='routing_key',
                declare=[my_queue],
                retry=True,
            )

    if __name__ == '__main__':
        send_me_a_message('celery')

.. note::

    Kombu consumers can use two different message callback dispatching
    mechanisms. The first is the ``callbacks`` argument, which accepts
    a list of callbacks with a ``(body, message)`` signature.
    The second is the ``on_message`` argument, which takes a single
    callback with a ``(message,)`` signature. The latter doesn't
    automatically decode and deserialize the payload.

    .. code-block:: python

        def get_consumers(self, channel):
            return [Consumer(channel, queues=[my_queue],
                             on_message=self.on_message)]


        def on_message(self, message):
            payload = message.decode()
            print(
                'Received message: {0!r} {props!r} rawlen={s}'.format(
                    payload, props=message.properties, s=len(message.body),
                ))
            message.ack()

.. _extending-blueprints:

Blueprints
==========

Bootsteps are a technique for adding functionality to the workers.
A bootstep is a custom class that defines hooks to do custom actions
at different stages in the worker. Every bootstep belongs to a blueprint,
and the worker currently defines two blueprints: **Worker** and **Consumer**.

----------------------------------------------------------

**Figure A:** Bootsteps in the Worker and Consumer blueprints. Starting
from the bottom up, the first step in the worker blueprint
is the Timer, and the last step is to start the Consumer blueprint,
which then establishes the broker connection and starts
consuming messages.

.. figure:: ../images/worker_graph_full.png

----------------------------------------------------------

  77. .. _extending-worker_blueprint:
  78. Worker
  79. ======
The Worker is the first blueprint to start, and with it start major components like
the event loop, processing pool, and the timer used for ETA tasks and other
timed events.

When the worker is fully started it continues with the Consumer blueprint,
which sets up how tasks are executed, connects to the broker, and starts
the message consumers.

  86. The :class:`~celery.worker.WorkController` is the core worker implementation,
  87. and contains several methods and attributes that you can use in your bootstep.
  88. .. _extending-worker_blueprint-attributes:
  89. Attributes
  90. ----------
  91. .. _extending-worker-app:
  92. .. attribute:: app
  93. The current app instance.
.. _extending-worker-hostname:

.. attribute:: hostname

    The worker's node name (e.g., `worker1@example.com`)

  97. .. _extending-worker-blueprint:
  98. .. attribute:: blueprint
  99. This is the worker :class:`~celery.bootsteps.Blueprint`.
.. _extending-worker-hub:

.. attribute:: hub

    Event loop object (:class:`~kombu.async.Hub`). You can use
    this to register callbacks in the event loop.

    This is only supported by async I/O enabled transports (amqp, redis),
    in which case the `worker.use_eventloop` attribute should be set.

    Your worker bootstep must require the Hub bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = {'celery.worker.components:Hub'}

.. _extending-worker-pool:

.. attribute:: pool

    The current process/eventlet/gevent/thread pool.
    See :class:`celery.concurrency.base.BasePool`.

    Your worker bootstep must require the Pool bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = {'celery.worker.components:Pool'}

.. _extending-worker-timer:

.. attribute:: timer

    :class:`~kombu.async.timer.Timer` used to schedule functions.

    Your worker bootstep must require the Timer bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = {'celery.worker.components:Timer'}

.. _extending-worker-statedb:

.. attribute:: statedb

    :class:`Database <celery.worker.state.Persistent>` to persist state between
    worker restarts.

    This is only defined if the ``statedb`` argument is enabled.

    Your worker bootstep must require the ``Statedb`` bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = {'celery.worker.components:Statedb'}

.. _extending-worker-autoscaler:

.. attribute:: autoscaler

    :class:`~celery.worker.autoscaler.Autoscaler` used to automatically grow
    and shrink the number of processes in the pool.

    This is only defined if the ``autoscale`` argument is enabled.

    Your worker bootstep must require the `Autoscaler` bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = ('celery.worker.autoscaler:Autoscaler',)

.. _extending-worker-autoreloader:

.. attribute:: autoreloader

    :class:`~celery.worker.autoreloader.Autoreloader` used to automatically
    reload user code when the file-system changes.

    This is only defined if the ``autoreload`` argument is enabled.

    Your worker bootstep must require the `Autoreloader` bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = ('celery.worker.autoreloader:Autoreloader',)

Example worker bootstep
-----------------------

An example Worker bootstep could be:

.. code-block:: python

    from celery import bootsteps

    class ExampleWorkerStep(bootsteps.StartStopStep):
        requires = {'celery.worker.components:Pool'}

        def __init__(self, worker, **kwargs):
            print('Called when the WorkController instance is constructed')
            print('Arguments to WorkController: {0!r}'.format(kwargs))

        def create(self, worker):
            # this method can be used to delegate the action methods
            # to another object that implements ``start`` and ``stop``.
            return self

        def start(self, worker):
            print('Called when the worker is started.')

        def stop(self, worker):
            print('Called when the worker shuts down.')

        def terminate(self, worker):
            print('Called when the worker terminates')

Every method is passed the current ``WorkController`` instance as the first
argument.

Another example could use the timer to wake up at regular intervals:

.. code-block:: python

    from time import time

    from celery import bootsteps


    class DeadlockDetection(bootsteps.StartStopStep):
        requires = {'celery.worker.components:Timer'}

        def __init__(self, worker, deadlock_timeout=3600):
            self.timeout = deadlock_timeout
            self.requests = []
            self.tref = None

        def start(self, worker):
            # run every 30 seconds.
            self.tref = worker.timer.call_repeatedly(
                30.0, self.detect, (worker,), priority=10,
            )

        def stop(self, worker):
            # cancel the repeating timer entry, if one was scheduled.
            if self.tref:
                self.tref.cancel()
                self.tref = None

        def detect(self, worker):
            # exit if any active request has been running longer
            # than the configured timeout.
            for req in worker.active_requests:
                if req.time_start and time() - req.time_start > self.timeout:
                    raise SystemExit()

  198. .. _extending-consumer_blueprint:
  199. Consumer
  200. ========
The Consumer blueprint establishes a connection to the broker, and
is restarted every time this connection is lost. Consumer bootsteps
include the worker heartbeat, the remote control command consumer, and
importantly, the task consumer.

When you create consumer bootsteps you must take into account that it must
be possible to restart your blueprint. Consumer bootsteps also define an
additional ``shutdown`` method, which is called when the worker is
shut down.

  209. .. _extending-consumer-attributes:
  210. Attributes
  211. ----------
  212. .. _extending-consumer-app:
  213. .. attribute:: app
  214. The current app instance.
  215. .. _extending-consumer-controller:
  216. .. attribute:: controller
  217. The parent :class:`~@WorkController` object that created this consumer.
.. _extending-consumer-hostname:

.. attribute:: hostname

    The worker's node name (e.g., `worker1@example.com`)

  221. .. _extending-consumer-blueprint:
  222. .. attribute:: blueprint
  223. This is the worker :class:`~celery.bootsteps.Blueprint`.
.. _extending-consumer-hub:

.. attribute:: hub

    Event loop object (:class:`~kombu.async.Hub`). You can use
    this to register callbacks in the event loop.

    This is only supported by async I/O enabled transports (amqp, redis),
    in which case the `worker.use_eventloop` attribute should be set.

    Your worker bootstep must require the Hub bootstep to use this:

    .. code-block:: python

        class WorkerStep(bootsteps.StartStopStep):
            requires = {'celery.worker.components:Hub'}

.. _extending-consumer-connection:

.. attribute:: connection

    The current broker connection (:class:`kombu.Connection`).

    A consumer bootstep must require the 'Connection' bootstep
    to use this:

    .. code-block:: python

        class Step(bootsteps.StartStopStep):
            requires = {'celery.worker.consumer.connection:Connection'}

.. _extending-consumer-event_dispatcher:

.. attribute:: event_dispatcher

    A :class:`@events.Dispatcher` object that can be used to send events.

    A consumer bootstep must require the `Events` bootstep to use this.

    .. code-block:: python

        class Step(bootsteps.StartStopStep):
            requires = {'celery.worker.consumer.events:Events'}

.. _extending-consumer-gossip:

.. attribute:: gossip

    Worker to worker broadcast communication
    (:class:`~celery.worker.consumer.gossip.Gossip`).

    A consumer bootstep must require the `Gossip` bootstep to use this.

    .. code-block:: python

        class RatelimitStep(bootsteps.StartStopStep):
            """Rate limit tasks based on the number of workers in the
            cluster."""
            requires = {'celery.worker.consumer.gossip:Gossip'}

            def start(self, c):
                self.c = c
                self.c.gossip.on.node_join.add(self.on_cluster_size_change)
                self.c.gossip.on.node_leave.add(self.on_cluster_size_change)
                self.c.gossip.on.node_lost.add(self.on_node_lost)
                # look the tasks up through the consumer's app instance.
                self.tasks = [
                    c.app.tasks['proj.tasks.add'],
                    c.app.tasks['proj.tasks.mul'],
                ]
                self.last_size = None

            def on_cluster_size_change(self, worker):
                cluster_size = len(list(self.c.gossip.state.alive_workers()))
                if cluster_size != self.last_size:
                    for task in self.tasks:
                        task.rate_limit = 1.0 / cluster_size
                    self.c.reset_rate_limits()
                    self.last_size = cluster_size

            def on_node_lost(self, worker):
                # may have processed heartbeat too late, so wake up soon
                # in order to see if the worker recovered.
                self.c.timer.call_after(
                    10.0, self.on_cluster_size_change, (worker,),
                )

    **Callbacks**

    - ``<set> gossip.on.node_join``

        Called whenever a new node joins the cluster, providing a
        :class:`~celery.events.state.Worker` instance.

    - ``<set> gossip.on.node_leave``

        Called whenever a node leaves the cluster (shuts down),
        providing a :class:`~celery.events.state.Worker` instance.

    - ``<set> gossip.on.node_lost``

        Called whenever a heartbeat was missed for a worker instance in the
        cluster (heartbeat not received or processed in time),
        providing a :class:`~celery.events.state.Worker` instance.

        This doesn't necessarily mean the worker is actually offline, so use a
        timeout mechanism if the default heartbeat timeout isn't sufficient.

  293. .. _extending-consumer-pool:
  294. .. attribute:: pool
  295. The current process/eventlet/gevent/thread pool.
  296. See :class:`celery.concurrency.base.BasePool`.
.. _extending-consumer-timer:

.. attribute:: timer

    :class:`Timer <celery.utils.timer2.Schedule>` used to schedule functions.

.. _extending-consumer-heart:

.. attribute:: heart

    Responsible for sending worker event heartbeats
    (:class:`~celery.worker.heartbeat.Heart`).

    Your consumer bootstep must require the `Heart` bootstep to use this:

    .. code-block:: python

        class Step(bootsteps.StartStopStep):
            requires = {'celery.worker.consumer.heart:Heart'}

.. _extending-consumer-task_consumer:

.. attribute:: task_consumer

    The :class:`kombu.Consumer` object used to consume task messages.

    Your consumer bootstep must require the `Tasks` bootstep to use this:

    .. code-block:: python

        class Step(bootsteps.StartStopStep):
            requires = {'celery.worker.consumer.tasks:Tasks'}

.. _extending-consumer-strategies:

.. attribute:: strategies

    Every registered task type has an entry in this mapping,
    where the value is used to execute an incoming message of this task type
    (the task execution strategy). This mapping is generated by the Tasks
    bootstep when the consumer starts:

    .. code-block:: python

        for name, task in app.tasks.items():
            strategies[name] = task.start_strategy(app, consumer)
            task.__trace__ = celery.app.trace.build_tracer(
                name, task, loader, hostname
            )

    Your consumer bootstep must require the `Tasks` bootstep to use this:

    .. code-block:: python

        class Step(bootsteps.StartStopStep):
            requires = {'celery.worker.consumer.tasks:Tasks'}

.. _extending-consumer-task_buckets:

.. attribute:: task_buckets

    A :class:`~collections.defaultdict` used to look up the rate limit for
    a task by type.
    Entries in this dict may be None (for no limit) or a
    :class:`~kombu.utils.limits.TokenBucket` instance implementing
    ``consume(tokens)`` and ``expected_time(tokens)``.

    TokenBucket implements the `token bucket algorithm`_, but any algorithm
    may be used as long as it conforms to the same interface and defines the
    two methods above.

  341. .. _`token bucket algorithm`: https://en.wikipedia.org/wiki/Token_bucket
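
To make the interface above concrete, here is a minimal standard-library sketch of a token bucket that exposes ``consume(tokens)`` and ``expected_time(tokens)``. It is not Kombu's implementation: the class name ``SimpleTokenBucket`` and the ``fill_rate``, ``capacity``, and ``clock`` parameters are assumptions made for illustration only.

```python
import time


class SimpleTokenBucket:
    """Illustrative token bucket; NOT kombu's TokenBucket."""

    def __init__(self, fill_rate, capacity=1, clock=time.monotonic):
        self.fill_rate = float(fill_rate)  # tokens added per second
        self.capacity = float(capacity)    # maximum burst size
        self._clock = clock                # injectable for testing
        self._tokens = self.capacity       # start with a full bucket
        self._stamp = clock()

    def _refill(self):
        # Add the tokens earned since the last call, capped at capacity.
        now = self._clock()
        earned = (now - self._stamp) * self.fill_rate
        self._tokens = min(self.capacity, self._tokens + earned)
        self._stamp = now

    def consume(self, tokens=1):
        """Take ``tokens`` from the bucket; return False if too few remain."""
        self._refill()
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Return the seconds to wait before ``tokens`` can be consumed."""
        self._refill()
        missing = max(tokens - self._tokens, 0.0)
        return missing / self.fill_rate
```

A caller would typically try ``consume()`` first and, when it returns ``False``, postpone the work by ``expected_time()`` seconds.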
.. _extending_consumer-qos:

.. attribute:: qos

    The :class:`~kombu.common.QoS` object can be used to change the
    task channel's current ``prefetch_count`` value:

    .. code-block:: python

        # increment at next cycle
        consumer.qos.increment_eventually(1)
        # decrement at next cycle
        consumer.qos.decrement_eventually(1)
        consumer.qos.set(10)

Methods
-------

.. method:: consumer.reset_rate_limits()

    Updates the ``task_buckets`` mapping for all registered task types.

.. method:: consumer.bucket_for_task(type, Bucket=TokenBucket)

    Creates a rate limit bucket for a task using its ``task.rate_limit``
    attribute.

.. method:: consumer.add_task_queue(name, exchange=None, exchange_type=None,
                                    routing_key=None, \*\*options):

    Adds a new queue to consume from. This will persist on connection restart.

.. method:: consumer.cancel_task_queue(name)

    Stops consuming from a queue by name. This will persist on connection
    restart.

.. method:: apply_eta_task(request)

    Schedules an ETA task to execute based on the ``request.eta`` attribute
    (:class:`~celery.worker.request.Request`).

  368. .. _extending-bootsteps:
  369. Installing Bootsteps
  370. ====================
  371. ``app.steps['worker']`` and ``app.steps['consumer']`` can be modified
  372. to add new bootsteps:
.. code-block:: pycon

    >>> app = Celery()
    >>> app.steps['worker'].add(MyWorkerStep)  # < add class, don't instantiate
    >>> app.steps['consumer'].add(MyConsumerStep)

    >>> app.steps['consumer'].update([StepA, StepB])

    >>> app.steps['consumer']
    {step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}}

  380. The order of steps isn't important here as the order is decided by the
  381. resulting dependency graph (``Step.requires``).
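
The idea that ``requires`` alone determines the boot order can be sketched with the standard library's ``graphlib`` module (Python 3.9+). This is an illustration of dependency ordering in general, not the algorithm Celery uses, and the step names and their requirements below are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical steps mapped to the steps they require.
requires = {
    'Consumer': {'Pool', 'Timer'},
    'InfoStep': {'Pool'},
    'Pool': {'Hub'},
    'Hub': {'Timer'},
    'Timer': set(),
}


def boot_order(requires):
    """Return step names so that every step follows its requirements."""
    return list(TopologicalSorter(requires).static_order())


order = boot_order(requires)
print(order)
```

However the mapping is ordered when it is built, each step appears after everything it requires, which is why the order in which steps are added doesn't matter.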
  382. To illustrate how you can install bootsteps and how they work, this is an example step that
  383. prints some useless debugging information.
  384. It can be added both as a worker and consumer bootstep:
.. code-block:: python

    from celery import Celery
    from celery import bootsteps

    class InfoStep(bootsteps.Step):

        def __init__(self, parent, **kwargs):
            # here we can prepare the Worker/Consumer object
            # in any way we want, set attribute defaults, and so on.
            print('{0!r} is in init'.format(parent))

        def start(self, parent):
            # our step is started together with all other Worker/Consumer
            # bootsteps.
            print('{0!r} is starting'.format(parent))

        def stop(self, parent):
            # the Consumer calls stop every time the consumer is
            # restarted (i.e., connection is lost) and also at shutdown.
            # The Worker will call stop at shutdown only.
            print('{0!r} is stopping'.format(parent))

        def shutdown(self, parent):
            # shutdown is called by the Consumer at shutdown, it's not
            # called by Worker.
            print('{0!r} is shutting down'.format(parent))

    app = Celery(broker='amqp://')
    app.steps['worker'].add(InfoStep)
    app.steps['consumer'].add(InfoStep)

  409. Starting the worker with this step installed will give us the following
  410. logs:
.. code-block:: text

    <Worker: w@example.com (initializing)> is in init
    <Consumer: w@example.com (initializing)> is in init
    [2013-05-29 16:18:20,544: WARNING/MainProcess]
        <Worker: w@example.com (running)> is starting
    [2013-05-29 16:18:21,577: WARNING/MainProcess]
        <Consumer: w@example.com (running)> is starting
    <Consumer: w@example.com (closing)> is stopping
    <Worker: w@example.com (closing)> is stopping
    <Consumer: w@example.com (terminating)> is shutting down

The ``print`` statements will be redirected to the logging subsystem after
the worker has been initialized, so the "is starting" lines are time-stamped.
You may notice that this no longer happens at shutdown. This is because
the ``stop`` and ``shutdown`` methods are called inside a *signal handler*,
and it's not safe to use logging inside such a handler.
Logging with the Python logging module isn't :term:`reentrant`,
meaning you cannot interrupt the function and then
call it again later. It's important that the ``stop`` and ``shutdown`` methods
you write are also :term:`reentrant`.

  430. Starting the worker with :option:`--loglevel=debug <celery worker --loglevel>`
  431. will show us more information about the boot process:
  432. .. code-block:: text
  433. [2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
  434. [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
  435. <celery.apps.worker.Worker object at 0x101ad8410> is in init
  436. [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
  437. {Hub, Pool, Timer, StateDB, Autoscaler, InfoStep, Beat, Consumer}
  438. [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
  439. [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
  440. <celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
  441. [2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
  442. {Connection, Mingle, Events, Gossip, InfoStep, Agent,
  443. Heart, Control, Tasks, event loop}
  444. [2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
  445. [2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
  446. [2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
  447. [2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
  448. [2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
  449. [2013-05-29 16:18:20,544: WARNING/MainProcess]
  450. <celery.apps.worker.Worker object at 0x101ad8410> is starting
  451. [2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
  452. [2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
  453. [2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
  454. [2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
  455. [2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
  456. [2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
  457. [2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
  458. [2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
  459. [2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
  460. [2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
  461. [2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
  462. [2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
  463. [2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
  464. [2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
  465. [2013-05-29 16:18:21,577: WARNING/MainProcess]
  466. <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
  467. [2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
  468. [2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
  469. [2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
  470. [2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
  471. [2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
  472. [2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
  473. [2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
  474. [2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
  475. [2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
  476. [2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
  477. .. _extending-programs:
  478. Command-line programs
  479. =====================
  480. .. _extending-commandoptions:
  481. Adding new command-line options
  482. -------------------------------
  483. .. _extending-command-options:
  484. Command-specific options
  485. ~~~~~~~~~~~~~~~~~~~~~~~~
You can add additional command-line options to the ``worker``, ``beat``, and
``events`` commands by modifying the :attr:`~@user_options` attribute of the
application instance.

Celery commands use the :mod:`argparse` module to parse command-line
arguments, so to add custom arguments you need to specify a callback
that takes a :class:`argparse.ArgumentParser` instance and adds arguments to it.
Please see the :mod:`argparse` documentation to read about the fields supported.

Example adding a custom option to the :program:`celery worker` command:

.. code-block:: python

    from celery import Celery

    app = Celery(broker='amqp://')

    def add_worker_arguments(parser):
        parser.add_argument(
            '--enable-my-option', action='store_true', default=False,
            help='Enable custom option.',
        )
    app.user_options['worker'].add(add_worker_arguments)

All bootsteps will now receive this argument as a keyword argument to
``Bootstep.__init__``:

.. code-block:: python

    from celery import bootsteps

    class MyBootstep(bootsteps.Step):

        def __init__(self, worker, enable_my_option=False, **options):
            if enable_my_option:
                party()

    app.steps['worker'].add(MyBootstep)

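
The following standard-library sketch shows why the keyword argument is spelled ``enable_my_option``: :mod:`argparse` derives the destination name from the long option by replacing dashes with underscores. The ``user_options`` set and the plain ``MyBootstep`` class here are hypothetical stand-ins for ``app.user_options['worker']`` and a real bootstep, so this only approximates the flow described above.

```python
import argparse


def add_worker_arguments(parser):
    parser.add_argument(
        '--enable-my-option', action='store_true', default=False,
        help='Enable custom option.',
    )


# Hypothetical stand-in for app.user_options['worker']:
# a set of callbacks that each receive the parser.
user_options = {add_worker_arguments}

parser = argparse.ArgumentParser(prog='worker')
for callback in user_options:
    callback(parser)

# argparse stores '--enable-my-option' under the name 'enable_my_option'.
options = vars(parser.parse_args(['--enable-my-option']))


class MyBootstep:
    # Hypothetical stand-in for a bootsteps.Step subclass.
    def __init__(self, worker, enable_my_option=False, **options):
        self.enabled = enable_my_option


step = MyBootstep(worker=None, **options)
print(options, step.enabled)
```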
  512. .. _extending-preload_options:
  513. Preload options
  514. ~~~~~~~~~~~~~~~
  515. The :program:`celery` umbrella command supports the concept of 'preload
  516. options'. These are special options passed to all sub-commands and parsed
  517. outside of the main parsing step.
  518. The list of default preload options can be found in the API reference:
  519. :mod:`celery.bin.base`.
  520. You can add new preload options too, for example to specify a configuration
  521. template:
.. code-block:: python

    from celery import Celery
    from celery import signals

    app = Celery()

    def add_preload_options(parser):
        parser.add_argument(
            '-Z', '--template', default='default',
            help='Configuration template to use.',
        )
    app.user_options['preload'].add(add_preload_options)

    @signals.user_preload_options.connect
    def on_preload_parsed(options, **kwargs):
        # ``use_template`` is a placeholder for your own function.
        use_template(options['template'])

  536. .. _extending-subcommands:
  537. Adding new :program:`celery` sub-commands
  538. -----------------------------------------
New commands can be added to the :program:`celery` umbrella command by using
`setuptools entry-points`_.

.. _`setuptools entry-points`:
    http://reinout.vanrees.org/weblog/2010/01/06/zest-releaser-entry-points.html

Entry-points are special meta-data that can be added to your package's ``setup.py`` program,
and then, after installation, read from the system using the :mod:`pkg_resources` module.

Celery recognizes ``celery.commands`` entry-points to install additional
sub-commands, where the value of the entry-point must point to a valid subclass
of :class:`celery.bin.base.Command`. There's limited documentation,
unfortunately, but you can find inspiration from the various commands in the
:mod:`celery.bin` package.

  550. This is how the :pypi:`Flower` monitoring extension adds the :program:`celery flower` command,
  551. by adding an entry-point in :file:`setup.py`:
.. code-block:: python

    setup(
        name='flower',
        entry_points={
            'celery.commands': [
                'flower = flower.command:FlowerCommand',
            ],
        }
    )

  561. The command definition is in two parts separated by the equal sign, where the
  562. first part is the name of the sub-command (flower), then the second part is
  563. the fully qualified symbol path to the class that implements the command:
  564. .. code-block:: text
  565. flower.command:FlowerCommand
  566. The module path and the name of the attribute should be separated by colon
  567. as above.
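
As a rough illustration of the ``name = module.path:attribute`` format, the sketch below splits such a string and imports the target using only the standard library. The helper name ``resolve_entry_point`` is hypothetical, and this is not how setuptools or Celery load entry-points internally; ``json:dumps`` is used as the target only because it is always importable.

```python
from importlib import import_module


def resolve_entry_point(spec):
    """Split ``'name = package.module:attr'`` and import the attribute."""
    name, _, target = (part.strip() for part in spec.partition('='))
    module_path, _, attr = target.partition(':')
    obj = import_module(module_path)
    # Support dotted attributes such as ``Class.method``.
    for part in attr.split('.'):
        obj = getattr(obj, part)
    return name, obj


name, obj = resolve_entry_point('encode = json:dumps')
print(name, obj('ok'))
```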
  568. In the module :file:`flower/command.py`, the command class is defined
  569. something like this:
.. code-block:: python

    from celery.bin.base import Command


    class FlowerCommand(Command):

        def add_arguments(self, parser):
            # argparse expects a callable for ``type``, not a string.
            parser.add_argument(
                '--port', default=8888, type=int,
                help='Webserver port',
            )
            parser.add_argument(
                '--debug', action='store_true',
            )

        def run(self, port=None, debug=False, **kwargs):
            print('Running our command')

  583. Worker API
  584. ==========
  585. :class:`~kombu.async.Hub` - The workers async event loop
  586. --------------------------------------------------------
  587. :supported transports: amqp, redis
  588. .. versionadded:: 3.0
  589. The worker uses asynchronous I/O when the amqp or redis broker transports are
  590. used. The eventual goal is for all transports to use the event-loop, but that
  591. will take some time so other transports still use a threading-based solution.
.. method:: hub.add(fd, callback, flags)

.. method:: hub.add_reader(fd, callback, \*args)

    Add callback to be called when ``fd`` is readable.

    The callback will stay registered until explicitly removed using
    :meth:`hub.remove(fd) <hub.remove>`, or the file descriptor is
    automatically discarded because it's no longer valid.

    Note that only one callback can be registered for any given
    file descriptor at a time, so calling ``add`` a second time will remove
    any callback that was previously registered for that file descriptor.

    A file descriptor is any file-like object that supports the ``fileno``
    method, or it can be the file descriptor number (int).

.. method:: hub.add_writer(fd, callback, \*args)

    Add callback to be called when ``fd`` is writable.
    See also notes for :meth:`hub.add_reader` above.

.. method:: hub.remove(fd)

    Remove all callbacks for file descriptor ``fd`` from the loop.

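
The "one callback per file descriptor" behaviour described above can be imitated with the standard library's :mod:`selectors` module. The ``MiniHub`` class below is a toy written for this illustration and is not Kombu's Hub; only the method names ``add_reader`` and ``remove`` are borrowed from the API listed above.

```python
import selectors
import socket


class MiniHub:
    """Toy event loop keeping one (callback, args) pair per descriptor."""

    def __init__(self):
        self._selector = selectors.DefaultSelector()

    def add_reader(self, fd, callback, *args):
        # Registering the same fd again replaces the earlier callback.
        self.remove(fd)
        self._selector.register(fd, selectors.EVENT_READ, (callback, args))

    def remove(self, fd):
        try:
            self._selector.unregister(fd)
        except (KeyError, ValueError):
            pass  # fd was not registered

    def run_once(self, timeout=1.0):
        # Dispatch the callback of every descriptor that is readable.
        for key, _events in self._selector.select(timeout):
            callback, args = key.data
            callback(*args)

    def close(self):
        self._selector.close()


received = []
left, right = socket.socketpair()
hub = MiniHub()
hub.add_reader(right, lambda: received.append('stale'))
hub.add_reader(right, lambda sock: received.append(sock.recv(16)), right)
left.send(b'ping')
hub.run_once()
hub.remove(right)
hub.close()
left.close()
right.close()
print(received)
```

Only the second callback runs, because the second ``add_reader`` call replaced the first registration for the same descriptor.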
Timer - Scheduling events
-------------------------

.. method:: timer.call_after(secs, callback, args=(), kwargs=(),
                             priority=0)

.. method:: timer.call_repeatedly(secs, callback, args=(), kwargs=(),
                                  priority=0)

.. method:: timer.call_at(eta, callback, args=(), kwargs=(),
                          priority=0)
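
The three scheduling styles listed above (run once after a delay, run repeatedly, run at an absolute time) can be approximated with the standard library's :mod:`sched` module. The functions below only borrow the names from the listing; they are a sketch built on assumptions, not the worker's timer. In particular, the ``times`` parameter and the ``FakeClock`` class are inventions that keep the demo finite and deterministic.

```python
import sched


class FakeClock:
    """Deterministic clock so the example finishes instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, secs):
        self.now += secs


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
calls = []


def call_after(secs, callback, args=(), priority=0):
    # Run ``callback`` once, ``secs`` seconds from now.
    return scheduler.enter(secs, priority, callback, args)


def call_at(eta, callback, args=(), priority=0):
    # Run ``callback`` once, at the absolute time ``eta``.
    return scheduler.enterabs(eta, priority, callback, args)


def call_repeatedly(secs, callback, args=(), priority=0, times=3):
    # Re-schedule after each run; ``times`` bounds the demo.
    def tick(remaining):
        callback(*args)
        if remaining > 1:
            scheduler.enter(secs, priority, tick, (remaining - 1,))
    return scheduler.enter(secs, priority, tick, (times,))


def record(label):
    calls.append((clock.time(), label))


call_after(5.0, record, ('after',))
call_at(12.0, record, ('at',))
call_repeatedly(10.0, record, ('tick',))
scheduler.run()
print(calls)
```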