# -*- coding: utf-8 -*-
"""Worker Remote Control Client.

Client for worker remote control commands.
Server implementation is in :mod:`celery.worker.control`.
"""
- from __future__ import absolute_import, unicode_literals
- import warnings
- from billiard.common import TERM_SIGNAME
- from kombu.pidbox import Mailbox
- from kombu.utils.functional import lazy
- from kombu.utils.objects import cached_property
- from celery.exceptions import DuplicateNodenameWarning
- from celery.utils.text import pluralize
# Public names of this module (what ``from <module> import *`` exposes).
__all__ = ['Inspect', 'Control', 'flatten_reply']
# Warning template used by ``flatten_reply``: ``{0}`` receives the word
# "name"/"names" and ``{1}`` the comma-separated duplicated node names.
# There is deliberately no leading or trailing newline.
W_DUPNODE = (
    'Received multiple replies from node {0}: {1}.\n'
    'Please make sure you give each node a unique nodename using\n'
    'the celery worker `-n` option.'
)
def flatten_reply(reply):
    """Flatten node replies.

    Convert from a list of replies in this format::

        [{'a@example.com': reply},
         {'b@example.com': reply}]

    into this format::

        {'a@example.com': reply,
         'b@example.com': reply}

    If the same node name shows up in more than one reply, the later
    reply replaces the earlier one and a single
    :class:`~celery.exceptions.DuplicateNodenameWarning` naming all
    duplicated nodes is emitted.

    Arguments:
        reply (Iterable[Mapping]): Replies, each keyed by node name.

    Returns:
        Dict: All replies merged into one mapping keyed by node name.
    """
    nodes, dupes = {}, set()
    for item in reply:
        # Collect clashes *before* merging: once ``item`` has been merged
        # every one of its names would be found in ``nodes``.
        dupes.update(name for name in item if name in nodes)
        nodes.update(item)
    if dupes:
        warnings.warn(DuplicateNodenameWarning(
            W_DUPNODE.format(
                pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
            ),
        ))
    return nodes
class Inspect(object):
    """API for app.control.inspect."""

    # Fallback application, used when ``__init__`` is not given ``app``.
    app = None

    def __init__(self, destination=None, timeout=1, callback=None,
                 connection=None, app=None, limit=None):
        """Store the options that every inspect request is sent with.

        Arguments:
            destination: Node name, or list/tuple of node names, forwarded
                to ``broadcast``.  A single non-list value also makes
                requests return only that node's reply.
            timeout: Forwarded to ``broadcast`` as ``timeout``.
            callback: Forwarded to ``broadcast`` as ``callback``.
            connection: Forwarded to ``broadcast`` as ``connection``.
            app: Application whose ``control.broadcast`` is used; falls
                back to the class-level ``app`` attribute when falsy.
            limit: Forwarded to ``broadcast`` as ``limit``.
        """
        self.app = app or self.app
        self.destination = destination
        self.connection = connection
        self.callback = callback
        self.timeout = timeout
        self.limit = limit

    def _prepare(self, reply):
        """Turn a raw broadcast reply into the value handed to the caller.

        Returns ``None`` for an empty reply, the single node's reply when
        ``destination`` is one (non list/tuple) node name, and otherwise
        the mapping of node name to reply.
        """
        if not reply:
            return None
        by_node = flatten_reply(reply)
        target = self.destination
        if not target or isinstance(target, (list, tuple)):
            return by_node
        # A single node was addressed: unwrap its reply (None if absent).
        return by_node.get(target)

    def _request(self, command, **kwargs):
        """Broadcast ``command`` with ``kwargs`` as its arguments.

        The broadcast always asks for replies; the result is passed
        through :meth:`_prepare` before being returned.
        """
        replies = self.app.control.broadcast(
            command,
            arguments=kwargs,
            destination=self.destination,
            callback=self.callback,
            connection=self.connection,
            limit=self.limit,
            timeout=self.timeout,
            reply=True,
        )
        return self._prepare(replies)

    def report(self):
        """Send the ``report`` command and return the prepared reply."""
        return self._request('report')

    def clock(self):
        """Send the ``clock`` command and return the prepared reply."""
        return self._request('clock')

    def active(self, safe=None):
        """Send the ``active`` command and return the prepared reply."""
        # safe is ignored since 4.0
        # as no objects will need serialization now that we
        # have argsrepr/kwargsrepr.
        return self._request('active')

    def scheduled(self, safe=None):
        """Send the ``scheduled`` command; ``safe`` is not used."""
        return self._request('scheduled')

    def reserved(self, safe=None):
        """Send the ``reserved`` command; ``safe`` is not used."""
        return self._request('reserved')

    def stats(self):
        """Send the ``stats`` command and return the prepared reply."""
        return self._request('stats')

    def revoked(self):
        """Send the ``revoked`` command and return the prepared reply."""
        return self._request('revoked')

    def registered(self, *taskinfoitems):
        """Send the ``registered`` command.

        The positional arguments are forwarded as the ``taskinfoitems``
        tuple.
        """
        return self._request('registered', taskinfoitems=taskinfoitems)
    registered_tasks = registered

    def ping(self):
        """Send the ``ping`` command and return the prepared reply."""
        return self._request('ping')

    def active_queues(self):
        """Send the ``active_queues`` command and return the prepared reply."""
        return self._request('active_queues')

    def query_task(self, ids):
        """Send the ``query_task`` command with ``ids`` as its argument."""
        return self._request('query_task', ids=ids)

    def conf(self, with_defaults=False):
        """Send the ``conf`` command, forwarding ``with_defaults``."""
        return self._request('conf', with_defaults=with_defaults)

    def hello(self, from_node, revoked=None):
        """Send the ``hello`` command with ``from_node`` and ``revoked``."""
        return self._request('hello', from_node=from_node, revoked=revoked)

    def memsample(self):
        """Send the ``memsample`` command and return the prepared reply."""
        return self._request('memsample')

    def memdump(self, samples=10):
        """Send the ``memdump`` command, forwarding ``samples``."""
        return self._request('memdump', samples=samples)

    def objgraph(self, type='Request', n=200, max_depth=10):
        """Send the ``objgraph`` command.

        ``n`` is transmitted under the argument name ``num``.
        """
        return self._request('objgraph', num=n, max_depth=max_depth, type=type)
class Control(object):
    """Worker remote control client."""

    # Mailbox class instantiated by ``__init__``; looked up through ``self``
    # so a subclass can substitute another implementation.
    Mailbox = Mailbox

    def __init__(self, app=None):
        """Bind the client to ``app`` and create its ``'celery'`` mailbox.

        Arguments:
            app: Application object whose ``amqp``,
                ``connection_or_acquire`` and ``subclass_with_self``
                attributes are used by the methods of this class.
        """
        self.app = app
        # The producer pool lookup is wrapped in a callable instead of
        # being resolved from ``app.amqp`` right here.
        deferred_pool = lazy(lambda: self.app.amqp.producer_pool)
        self.mailbox = self.Mailbox(
            'celery',
            type='fanout',
            accept=['json'],
            producer_pool=deferred_pool,
        )

    @cached_property
    def inspect(self):
        """:class:`Inspect` subclass created via ``app.subclass_with_self``."""
        return self.app.subclass_with_self(Inspect, reverse='control.inspect')

    def purge(self, connection=None):
        """Discard all waiting tasks.

        This will ignore all tasks waiting for execution, and they will
        be deleted from the messaging server.

        Arguments:
            connection (kombu.Connection): Optional specific connection
                instance to use.  If not provided a connection will
                be acquired from the connection pool.

        Returns:
            int: the number of tasks discarded.
        """
        with self.app.connection_or_acquire(connection) as conn:
            consumer = self.app.amqp.TaskConsumer(conn)
            return consumer.purge()
    discard_all = purge

    def election(self, id, topic, action=None, connection=None):
        """Broadcast the ``election`` command.

        ``id``, ``topic`` and ``action`` are sent as the command
        arguments.  Nothing is returned.
        """
        payload = {'id': id, 'topic': topic, 'action': action}
        self.broadcast('election', connection=connection, arguments=payload)

    def revoke(self, task_id, destination=None, terminate=False,
               signal=TERM_SIGNAME, **kwargs):
        """Tell all (or specific) workers to revoke a task by id.

        If a task is revoked, the workers will ignore the task and
        not execute it after all.

        Arguments:
            task_id (str): Id of the task to revoke.
            destination (List): Forwarded to :meth:`broadcast`.
            terminate (bool): Also terminate the process currently working
                on the task (if any).
            signal (str): Name of signal to send to process if terminate.
                Defaults to ``billiard.common.TERM_SIGNAME``.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        """
        payload = {
            'task_id': task_id,
            'terminate': terminate,
            'signal': signal,
        }
        return self.broadcast(
            'revoke', destination=destination, arguments=payload, **kwargs
        )

    def ping(self, destination=None, timeout=1, **kwargs):
        """Ping all (or specific) workers.

        Returns:
            List[Dict]: List of ``{'hostname': reply}`` dictionaries.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        """
        return self.broadcast(
            'ping',
            reply=True,
            destination=destination,
            timeout=timeout,
            **kwargs
        )

    def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
        """Tell workers to set a new rate limit for task by type.

        Arguments:
            task_name (str): Name of task to change rate limit for.
            rate_limit (int, str): The rate limit as tasks per second,
                or a rate limit string (`'100/m'`, etc.
                see :attr:`celery.task.base.Task.rate_limit` for
                more information).
            destination (List): Forwarded to :meth:`broadcast`.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        """
        payload = {'task_name': task_name, 'rate_limit': rate_limit}
        return self.broadcast(
            'rate_limit', destination=destination, arguments=payload, **kwargs
        )

    def add_consumer(self, queue, exchange=None, exchange_type='direct',
                     routing_key=None, options=None, **kwargs):
        """Tell all (or specific) workers to start consuming from a new queue.

        Only the queue name is required as if only the queue is specified
        then the exchange/routing key will be set to the same name (
        like automatic queues do).

        Note:
            This command does not respect the default queue/exchange
            options in the configuration.

        Arguments:
            queue (str): Name of queue to start consuming from.
            exchange (str): Optional name of exchange.
            exchange_type (str): Type of exchange (defaults to 'direct').
            routing_key (str): Optional routing key.
            options (Dict): Additional options as supported
                by :meth:`kombu.entity.Queue.from_dict`.  They are merged
                into the command arguments and take precedence over the
                named parameters above.

        See Also:
            :meth:`broadcast` for supported keyword arguments.
        """
        base = {
            'queue': queue,
            'exchange': exchange,
            'exchange_type': exchange_type,
            'routing_key': routing_key,
        }
        payload = dict(base, **(options or {}))
        return self.broadcast('add_consumer', arguments=payload, **kwargs)

    def cancel_consumer(self, queue, **kwargs):
        """Tell all (or specific) workers to stop consuming from ``queue``.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        """
        payload = {'queue': queue}
        return self.broadcast('cancel_consumer', arguments=payload, **kwargs)

    def time_limit(self, task_name, soft=None, hard=None, **kwargs):
        """Tell workers to set time limits for a task by type.

        Arguments:
            task_name (str): Name of task to change time limits for.
            soft (float): New soft time limit (in seconds).
            hard (float): New hard time limit (in seconds).
            **kwargs (Any): arguments passed on to :meth:`broadcast`.
        """
        payload = {'task_name': task_name, 'hard': hard, 'soft': soft}
        return self.broadcast('time_limit', arguments=payload, **kwargs)

    def enable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to enable events.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        """
        return self.broadcast(
            'enable_events', arguments={}, destination=destination, **kwargs
        )

    def disable_events(self, destination=None, **kwargs):
        """Tell all (or specific) workers to disable events.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        """
        return self.broadcast(
            'disable_events', arguments={}, destination=destination, **kwargs
        )

    def pool_grow(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to grow the pool by ``n``.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        """
        return self.broadcast(
            'pool_grow', arguments={'n': n}, destination=destination, **kwargs
        )

    def pool_shrink(self, n=1, destination=None, **kwargs):
        """Tell all (or specific) workers to shrink the pool by ``n``.

        See Also:
            Supports the same arguments as :meth:`broadcast`.
        """
        return self.broadcast(
            'pool_shrink', arguments={'n': n}, destination=destination,
            **kwargs
        )

    def broadcast(self, command, arguments=None, destination=None,
                  connection=None, reply=False, timeout=1, limit=None,
                  callback=None, channel=None, **extra_kwargs):
        """Broadcast a control command to the celery workers.

        Arguments:
            command (str): Name of command to send.
            arguments (Dict): Keyword arguments for the command.
            destination (List): If set, a list of the hosts to send the
                command to, when empty broadcast to all workers.
            connection (kombu.Connection): Custom broker connection to use,
                if not set, a connection will be acquired from the pool.
            reply (bool): Wait for and return the reply.
            timeout (float): Timeout in seconds to wait for the reply.
            limit (int): Limit number of replies.
            callback (Callable): Callback called immediately for
                each reply received.
            channel: Handed to the mailbox's ``_broadcast`` as ``channel``.
            **extra_kwargs (Any): Merged into ``arguments``; on a key clash
                the extra keyword wins.
        """
        with self.app.connection_or_acquire(connection) as conn:
            # Copy so the caller's ``arguments`` dict is never mutated.
            payload = dict(arguments or {}, **extra_kwargs)
            bound_mailbox = self.mailbox(conn)
            return bound_mailbox._broadcast(
                command, payload, destination, reply, timeout,
                limit, callback, channel=channel,
            )
|