123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- # -*- coding: utf-8 -*-
- """
- celery.app.control
- ~~~~~~~~~~~~~~~~~~~
- Client for worker remote control commands.
- Server implementation is in :mod:`celery.worker.control`.
- :copyright: (c) 2009 - 2012 by Ask Solem.
- :license: BSD, see LICENSE for more details.
- """
- from __future__ import absolute_import
- from __future__ import with_statement
- from kombu.pidbox import Mailbox
- from kombu.utils import cached_property
- from . import app_or_default
def flatten_reply(reply):
    """Merge every item of ``reply`` into a single dict.

    Items are applied in order with :meth:`dict.update`, so when the same
    key occurs in more than one item the last occurrence wins.

    :param reply: Iterable of mappings (anything ``dict.update`` accepts).
    :returns: A new dict holding the merged contents.

    """
    merged = {}
    for node_reply in reply:
        merged.update(node_reply)
    return merged
class Inspect(object):
    """Client-side helper for querying workers.

    Every public method broadcasts a single command through
    ``self.app.control.broadcast`` with ``reply=True`` and hands the
    collected replies to :meth:`_prepare`.

    :keyword destination: Forwarded to ``broadcast``.  If it is set and is
        not a list or tuple, only the reply stored under that key is
        returned.
    :keyword timeout: Forwarded to ``broadcast`` (default 1).
    :keyword callback: Forwarded to ``broadcast``.
    :keyword connection: Forwarded to ``broadcast``.
    :keyword app: App instance to use; falls back to the class
        attribute :attr:`app`.

    """
    app = None

    def __init__(self, destination=None, timeout=1, callback=None,
                 connection=None, app=None):
        self.app = app or self.app
        self.destination = destination
        self.timeout = timeout
        self.callback = callback
        self.connection = connection

    def _prepare(self, reply):
        """Turn a raw broadcast reply into the value returned to callers.

        Returns ``None`` for an empty reply.  Otherwise the reply items are
        merged with :func:`flatten_reply`; for a single (non list/tuple)
        destination only that destination's entry is returned.

        """
        if not reply:
            return
        replies = flatten_reply(reply)
        target = self.destination
        wants_single = target and not isinstance(target, (list, tuple))
        if wants_single:
            return replies.get(target)
        return replies

    def _request(self, command, **kwargs):
        """Broadcast ``command`` with ``kwargs`` as its arguments and
        return the prepared reply."""
        raw_reply = self.app.control.broadcast(
            command,
            arguments=kwargs,
            destination=self.destination,
            callback=self.callback,
            connection=self.connection,
            timeout=self.timeout,
            reply=True)
        return self._prepare(raw_reply)

    def report(self):
        """Send the ``report`` command."""
        return self._request("report")

    def active(self, safe=False):
        """Send the ``dump_active`` command."""
        return self._request("dump_active", safe=safe)

    def scheduled(self, safe=False):
        """Send the ``dump_schedule`` command."""
        return self._request("dump_schedule", safe=safe)

    def reserved(self, safe=False):
        """Send the ``dump_reserved`` command."""
        return self._request("dump_reserved", safe=safe)

    def stats(self):
        """Send the ``stats`` command."""
        return self._request("stats")

    def revoked(self):
        """Send the ``dump_revoked`` command."""
        return self._request("dump_revoked")

    def registered(self):
        """Send the ``dump_tasks`` command."""
        return self._request("dump_tasks")
    # Alias of :meth:`registered`.
    registered_tasks = registered

    def ping(self):
        """Send the ``ping`` command."""
        return self._request("ping")

    def active_queues(self):
        """Send the ``active_queues`` command."""
        return self._request("active_queues")
class Control(object):
    """Client for sending remote control commands to workers.

    Commands are published through a :class:`Mailbox` created with the
    name ``"celeryd"`` and ``type="fanout"``; every command method is a
    thin wrapper around :meth:`broadcast`.

    :keyword app: App instance, resolved with ``app_or_default``.

    """
    Mailbox = Mailbox

    def __init__(self, app=None):
        self.app = app_or_default(app)
        self.mailbox = self.Mailbox("celeryd", type="fanout")

    @cached_property
    def inspect(self):
        """Value of ``app.subclass_with_self(Inspect, ...)``, cached.

        Presumably an :class:`Inspect` subclass with ``app`` preset --
        confirm against ``subclass_with_self``.

        """
        return self.app.subclass_with_self(Inspect, reverse="control.inspect")

    def purge(self, connection=None):
        """Discard all waiting tasks.

        This will ignore all tasks waiting for execution, and they will
        be deleted from the messaging server.

        :keyword connection: Custom broker connection to use, if not set
            the app's default connection is used.

        :returns: the number of tasks discarded.

        """
        with self.app.default_connection(connection) as conn:
            return self.app.amqp.TaskConsumer(conn).purge()
    discard_all = purge

    def revoke(self, task_id, destination=None, terminate=False,
               signal="SIGTERM", **kwargs):
        """Tell all (or specific) workers to revoke a task by id.

        If a task is revoked, the workers will ignore the task and
        not execute it after all.

        :param task_id: Id of the task to revoke.
        :keyword terminate: Also terminate the process currently working
            on the task (if any).
        :keyword signal: Name of signal to send to process if terminate.
            Default is TERM.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast("revoke", destination=destination,
                              arguments={"task_id": task_id,
                                         "terminate": terminate,
                                         "signal": signal}, **kwargs)

    def ping(self, destination=None, timeout=1, **kwargs):
        """Ping all (or specific) workers.

        Returns answer from alive workers.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast("ping", reply=True, destination=destination,
                              timeout=timeout, **kwargs)

    def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
        """Tell all (or specific) workers to set a new rate limit
        for task by type.

        :param task_name: Name of task to change rate limit for.
        :param rate_limit: The rate limit as tasks per second, or a rate limit
            string (`"100/m"`, etc.
            see :attr:`celery.task.base.Task.rate_limit` for
            more information).

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast("rate_limit", destination=destination,
                              arguments={"task_name": task_name,
                                         "rate_limit": rate_limit},
                              **kwargs)

    def add_consumer(self, queue, exchange=None, exchange_type="direct",
                     routing_key=None, **options):
        """Tell all (or specific) workers to start consuming from a new queue.

        Only the queue name is required as if only the queue is specified
        then the exchange/routing key will be set to the same name (
        like automatic queues do).

        .. note::

            This command does not respect the default queue/exchange
            options in the configuration.

        :param queue: Name of queue to start consuming from.
        :keyword exchange: Optional name of exchange.
        :keyword exchange_type: Type of exchange (defaults to "direct").
        :keyword routing_key: Optional routing key.

        See :meth:`broadcast` for supported keyword arguments.

        """
        return self.broadcast("add_consumer",
                              arguments={"queue": queue,
                                         "exchange": exchange,
                                         "exchange_type": exchange_type,
                                         "routing_key": routing_key},
                              **options)

    def cancel_consumer(self, queue, **kwargs):
        """Tell all (or specific) workers to stop consuming from ``queue``.

        Supports the same keyword arguments as :meth:`broadcast`.

        """
        return self.broadcast("cancel_consumer",
                              arguments={"queue": queue}, **kwargs)

    def time_limit(self, task_name, soft=None, hard=None, **kwargs):
        """Tell all (or specific) workers to set time limits for
        a task by type.

        :param task_name: Name of task to change time limits for.
        :keyword soft: New soft time limit (in seconds).
        :keyword hard: New hard time limit (in seconds).

        Any additional keyword arguments are passed on to :meth:`broadcast`.

        """
        return self.broadcast("time_limit",
                              arguments={"task_name": task_name,
                                         "hard": hard, "soft": soft},
                              **kwargs)

    def enable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to enable events."""
        return self.broadcast("enable_events", {}, destination, **kwargs)

    def disable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to disable events."""
        return self.broadcast("disable_events", {}, destination, **kwargs)

    def pool_grow(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to grow the pool by ``n``.

        Supports the same arguments as :meth:`broadcast`.

        """
        # ``n`` must be sent along, otherwise the argument is ignored.
        return self.broadcast("pool_grow", {"n": n}, destination, **kwargs)

    def pool_shrink(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to shrink the pool by ``n``.

        Supports the same arguments as :meth:`broadcast`.

        """
        # ``n`` must be sent along, otherwise the argument is ignored.
        return self.broadcast("pool_shrink", {"n": n}, destination, **kwargs)

    def broadcast(self, command, arguments=None, destination=None,
                  connection=None, reply=False, timeout=1, limit=None,
                  callback=None, channel=None, **extra_kwargs):
        """Broadcast a control command to the celery workers.

        :param command: Name of command to send.
        :param arguments: Keyword arguments for the command.
            ``None`` (the default) is treated as no arguments.
        :keyword destination: If set, a list of the hosts to send the
            command to, when empty broadcast to all workers.
        :keyword connection: Custom broker connection to use, if not set,
            a connection will be established automatically.
        :keyword reply: Wait for and return the reply.
        :keyword timeout: Timeout in seconds to wait for the reply.
        :keyword limit: Limit number of replies.
        :keyword callback: Callback called immediately for each reply
            received.
        :keyword channel: Channel to use, defaults to the connection's
            ``default_channel``.

        Any other keyword arguments are merged into ``arguments``.

        """
        with self.app.default_connection(connection) as conn:
            if channel is None:
                channel = conn.default_channel
            # ``arguments`` is None when the caller sends a command without
            # arguments (e.g. :meth:`ping`); ``dict(None)`` would raise
            # TypeError.  Copying also keeps the caller's dict unmodified.
            arguments = dict(arguments or {}, **extra_kwargs)
            return self.mailbox(conn)._broadcast(command, arguments,
                                                 destination, reply, timeout,
                                                 limit, callback,
                                                 channel=channel)
|