|
@@ -21,7 +21,7 @@ Starting the worker
|
|
|
|
|
|
You can start the worker in the foreground by executing the command::
|
|
|
|
|
|
- $ celery worker --loglevel=INFO --app=app
|
|
|
+ $ celery worker --app=app -l info
|
|
|
|
|
|
For a full list of available command line options see
|
|
|
:mod:`~celery.bin.celeryd`, or simply do::
|
|
@@ -120,18 +120,133 @@ argument and defaults to the number of CPUs available on the machine.
|
|
|
to find the numbers that works best for you, as this varies based on
|
|
|
application, work load, task run times and other factors.
|
|
|
|
|
|
+.. _worker-remote-control:
|
|
|
+
|
|
|
+Remote control
|
|
|
+==============
|
|
|
+
|
|
|
+.. versionadded:: 2.0
|
|
|
+
|
|
|
+.. sidebar:: The ``celery`` command
|
|
|
+
|
|
|
+ The :program:`celery` program is used to execute remote control
|
|
|
+ commands from the command line. It supports all of the commands
|
|
|
+ listed below. See :ref:`monitoring-celeryctl` for more information.
|
|
|
+
|
|
|
+pool support: *processes, eventlet, gevent*, blocking:*threads/solo* (see note)
|
|
|
+broker support: *amqp, redis, mongodb*
|
|
|
+
|
|
|
+Workers have the ability to be remote controlled using a high-priority
|
|
|
+broadcast message queue. The commands can be directed to all, or a specific
|
|
|
+list of workers.
|
|
|
+
|
|
|
+Commands can also have replies. The client can then wait for and collect
|
|
|
+those replies. Since there's no central authority to know how many
|
|
|
+workers are available in the cluster, there is also no way to estimate
|
|
|
+how many workers may send a reply, so the client has a configurable
|
|
|
+timeout — the deadline in seconds for replies to arrive in. This timeout
|
|
|
+defaults to one second. If the worker doesn't reply within the deadline
|
|
|
+it doesn't necessarily mean the worker didn't reply, or worse is dead, but
|
|
|
+may simply be caused by network latency or the worker being slow at processing
|
|
|
+commands, so adjust the timeout accordingly.
|
|
|
+
|
|
|
+In addition to timeouts, the client can specify the maximum number
|
|
|
+of replies to wait for. If a destination is specified, this limit is set
|
|
|
+to the number of destination hosts.
|
|
|
+
|
|
|
+.. note::
|
|
|
+
|
|
|
+ The solo and threads pool supports remote control commands,
|
|
|
+ but any task executing will block any waiting control command,
|
|
|
+ so it is of limited use if the worker is very busy. In that
|
|
|
+ case you must increase the timeout waitin for replies in the client.
|
|
|
+
|
|
|
+.. _worker-broadcast-fun:
|
|
|
+
|
|
|
+The :meth:`~@control.broadcast` function.
|
|
|
+----------------------------------------------------
|
|
|
+
|
|
|
+This is the client function used to send commands to the workers.
|
|
|
+Some remote control commands also have higher-level interfaces using
|
|
|
+:meth:`~@control.broadcast` in the background, like
|
|
|
+:meth:`~@control.rate_limit` and :meth:`~@control.ping`.
|
|
|
+
|
|
|
+Sending the :control:`rate_limit` command and keyword arguments::
|
|
|
+
|
|
|
+ >>> from celery.task.control import broadcast
|
|
|
+ >>> celery.control.broadcast("rate_limit",
|
|
|
+ ... arguments={"task_name": "myapp.mytask",
|
|
|
+ ... "rate_limit": "200/m"})
|
|
|
+
|
|
|
+This will send the command asynchronously, without waiting for a reply.
|
|
|
+To request a reply you have to use the `reply` argument::
|
|
|
+
|
|
|
+ >>> celery.control.broadcast("rate_limit", {
|
|
|
+ ... "task_name": "myapp.mytask", "rate_limit": "200/m"}, reply=True)
|
|
|
+ [{'worker1.example.com': 'New rate limit set successfully'},
|
|
|
+ {'worker2.example.com': 'New rate limit set successfully'},
|
|
|
+ {'worker3.example.com': 'New rate limit set successfully'}]
|
|
|
+
|
|
|
+Using the `destination` argument you can specify a list of workers
|
|
|
+to receive the command::
|
|
|
+
|
|
|
+ >>> celery.control.broadcast("rate_limit", {
|
|
|
+ ... "task_name": "myapp.mytask",
|
|
|
+ ... "rate_limit": "200/m"}, reply=True,
|
|
|
+ ... destination=["worker1.example.com"])
|
|
|
+ [{'worker1.example.com': 'New rate limit set successfully'}]
|
|
|
+
|
|
|
+
|
|
|
+Of course, using the higher-level interface to set rate limits is much
|
|
|
+more convenient, but there are commands that can only be requested
|
|
|
+using :meth:`~@control.broadcast`.
|
|
|
+
|
|
|
+.. control:: revoke
|
|
|
+
|
|
|
+Revoking tasks
|
|
|
+==============
|
|
|
+pool support: all
|
|
|
+broker support: *amqp, redis, mongodb*
|
|
|
+
|
|
|
+All worker nodes keeps a memory of revoked task ids, either in-memory or
|
|
|
+persistent on disk (see :ref:`worker-persistent-revokes`).
|
|
|
+
|
|
|
+When a worker receives a revoke request it will skip executing
|
|
|
+the task, but it won't terminate an already executing task unless
|
|
|
+the `terminate` option is set.
|
|
|
+
|
|
|
+If `terminate` is set the worker child process processing the task
|
|
|
+will be terminated. The default signal sent is `TERM`, but you can
|
|
|
+specify this using the `signal` argument. Signal can be the uppercase name
|
|
|
+of any signal defined in the :mod:`signal` module in the Python Standard
|
|
|
+Library.
|
|
|
+
|
|
|
+Terminating a task also revokes it.
|
|
|
+
|
|
|
+**Example**
|
|
|
+
|
|
|
+::
|
|
|
+
|
|
|
+ >>> celery.control.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed")
|
|
|
+
|
|
|
+ >>> celery.control.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed",
|
|
|
+ ... terminate=True)
|
|
|
+
|
|
|
+ >>> celery.control.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed",
|
|
|
+ ... terminate=True, signal="SIGKILL")
|
|
|
+
|
|
|
.. _worker-persistent-revokes:
|
|
|
|
|
|
Persistent revokes
|
|
|
-==================
|
|
|
+------------------
|
|
|
|
|
|
Revoking tasks works by sending a broadcast message to all the workers,
|
|
|
the workers then keep a list of revoked tasks in memory.
|
|
|
|
|
|
If you want tasks to remain revoked after worker restart you need to
|
|
|
specify a file for these to be stored in, either by using the `--statedb`
|
|
|
-argument to :mod:`~celery.bin.celeryd` or the :setting:`CELERYD_STATE_DB`
|
|
|
-setting. See :setting:`CELERYD_STATE_DB` for more information.
|
|
|
+argument to :program:`celery worker` or the :setting:`CELERYD_STATE_DB`
|
|
|
+setting.
|
|
|
|
|
|
Note that remote control commands must be working for revokes to work.
|
|
|
Remote control commands are only supported by the amqp, redis and mongodb
|
|
@@ -204,6 +319,32 @@ two minutes::
|
|
|
|
|
|
Only tasks that starts executing after the time limit change will be affected.
|
|
|
|
|
|
+.. _worker-rate-limits:
|
|
|
+
|
|
|
+Rate Limits
|
|
|
+===========
|
|
|
+
|
|
|
+.. control:: rate_limit
|
|
|
+
|
|
|
+Changing rate-limits at runtime
|
|
|
+-------------------------------
|
|
|
+
|
|
|
+Example changing the rate limit for the `myapp.mytask` task to accept
|
|
|
+200 tasks a minute on all servers::
|
|
|
+
|
|
|
+ >>> celery.control.rate_limit("myapp.mytask", "200/m")
|
|
|
+
|
|
|
+Example changing the rate limit on a single host by specifying the
|
|
|
+destination host name::
|
|
|
+
|
|
|
+ >>> celery.control.rate_limit("myapp.mytask", "200/m",
|
|
|
+ ... destination=["worker1.example.com"])
|
|
|
+
|
|
|
+.. warning::
|
|
|
+
|
|
|
+ This won't affect workers with the
|
|
|
+ :setting:`CELERY_DISABLE_RATE_LIMITS` setting enabled.
|
|
|
+
|
|
|
.. _worker-maxtasksperchild:
|
|
|
|
|
|
Max tasks per child setting
|
|
@@ -302,200 +443,16 @@ environment variable::
|
|
|
|
|
|
$ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
|
|
|
|
|
|
-.. _worker-remote-control:
|
|
|
-
|
|
|
-Remote control
|
|
|
-==============
|
|
|
-
|
|
|
-.. versionadded:: 2.0
|
|
|
-
|
|
|
-pool support: *processes, eventlet, gevent*, blocking:*threads/solo* (see note)
|
|
|
-broker support: *amqp, redis, mongodb*
|
|
|
-
|
|
|
-Workers have the ability to be remote controlled using a high-priority
|
|
|
-broadcast message queue. The commands can be directed to all, or a specific
|
|
|
-list of workers.
|
|
|
-
|
|
|
-Commands can also have replies. The client can then wait for and collect
|
|
|
-those replies. Since there's no central authority to know how many
|
|
|
-workers are available in the cluster, there is also no way to estimate
|
|
|
-how many workers may send a reply, so the client has a configurable
|
|
|
-timeout — the deadline in seconds for replies to arrive in. This timeout
|
|
|
-defaults to one second. If the worker doesn't reply within the deadline
|
|
|
-it doesn't necessarily mean the worker didn't reply, or worse is dead, but
|
|
|
-may simply be caused by network latency or the worker being slow at processing
|
|
|
-commands, so adjust the timeout accordingly.
|
|
|
-
|
|
|
-In addition to timeouts, the client can specify the maximum number
|
|
|
-of replies to wait for. If a destination is specified, this limit is set
|
|
|
-to the number of destination hosts.
|
|
|
-
|
|
|
-.. seealso::
|
|
|
-
|
|
|
- The :program:`celery` program is used to execute remote control
|
|
|
- commands from the command line. It supports all of the commands
|
|
|
- listed below. See :ref:`monitoring-celeryctl` for more information.
|
|
|
-
|
|
|
-.. note::
|
|
|
-
|
|
|
- The solo and threads pool supports remote control commands,
|
|
|
- but any task executing will block any waiting control command,
|
|
|
- so it is of limited use if the worker is very busy. In that
|
|
|
- case you must increase the timeout waitin for replies in the client.
|
|
|
-
|
|
|
-.. _worker-broadcast-fun:
|
|
|
-
|
|
|
-The :meth:`~@control.broadcast` function.
|
|
|
-----------------------------------------------------
|
|
|
-
|
|
|
-This is the client function used to send commands to the workers.
|
|
|
-Some remote control commands also have higher-level interfaces using
|
|
|
-:meth:`~@control.broadcast` in the background, like
|
|
|
-:meth:`~@control.rate_limit` and :meth:`~@control.ping`.
|
|
|
-
|
|
|
-Sending the :control:`rate_limit` command and keyword arguments::
|
|
|
-
|
|
|
- >>> from celery.task.control import broadcast
|
|
|
- >>> celery.control.broadcast("rate_limit",
|
|
|
- ... arguments={"task_name": "myapp.mytask",
|
|
|
- ... "rate_limit": "200/m"})
|
|
|
-
|
|
|
-This will send the command asynchronously, without waiting for a reply.
|
|
|
-To request a reply you have to use the `reply` argument::
|
|
|
-
|
|
|
- >>> celery.control.broadcast("rate_limit", {
|
|
|
- ... "task_name": "myapp.mytask", "rate_limit": "200/m"}, reply=True)
|
|
|
- [{'worker1.example.com': 'New rate limit set successfully'},
|
|
|
- {'worker2.example.com': 'New rate limit set successfully'},
|
|
|
- {'worker3.example.com': 'New rate limit set successfully'}]
|
|
|
-
|
|
|
-Using the `destination` argument you can specify a list of workers
|
|
|
-to receive the command::
|
|
|
-
|
|
|
- >>> celery.control.broadcast("rate_limit", {
|
|
|
- ... "task_name": "myapp.mytask",
|
|
|
- ... "rate_limit": "200/m"}, reply=True,
|
|
|
- ... destination=["worker1.example.com"])
|
|
|
- [{'worker1.example.com': 'New rate limit set successfully'}]
|
|
|
-
|
|
|
-
|
|
|
-Of course, using the higher-level interface to set rate limits is much
|
|
|
-more convenient, but there are commands that can only be requested
|
|
|
-using :meth:`~@control.broadcast`.
|
|
|
-
|
|
|
-.. _worker-rate-limits:
|
|
|
-
|
|
|
-.. control:: rate_limit
|
|
|
-
|
|
|
-Rate limits
|
|
|
------------
|
|
|
-
|
|
|
-Example changing the rate limit for the `myapp.mytask` task to accept
|
|
|
-200 tasks a minute on all servers::
|
|
|
-
|
|
|
- >>> celery.control.rate_limit("myapp.mytask", "200/m")
|
|
|
-
|
|
|
-Example changing the rate limit on a single host by specifying the
|
|
|
-destination host name::
|
|
|
-
|
|
|
- >>> celery.control.rate_limit("myapp.mytask", "200/m",
|
|
|
- ... destination=["worker1.example.com"])
|
|
|
-
|
|
|
-.. warning::
|
|
|
-
|
|
|
- This won't affect workers with the
|
|
|
- :setting:`CELERY_DISABLE_RATE_LIMITS` setting on. To re-enable rate limits
|
|
|
- then you have to restart the worker.
|
|
|
-
|
|
|
-.. control:: revoke
|
|
|
-
|
|
|
-Revoking tasks
|
|
|
---------------
|
|
|
-
|
|
|
-All worker nodes keeps a memory of revoked task ids, either in-memory or
|
|
|
-persistent on disk (see :ref:`worker-persistent-revokes`).
|
|
|
-
|
|
|
-When a worker receives a revoke request it will skip executing
|
|
|
-the task, but it won't terminate an already executing task unless
|
|
|
-the `terminate` option is set.
|
|
|
-
|
|
|
-If `terminate` is set the worker child process processing the task
|
|
|
-will be terminated. The default signal sent is `TERM`, but you can
|
|
|
-specify this using the `signal` argument. Signal can be the uppercase name
|
|
|
-of any signal defined in the :mod:`signal` module in the Python Standard
|
|
|
-Library.
|
|
|
-
|
|
|
-Terminating a task also revokes it.
|
|
|
-
|
|
|
-**Example**
|
|
|
-
|
|
|
-::
|
|
|
-
|
|
|
- >>> celery.control.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed")
|
|
|
-
|
|
|
- >>> celery.control.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed",
|
|
|
- ... terminate=True)
|
|
|
-
|
|
|
- >>> celery.control.revoke("d9078da5-9915-40a0-bfa1-392c7bde42ed",
|
|
|
- ... terminate=True, signal="SIGKILL")
|
|
|
-
|
|
|
-.. control:: shutdown
|
|
|
-
|
|
|
-Remote shutdown
|
|
|
----------------
|
|
|
-
|
|
|
-This command will gracefully shut down the worker remotely::
|
|
|
-
|
|
|
- >>> celery.control.broadcast("shutdown") # shutdown all workers
|
|
|
- >>> celery.control.broadcast("shutdown, destination="worker1.example.com")
|
|
|
-
|
|
|
-.. control:: ping
|
|
|
-
|
|
|
-Ping
|
|
|
-----
|
|
|
-
|
|
|
-This command requests a ping from alive workers.
|
|
|
-The workers reply with the string 'pong', and that's just about it.
|
|
|
-It will use the default one second timeout for replies unless you specify
|
|
|
-a custom timeout::
|
|
|
-
|
|
|
- >>> celery.control.ping(timeout=0.5)
|
|
|
- [{'worker1.example.com': 'pong'},
|
|
|
- {'worker2.example.com': 'pong'},
|
|
|
- {'worker3.example.com': 'pong'}]
|
|
|
-
|
|
|
-:meth:`~@control.ping` also supports the `destination` argument,
|
|
|
-so you can specify which workers to ping::
|
|
|
-
|
|
|
- >>> ping(['worker2.example.com', 'worker3.example.com'])
|
|
|
- [{'worker2.example.com': 'pong'},
|
|
|
- {'worker3.example.com': 'pong'}]
|
|
|
-
|
|
|
-.. _worker-enable-events:
|
|
|
-
|
|
|
-.. control:: enable_events
|
|
|
-.. control:: disable_events
|
|
|
-
|
|
|
-Enable/disable events
|
|
|
----------------------
|
|
|
-
|
|
|
-You can enable/disable events by using the `enable_events`,
|
|
|
-`disable_events` commands. This is useful to temporarily monitor
|
|
|
-a worker using :program:`celery events`/:program:`celerymon`.
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- >>> celery.control.enable_events()
|
|
|
- >>> celery.control.disable_events()
|
|
|
-
|
|
|
.. _worker-autoreload:
|
|
|
|
|
|
-Autoreloading
|
|
|
--------------
|
|
|
+.. control:: pool_restart
|
|
|
+
|
|
|
+Pool Restart Command
|
|
|
+--------------------
|
|
|
|
|
|
.. versionadded:: 2.5
|
|
|
|
|
|
-The remote control command ``pool_restart`` sends restart requests to
|
|
|
+The remote control command :control:`pool_restart` sends restart requests to
|
|
|
the workers child processes. It is particularly useful for forcing
|
|
|
the worker to import new modules, or for reloading already imported
|
|
|
modules. This command does not interrupt executing tasks.
|
|
@@ -545,34 +502,6 @@ your own custom reloader by passing the ``reloader`` argument.
|
|
|
- http://www.indelible.org/ink/python-reloading/
|
|
|
- http://docs.python.org/library/functions.html#reload
|
|
|
|
|
|
-.. _worker-custom-control-commands:
|
|
|
-
|
|
|
-Writing your own remote control commands
|
|
|
-----------------------------------------
|
|
|
-
|
|
|
-Remote control commands are registered in the control panel and
|
|
|
-they take a single argument: the current
|
|
|
-:class:`~celery.worker.control.ControlDispatch` instance.
|
|
|
-From there you have access to the active
|
|
|
-:class:`~celery.worker.consumer.Consumer` if needed.
|
|
|
-
|
|
|
-Here's an example control command that restarts the broker connection:
|
|
|
-
|
|
|
-.. code-block:: python
|
|
|
-
|
|
|
- from celery.worker.control import Panel
|
|
|
-
|
|
|
- @Panel.register
|
|
|
- def reset_connection(panel):
|
|
|
- panel.logger.critical("Connection reset by remote control.")
|
|
|
- panel.consumer.reset_connection()
|
|
|
- return {"ok": "connection reset"}
|
|
|
-
|
|
|
-
|
|
|
-These can be added to task modules, or you can keep them in their own module
|
|
|
-then import them using the :setting:`CELERY_IMPORTS` setting::
|
|
|
-
|
|
|
- CELERY_IMPORTS = ("myapp.worker.control", )
|
|
|
|
|
|
.. _worker-inspect:
|
|
|
|
|
@@ -582,6 +511,9 @@ Inspecting workers
|
|
|
:class:`@control.inspect` lets you inspect running workers. It
|
|
|
uses remote control commands under the hood.
|
|
|
|
|
|
+You can also use the ``celery`` command to inspect workers,
|
|
|
+and it supports the same commands as the :class:`@Celery.control` interface.
|
|
|
+
|
|
|
.. code-block:: python
|
|
|
|
|
|
# Inspect all nodes.
|
|
@@ -603,12 +535,7 @@ You can get a list of tasks registered in the worker using the
|
|
|
:meth:`~@control.inspect.registered`::
|
|
|
|
|
|
>>> i.registered()
|
|
|
- [{'worker1.example.com': ['celery.delete_expired_task_meta',
|
|
|
- 'celery.execute_remote',
|
|
|
- 'celery.map_async',
|
|
|
- 'celery.ping',
|
|
|
- 'celery.task.http.HttpDispatchTask',
|
|
|
- 'tasks.add',
|
|
|
+ [{'worker1.example.com': ['tasks.add',
|
|
|
'tasks.sleeptask']}]
|
|
|
|
|
|
.. _worker-inspect-active-tasks:
|
|
@@ -649,7 +576,9 @@ You can get a list of tasks waiting to be scheduled by using
|
|
|
"args": "[2]",
|
|
|
"kwargs": "{}"}}]}]
|
|
|
|
|
|
-Note that these are tasks with an eta/countdown argument, not periodic tasks.
|
|
|
+.. note::
|
|
|
+
|
|
|
+ These are tasks with an eta/countdown argument, not periodic tasks.
|
|
|
|
|
|
.. _worker-inspect-reserved:
|
|
|
|
|
@@ -668,3 +597,79 @@ You can get a list of these using
|
|
|
"id": "32666e9b-809c-41fa-8e93-5ae0c80afbbf",
|
|
|
"args": "(8,)",
|
|
|
"kwargs": "{}"}]}]
|
|
|
+
|
|
|
+
|
|
|
+Additional Commands
|
|
|
+===================
|
|
|
+
|
|
|
+.. control:: shutdown
|
|
|
+
|
|
|
+Remote shutdown
|
|
|
+---------------
|
|
|
+
|
|
|
+This command will gracefully shut down the worker remotely::
|
|
|
+
|
|
|
+ >>> celery.control.broadcast("shutdown") # shutdown all workers
|
|
|
+ >>> celery.control.broadcast("shutdown, destination="worker1.example.com")
|
|
|
+
|
|
|
+.. control:: ping
|
|
|
+
|
|
|
+Ping
|
|
|
+----
|
|
|
+
|
|
|
+This command requests a ping from alive workers.
|
|
|
+The workers reply with the string 'pong', and that's just about it.
|
|
|
+It will use the default one second timeout for replies unless you specify
|
|
|
+a custom timeout::
|
|
|
+
|
|
|
+ >>> celery.control.ping(timeout=0.5)
|
|
|
+ [{'worker1.example.com': 'pong'},
|
|
|
+ {'worker2.example.com': 'pong'},
|
|
|
+ {'worker3.example.com': 'pong'}]
|
|
|
+
|
|
|
+:meth:`~@control.ping` also supports the `destination` argument,
|
|
|
+so you can specify which workers to ping::
|
|
|
+
|
|
|
+ >>> ping(['worker2.example.com', 'worker3.example.com'])
|
|
|
+ [{'worker2.example.com': 'pong'},
|
|
|
+ {'worker3.example.com': 'pong'}]
|
|
|
+
|
|
|
+.. _worker-enable-events:
|
|
|
+
|
|
|
+.. control:: enable_events
|
|
|
+.. control:: disable_events
|
|
|
+
|
|
|
+Enable/disable events
|
|
|
+---------------------
|
|
|
+
|
|
|
+You can enable/disable events by using the `enable_events`,
|
|
|
+`disable_events` commands. This is useful to temporarily monitor
|
|
|
+a worker using :program:`celery events`/:program:`celerymon`.
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ >>> celery.control.enable_events()
|
|
|
+ >>> celery.control.disable_events()
|
|
|
+
|
|
|
+.. _worker-custom-control-commands:
|
|
|
+
|
|
|
+Writing your own remote control commands
|
|
|
+========================================
|
|
|
+
|
|
|
+Remote control commands are registered in the control panel and
|
|
|
+they take a single argument: the current
|
|
|
+:class:`~celery.worker.control.ControlDispatch` instance.
|
|
|
+From there you have access to the active
|
|
|
+:class:`~celery.worker.consumer.Consumer` if needed.
|
|
|
+
|
|
|
+Here's an example control command that restarts the broker connection:
|
|
|
+
|
|
|
+.. code-block:: python
|
|
|
+
|
|
|
+ from celery.worker.control import Panel
|
|
|
+
|
|
|
+ @Panel.register
|
|
|
+ def reset_connection(panel):
|
|
|
+ panel.logger.critical("Connection reset by remote control.")
|
|
|
+ panel.consumer.reset_connection()
|
|
|
+ return {"ok": "connection reset"}
|