extending.rst 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801
  1. .. _guide-extending:
  2. ==========================
  3. Extensions and Bootsteps
  4. ==========================
  5. .. contents::
  6. :local:
  7. :depth: 2
  8. .. _extending-custom-consumers:
  9. Custom Message Consumers
  10. ========================
  11. You may want to embed custom Kombu consumers to manually process your messages.
  12. For that purpose a special :class:`~celery.bootstep.ConsumerStep` bootstep class
  13. exists, where you only need to define the ``get_consumers`` method, which must
  14. return a list of :class:`kombu.Consumer` objects to start
  15. whenever the connection is established:
  16. .. code-block:: python
  17. from celery import Celery
  18. from celery import bootsteps
  19. from kombu import Consumer, Exchange, Queue
  20. my_queue = Queue('custom', Exchange('custom'), 'routing_key')
  21. app = Celery(broker='amqp://')
  22. class MyConsumerStep(bootsteps.ConsumerStep):
  23. def get_consumers(self, channel):
  24. return [Consumer(channel,
  25. queues=[my_queue],
  26. callbacks=[self.handle_message],
  27. accept=['json'])]
  28. def handle_message(self, body, message):
  29. print('Received message: {0!r}'.format(body))
  30. message.ack()
  31. app.steps['consumer'].add(MyConsumerStep)
  32. def send_me_a_message(self, who='world!', producer=None):
  33. with app.producer_or_acquire(producer) as producer:
  34. producer.send(
  35. {'hello': who},
  36. serializer='json',
  37. exchange=my_queue.exchange,
  38. routing_key='routing_key',
  39. declare=[my_queue],
  40. retry=True,
  41. )
  42. if __name__ == '__main__':
  43. send_me_a_message('celery')
  44. .. note::
  45. Kombu Consumers can take use of two different message callback dispatching
  46. mechanisms. The first one is the ``callbacks`` argument which accepts
  47. a list of callbacks with a ``(body, message)`` signature,
  48. the second one is the ``on_message`` argument which takes a single
  49. callback with a ``(message, )`` signature. The latter will not
  50. automatically decode and deserialize the payload which is useful
  51. in many cases:
  52. .. code-block:: python
  53. def get_consumers(self, channel):
  54. return [Consumer(channel, queues=[my_queue],
  55. on_message=self.on_message)]
  56. def on_message(self, message):
  57. payload = message.decode()
  58. print(
  59. 'Received message: {0!r} {props!r} rawlen={s}'.format(
  60. payload, props=message.properties, s=len(message.body),
  61. ))
  62. message.ack()
  63. .. _extending-blueprints:
  64. Blueprints
  65. ==========
  66. Bootsteps is a technique to add functionality to the workers.
  67. A bootstep is a custom class that defines hooks to do custom actions
  68. at different stages in the worker. Every bootstep belongs to a blueprint,
  69. and the worker currently defines two blueprints: **Worker**, and **Consumer**
  70. ----------------------------------------------------------
  71. **Figure A:** Bootsteps in the Worker and Consumer blueprints. Starting
  72. from the bottom up the first step in the worker blueprint
  73. is the Timer, and the last step is to start the Consumer blueprint,
  74. which then establishes the broker connection and starts
  75. consuming messages.
  76. .. figure:: ../images/worker_graph_full.png
  77. ----------------------------------------------------------
  78. Worker
  79. ======
  80. The Worker is the first blueprint to start, and with it starts major components like
  81. the event loop, processing pool, the timer, and also optional components
  82. like the autoscaler. When the worker is fully started it will continue
  83. to the Consumer blueprint.
  84. The :class:`~celery.worker.WorkController` is the core worker implementation,
  85. and contains several methods and attributes that you can use in your bootstep.
  86. Attributes
  87. ----------
  88. .. attribute:: app
  89. The current app instance.
  90. .. attribute:: hostname
  91. The workers node name (e.g. `worker1@example.com`)
  92. .. attribute:: blueprint
  93. This is the worker :class:`~celery.bootsteps.Blueprint`.
  94. .. attribute:: hub
  95. Event loop object (:class:`~kombu.async.Hub`). You can use
  96. this to register callbacks in the event loop.
  97. This is only supported by async I/O enabled transports (amqp, redis),
  98. in which case the `worker.use_eventloop` attribute should be set.
  99. Your worker bootstep must require the Hub bootstep to use this:
  100. .. code-block:: python
  101. class WorkerStep(bootsteps.StartStopStep):
  102. requires = ('celery.worker.components:Hub', )
  103. .. attribute:: pool
  104. The current process/eventlet/gevent/thread pool.
  105. See :class:`celery.concurrency.base.BasePool`.
  106. Your worker bootstep must require the Pool bootstep to use this:
  107. .. code-block:: python
  108. class WorkerStep(bootsteps.StartStopStep):
  109. requires = ('celery.worker.components:Pool', )
  110. .. attribute:: timer
  111. :class:`~kombu.async.timer.Timer` used to schedule functions.
  112. Your worker bootstep must require the Timer bootstep to use this:
  113. .. code-block:: python
  114. class WorkerStep(bootsteps.StartStopStep):
  115. requires = ('celery.worker.components:Timer', )
  116. .. attribute:: statedb
  117. :class:`Database <celery.worker.state.Persistent>`` to persist state between
  118. worker restarts.
  119. This is only defined if the ``statedb`` argument is enabled.
  120. Your worker bootstep must require the Statedb bootstep to use this:
  121. .. code-block:: python
  122. class WorkerStep(bootsteps.StartStopStep):
  123. requires = ('celery.worker.components:Statedb', )
  124. .. attribute:: autoscaler
  125. :class:`~celery.worker.autoscaler.Autoscaler` used to automatically grow
  126. and shrink the number of processes in the pool.
  127. This is only defined if the ``autoscale`` argument is enabled.
  128. Your worker bootstep must require the `Autoscaler` bootstep to use this:
  129. .. code-block:: python
  130. class WorkerStep(bootsteps.StartStopStep):
  131. requires = ('celery.worker.autoscaler:Autoscaler', )
  132. .. attribute:: autoreloader
  133. :class:`~celery.worker.autoreloder.Autoreloader` used to automatically
  134. reload use code when the filesystem changes.
  135. This is only defined if the ``autoreload`` argument is enabled.
  136. Your worker bootstep must require the `Autoreloader` bootstep to use this;
  137. .. code-block:: python
  138. class WorkerStep(bootsteps.StartStopStep):
  139. requires = ('celery.worker.autoreloader:Autoreloader', )
  140. An example Worker bootstep could be:
  141. .. code-block:: python
  142. from celery import bootsteps
  143. class ExampleWorkerStep(bootsteps.StartStopStep):
  144. requires = ('Pool', )
  145. def __init__(self, worker, **kwargs):
  146. print('Called when the WorkController instance is constructed')
  147. print('Arguments to WorkController: {0!r}'.format(kwargs))
  148. def create(self, worker):
  149. # this method can be used to delegate the action methods
  150. # to another object that implements ``start`` and ``stop``.
  151. return self
  152. def start(self, worker):
  153. print('Called when the worker is started.')
  154. def stop(self, worker):
  155. print("Called when the worker shuts down.")
  156. def terminate(self, worker):
  157. print("Called when the worker terminates")
  158. Every method is passed the current ``WorkController`` instance as the first
  159. argument.
  160. Another example could use the timer to wake up at regular intervals:
  161. .. code-block:: python
  162. from celery import bootsteps
  163. class DeadlockDetection(bootsteps.StartStopStep):
  164. requires = ('Timer', )
  165. def __init__(self, worker, deadlock_timeout=3600):
  166. self.timeout = deadlock_timeout
  167. self.requests = []
  168. self.tref = None
  169. def start(self, worker):
  170. # run every 30 seconds.
  171. self.tref = worker.timer.call_repeatedly(
  172. 30.0, self.detect, (worker, ), priority=10,
  173. )
  174. def stop(self, worker):
  175. if self.tref:
  176. self.tref.cancel()
  177. self.tref = None
  178. def detect(self, worker):
  179. # update active requests
  180. for req in self.worker.active_requests:
  181. if req.time_start and time() - req.time_start > self.timeout:
  182. raise SystemExit()
  183. Consumer
  184. ========
  185. The Consumer blueprint establishes a connection to the broker, and
  186. is restarted every time this connection is lost. Consumer bootsteps
  187. include the worker heartbeat, the remote control command consumer, and
  188. importantly, the task consumer.
  189. When you create consumer bootsteps you must take into account that it must
  190. be possible to restart your blueprint. An additional 'shutdown' method is
  191. defined for consumer bootsteps, this method is called when the worker is
  192. shutdown.
  193. Attributes
  194. ----------
  195. .. attribute:: app
  196. The current app instance.
  197. .. attribute:: controller
  198. The parent :class:`~@WorkController` object that created this consumer.
  199. .. attribute:: hostname
  200. The workers node name (e.g. `worker1@example.com`)
  201. .. attribute:: blueprint
  202. This is the worker :class:`~celery.bootsteps.Blueprint`.
  203. .. attribute:: hub
  204. Event loop object (:class:`~kombu.async.Hub`). You can use
  205. this to register callbacks in the event loop.
  206. This is only supported by async I/O enabled transports (amqp, redis),
  207. in which case the `worker.use_eventloop` attribute should be set.
  208. Your worker bootstep must require the Hub bootstep to use this:
  209. .. code-block:: python
  210. class WorkerStep(bootsteps.StartStopStep):
  211. requires = ('celery.worker:Hub', )
  212. .. attribute:: connection
  213. The current broker connection (:class:`kombu.Connection`).
  214. A consumer bootstep must require the 'Connection' bootstep
  215. to use this:
  216. .. code-block:: python
  217. class Step(bootsteps.StartStopStep):
  218. requires = ('celery.worker.consumer:Connection', )
  219. .. attribute:: event_dispatcher
  220. A :class:`@events.Dispatcher` object that can be used to send events.
  221. A consumer bootstep must require the `Events` bootstep to use this.
  222. .. code-block:: python
  223. class Step(bootsteps.StartStopStep):
  224. requires = ('celery.worker.consumer:Events', )
  225. .. attribute:: gossip
  226. Worker to worker broadcast communication
  227. (class:`~celery.worker.consumer.Gossip`).
  228. A consumer bootstep must require the `Gossip` bootstep to use this.
  229. .. code-block:: python
  230. class Step(bootsteps.StartStopStep):
  231. requires = ('celery.worker.consumer:Events', )
  232. .. attribute:: pool
  233. The current process/eventlet/gevent/thread pool.
  234. See :class:`celery.concurrency.base.BasePool`.
  235. .. attribute:: timer
  236. :class:`Timer <celery.utils.timer2.Schedule` used to schedule functions.
  237. .. attribute:: heart
  238. Responsible for sending worker event heartbeats
  239. (:class:`~celery.worker.heartbeat.Heart`).
  240. Your consumer bootstep must require the `Heart` bootstep to use this:
  241. .. code-block:: python
  242. class Step(bootsteps.StartStopStep):
  243. requires = ('celery.worker.consumer:Heart', )
  244. .. attribute:: task_consumer
  245. The :class:`kombu.Consumer` object used to consume task messages.
  246. Your consumer bootstep must require the `Tasks` bootstep to use this:
  247. .. code-block:: python
  248. class Step(bootsteps.StartStopStep):
  249. requires = ('celery.worker.consumer:Heart', )
  250. .. attribute:: strategies
  251. Every registered task type has an entry in this mapping,
  252. where the value is used to execute an incoming message of this task type
  253. (the task execution strategy). This mapping is generated by the Tasks
  254. bootstep when the consumer starts::
  255. for name, task in app.tasks.items():
  256. strategies[name] = task.start_strategy(app, consumer)
  257. task.__trace__ = celery.app.trace.build_tracer(
  258. name, task, loader, hostname
  259. )
  260. Your consumer bootstep must require the `Tasks` bootstep to use this:
  261. .. code-block:: python
  262. class Step(bootsteps.StartStopStep):
  263. requires = ('celery.worker.consumer:Heart', )
  264. .. attribute:: task_buckets
  265. A :class:`~collections.defaultdict` used to lookup the rate limit for
  266. a task by type.
  267. Entries in this dict may be None (for no limit) or a
  268. :class:`~kombu.utils.limits.TokenBucket` instance implementing
  269. ``consume(tokens)`` and ``expected_time(tokens)``.
  270. TokenBucket implements the `token bucket algorithm`_, but any algorithm
  271. may be used as long as it conforms to the same interface and defines the
  272. two methods above.
  273. .. _`token bucket algorithm`: http://en.wikipedia.org/wiki/Token_bucket
  274. .. attribute:: qos
  275. The :class:`~kombu.common.QoS` object can be used to change the
  276. task channels current prefetch_count value, e.g::
  277. # increment at next cycle
  278. consumer.qos.increment_eventually(1)
  279. # decrement at next cycle
  280. consumer.qos.decrement_eventually(1)
  281. consumer.qos.set(10)
  282. Methods
  283. -------
  284. .. method:: consumer.reset_rate_limits()
  285. Updates the ``task_buckets`` mapping for all registered task types.
  286. .. method:: consumer.bucket_for_task(type, Bucket=TokenBucket)
  287. Creates rate limit bucket for a task using its ``task.rate_limit``
  288. attribute.
  289. .. method:: consumer.add_task_queue(name, exchange=None, exchange_type=None,
  290. routing_key=None, \*\*options):
  291. Adds new queue to consume from. This will persist on connection restart.
  292. .. method:: consumer.cancel_task_queue(name)
  293. Stop consuming from queue by name. This will persist on connection
  294. restart.
  295. .. method:: apply_eta_task(request)
  296. Schedule eta task to execute based on the ``request.eta`` attribute.
  297. (:class:`~celery.worker.job.Request`)
  298. .. _extending-bootsteps:
  299. Installing Bootsteps
  300. ====================
  301. ``app.steps['worker']`` and ``app.steps['consumer']`` can be modified
  302. to add new bootsteps::
  303. >>> app = Celery()
  304. >>> app.steps['worker'].add(MyWorkerStep) # < add class, do not instantiate
  305. >>> app.steps['consumer'].add(MyConsumerStep)
  306. >>> app.steps['consumer'].update([StepA, StepB])
  307. >>> app.steps['consumer']
  308. {step:proj.StepB{()}, step:proj.MyConsumerStep{()}, step:proj.StepA{()}
  309. The order of steps is not important here as the order is decided by the
  310. resulting dependency graph (``Step.requires``).
  311. To illustrate how you can install bootsteps and how they work, this is an example step that
  312. prints some useless debugging information.
  313. It can be added both as a worker and consumer bootstep:
  314. .. code-block:: python
  315. from celery import Celery
  316. from celery import bootsteps
  317. class InfoStep(bootsteps.Step):
  318. def __init__(self, parent, **kwargs):
  319. # here we can prepare the Worker/Consumer object
  320. # in any way we want, set attribute defaults and so on.
  321. print('{0!r} is in init'.format(parent))
  322. def start(self, parent):
  323. # our step is started together with all other Worker/Consumer
  324. # bootsteps.
  325. print('{0!r} is starting'.format(parent))
  326. def stop(self, parent):
  327. # the Consumer calls stop every time the consumer is restarted
  328. # (i.e. connection is lost) and also at shutdown. The Worker
  329. # will call stop at shutdown only.
  330. print('{0!r} is stopping'.format(parent))
  331. def shutdown(self, parent):
  332. # shutdown is called by the Consumer at shutdown, it's not
  333. # called by Worker.
  334. print('{0!r} is shutting down'.format(parent))
  335. app = Celery(broker='amqp://')
  336. app.steps['worker'].add(InfoStep)
  337. app.steps['consumer'].add(InfoStep)
  338. Starting the worker with this step installed will give us the following
  339. logs::
  340. <Worker: w@example.com (initializing)> is in init
  341. <Consumer: w@example.com (initializing)> is in init
  342. [2013-05-29 16:18:20,544: WARNING/MainProcess]
  343. <Worker: w@example.com (running)> is starting
  344. [2013-05-29 16:18:21,577: WARNING/MainProcess]
  345. <Consumer: w@example.com (running)> is starting
  346. <Consumer: w@example.com (closing)> is stopping
  347. <Worker: w@example.com (closing)> is stopping
  348. <Consumer: w@example.com (terminating)> is shutting down
  349. The ``print`` statements will be redirected to the logging subsystem after
  350. the worker has been initialized, so the "is starting" lines are timestamped.
  351. You may notice that this does no longer happen at shutdown, this is because
  352. the ``stop`` and ``shutdown`` methods are called inside a *signal handler*,
  353. and it's not safe to use logging inside such a handler.
  354. Logging with the Python logging module is not :term:`reentrant`,
  355. which means that you cannot interrupt the function and
  356. call it again later. It's important that the ``stop`` and ``shutdown`` methods
  357. you write is also :term:`reentrant`.
  358. Starting the worker with ``--loglevel=debug`` will show us more
  359. information about the boot process::
  360. [2013-05-29 16:18:20,509: DEBUG/MainProcess] | Worker: Preparing bootsteps.
  361. [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: Building graph...
  362. <celery.apps.worker.Worker object at 0x101ad8410> is in init
  363. [2013-05-29 16:18:20,511: DEBUG/MainProcess] | Worker: New boot order:
  364. {Hub, Queues (intra), Pool, Autoreloader, Timer, StateDB,
  365. Autoscaler, InfoStep, Beat, Consumer}
  366. [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
  367. [2013-05-29 16:18:20,514: DEBUG/MainProcess] | Consumer: Building graph...
  368. <celery.worker.consumer.Consumer object at 0x101c2d8d0> is in init
  369. [2013-05-29 16:18:20,515: DEBUG/MainProcess] | Consumer: New boot order:
  370. {Connection, Mingle, Events, Gossip, InfoStep, Agent,
  371. Heart, Control, Tasks, event loop}
  372. [2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Hub
  373. [2013-05-29 16:18:20,522: DEBUG/MainProcess] ^-- substep ok
  374. [2013-05-29 16:18:20,522: DEBUG/MainProcess] | Worker: Starting Pool
  375. [2013-05-29 16:18:20,542: DEBUG/MainProcess] ^-- substep ok
  376. [2013-05-29 16:18:20,543: DEBUG/MainProcess] | Worker: Starting InfoStep
  377. [2013-05-29 16:18:20,544: WARNING/MainProcess]
  378. <celery.apps.worker.Worker object at 0x101ad8410> is starting
  379. [2013-05-29 16:18:20,544: DEBUG/MainProcess] ^-- substep ok
  380. [2013-05-29 16:18:20,544: DEBUG/MainProcess] | Worker: Starting Consumer
  381. [2013-05-29 16:18:20,544: DEBUG/MainProcess] | Consumer: Starting Connection
  382. [2013-05-29 16:18:20,559: INFO/MainProcess] Connected to amqp://guest@127.0.0.1:5672//
  383. [2013-05-29 16:18:20,560: DEBUG/MainProcess] ^-- substep ok
  384. [2013-05-29 16:18:20,560: DEBUG/MainProcess] | Consumer: Starting Mingle
  385. [2013-05-29 16:18:20,560: INFO/MainProcess] mingle: searching for neighbors
  386. [2013-05-29 16:18:21,570: INFO/MainProcess] mingle: no one here
  387. [2013-05-29 16:18:21,570: DEBUG/MainProcess] ^-- substep ok
  388. [2013-05-29 16:18:21,571: DEBUG/MainProcess] | Consumer: Starting Events
  389. [2013-05-29 16:18:21,572: DEBUG/MainProcess] ^-- substep ok
  390. [2013-05-29 16:18:21,572: DEBUG/MainProcess] | Consumer: Starting Gossip
  391. [2013-05-29 16:18:21,577: DEBUG/MainProcess] ^-- substep ok
  392. [2013-05-29 16:18:21,577: DEBUG/MainProcess] | Consumer: Starting InfoStep
  393. [2013-05-29 16:18:21,577: WARNING/MainProcess]
  394. <celery.worker.consumer.Consumer object at 0x101c2d8d0> is starting
  395. [2013-05-29 16:18:21,578: DEBUG/MainProcess] ^-- substep ok
  396. [2013-05-29 16:18:21,578: DEBUG/MainProcess] | Consumer: Starting Heart
  397. [2013-05-29 16:18:21,579: DEBUG/MainProcess] ^-- substep ok
  398. [2013-05-29 16:18:21,579: DEBUG/MainProcess] | Consumer: Starting Control
  399. [2013-05-29 16:18:21,583: DEBUG/MainProcess] ^-- substep ok
  400. [2013-05-29 16:18:21,583: DEBUG/MainProcess] | Consumer: Starting Tasks
  401. [2013-05-29 16:18:21,606: DEBUG/MainProcess] basic.qos: prefetch_count->80
  402. [2013-05-29 16:18:21,606: DEBUG/MainProcess] ^-- substep ok
  403. [2013-05-29 16:18:21,606: DEBUG/MainProcess] | Consumer: Starting event loop
  404. [2013-05-29 16:18:21,608: WARNING/MainProcess] celery@example.com ready.
  405. .. _extending-programs:
  406. Command-line programs
  407. =====================
  408. .. _extending-commandoptions:
  409. Adding new command-line options
  410. -------------------------------
  411. .. _extending-command-options:
  412. Command-specific options
  413. ~~~~~~~~~~~~~~~~~~~~~~~~
  414. You can add additional command-line options to the ``worker``, ``beat`` and
  415. ``events`` commands by modifying the :attr:`~@Celery.user_options` attribute of the
  416. application instance.
  417. Celery commands uses the :mod:`optparse` module to parse command-line
  418. arguments, and so you have to use :mod:`optparse` specific option instances created
  419. using :func:`optparse.make_option`. Please see the :mod:`optparse`
  420. documentation to read about the fields supported.
  421. Example adding a custom option to the :program:`celery worker` command:
  422. .. code-block:: python
  423. from celery import Celery
  424. from celery.bin import Option # <-- alias to optparse.make_option
  425. app = Celery(broker='amqp://')
  426. app.user_options['worker'].add(
  427. Option('--enable-my-option', action='store_true', default=False,
  428. help='Enable custom option.'),
  429. )
  430. All bootsteps will now receive this argument as a keyword argument to
  431. ``Bootstep.__init__``:
  432. .. code-block:: python
  433. from celery import bootsteps
  434. class MyBootstep(bootsteps.Step):
  435. def __init__(self, worker, enable_my_option=False, **options):
  436. if enable_my_option:
  437. party()
  438. app.steps['worker'].add(MyBootstep)
  439. .. _extending-preload_options:
  440. Preload options
  441. ~~~~~~~~~~~~~~~
  442. The :program:`celery` umbrella command supports the concept of 'preload
  443. options', which are special options passed to all subcommands and parsed
  444. outside of the main parsing step.
  445. The list of default preload options can be found in the API reference:
  446. :mod:`celery.bin.base`.
  447. You can add new preload options too, e.g. to specify a configuration template:
  448. .. code-block:: python
  449. from celery import Celery
  450. from celery import signals
  451. from celery.bin import Option
  452. app = Celery()
  453. app.user_options['preload'].add(
  454. Option('-Z', '--template', default='default',
  455. help='Configuration template to use.'),
  456. )
  457. @signals.user_preload_options.connect
  458. def on_preload_parsed(options, **kwargs):
  459. use_template(options['template'])
  460. .. _extending-subcommands:
  461. Adding new :program:`celery` sub-commands
  462. -----------------------------------------
  463. New commands can be added to the :program:`celery` umbrella command by using
  464. `setuptools entry-points`_.
  465. .. _`setuptools entry-points`:
  466. http://reinout.vanrees.org/weblog/2010/01/06/zest-releaser-entry-points.html
  467. Entry-points is special metadata that can be added to your packages ``setup.py`` program,
  468. and then after installation, read from the system using the :mod:`pkg_resources` module.
  469. Celery recognizes ``celery.commands`` entry-points to install additional
  470. subcommands, where the value of the entry-point must point to a valid subclass
  471. of :class:`celery.bin.base.Command`. There is limited documentation,
  472. unfortunately, but you can find inspiration from the various commands in the
  473. :mod:`celery.bin` package.
  474. This is how the Flower_ monitoring extension adds the :program:`celery flower` command,
  475. by adding an entry-point in :file:`setup.py`:
  476. .. code-block:: python
  477. setup(
  478. name='flower',
  479. entry_points={
  480. 'celery.commands': [
  481. 'flower = flower.command.FlowerCommand',
  482. ],
  483. }
  484. )
  485. .. _Flower: http://pypi.python.org/pypi/flower
  486. The command definition is in two parts separated by the equal sign, where the
  487. first part is the name of the subcommand (flower), then the fully qualified
  488. module path to the class that implements the command
  489. (``flower.command.FlowerCommand``).
  490. In the module :file:`flower/command.py`, the command class is defined
  491. something like this:
  492. .. code-block:: python
  493. from celery.bin.base import Command, Option
  494. class FlowerCommand(Command):
  495. def get_options(self):
  496. return (
  497. Option('--port', default=8888, type='int',
  498. help='Webserver port',
  499. ),
  500. Option('--debug', action='store_true'),
  501. )
  502. def run(self, port=None, debug=False, **kwargs):
  503. print('Running our command')
  504. Worker API
  505. ==========
  506. :class:`~kombu.async.Hub` - The workers async event loop.
  507. ---------------------------------------------------------
  508. :supported transports: amqp, redis
  509. .. versionadded:: 3.0
  510. The worker uses asynchronous I/O when the amqp or redis broker transports are
  511. used. The eventual goal is for all transports to use the eventloop, but that
  512. will take some time so other transports still use a threading-based solution.
  513. .. method:: hub.add(fd, callback, flags)
  514. .. method:: hub.add_reader(fd, callback, \*args)
  515. Add callback to be called when ``fd`` is readable.
  516. The callback will stay registered until explictly removed using
  517. :meth:`hub.remove(fd) <hub.remove>`, or the fd is automatically discarded
  518. because it's no longer valid.
  519. Note that only one callback can be registered for any given fd at a time,
  520. so calling ``add`` a second time will remove any callback that
  521. was previously registered for that fd.
  522. A file descriptor is any file-like object that supports the ``fileno``
  523. method, or it can be the file descriptor number (int).
  524. .. method:: hub.add_writer(fd, callback, \*args)
  525. Add callback to be called when ``fd`` is writable.
  526. See also notes for :meth:`hub.add_reader` above.
  527. .. method:: hub.remove(fd)
  528. Remove all callbacks for ``fd`` from the loop.
  529. Timer - Scheduling events
  530. -------------------------
  531. .. method:: timer.call_after(secs, callback, args=(), kwargs=(),
  532. priority=0)
  533. .. method:: timer.call_repeatedly(secs, callback, args=(), kwargs=(),
  534. priority=0)
  535. .. method:: timer.call_at(eta, callback, args=(), kwargs=(),
  536. priority=0)