|
@@ -11,18 +11,25 @@ from __future__ import with_statement
|
|
|
|
|
|
import socket
|
|
|
|
|
|
-from functools import partial
|
|
|
+from functools import partial, wraps
|
|
|
from itertools import cycle, islice
|
|
|
|
|
|
from kombu import eventloop, Queue
|
|
|
+from kombu.common import maybe_declare
|
|
|
from kombu.exceptions import StdChannelError
|
|
|
from kombu.utils.encoding import ensure_bytes
|
|
|
|
|
|
from celery.app import app_or_default
|
|
|
+from celery.utils import worker_direct
|
|
|
+
|
|
|
+
|
|
|
+class StopFiltering(Exception):
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
class State(object):
|
|
|
count = 0
|
|
|
+ filtered = 0
|
|
|
total_apx = 0
|
|
|
|
|
|
@property
|
|
@@ -32,6 +39,8 @@ class State(object):
|
|
|
return unicode(self.total_apx)
|
|
|
|
|
|
def __repr__(self):
|
|
|
+ if self.filtered:
|
|
|
+ return '^%s' % self.filtered
|
|
|
return '%s/%s' % (self.count, self.strtotal)
|
|
|
|
|
|
|
|
@@ -98,13 +107,43 @@ def migrate_tasks(source, dest, migrate=migrate_task, app=None,
|
|
|
on_declare_queue=on_declare_queue, **kwargs)
|
|
|
|
|
|
|
|
|
-def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
|
|
|
- """Find tasks by filtering them and move the tasks to a new queue.
|
|
|
+def _maybe_queue(app, q):
|
|
|
+ if isinstance(q, basestring):
|
|
|
+ return app.amqp.queues[q]
|
|
|
+ return q
|
|
|
+
|
|
|
|
|
|
- :param predicate: Filter function with signature ``(body, message)``.
|
|
|
- :param conn: Connection to use.
|
|
|
+def move(predicate, connection=None, exchange=None, routing_key=None,
|
|
|
+ source=None, app=None, callback=None, limit=None, transform=None,
|
|
|
+ **kwargs):
|
|
|
+ """Find tasks by filtering them and move the tasks to a new queue.
|
|
|
+ :param predicate: Predicate function used to filter messages to move.
|
|
|
+ Must accept the standard signature of ``(body, message)`` used
|
|
|
+ by Kombu consumer callbacks. If the predicate wants the message
|
|
|
+ to be moved it should return the hostname of the worker to move it
|
|
|
+ to, otherwise it should return :const:`False`
|
|
|
+ :keyword queues: A list of queues to consume from, if not specified
|
|
|
+ it will consume from all configured task queues in ``CELERY_QUEUES``.
|
|
|
+
|
|
|
+ :param predicate: Filter function used to decide which messages
|
|
|
+ to move. Must accept the standard signature of ``(body, message)``
|
|
|
+ used by Kombu consumer callbacks. If the predicate wants the message
|
|
|
+ to be moved it must return either
|
|
|
+ 1) a tuple of ``(exchange, routing_key)``, or
|
|
|
+ 2) a :class:`~kombu.entity.Queue` instance, or
|
|
|
+ 3) any other true value which means the specified
|
|
|
+ ``exchange`` and ``routing_key`` arguments will be used.
|
|
|
+ :keyword connection: Custom connection to use.
|
|
|
+ :keyword source: Optional list of source queues to use instead of the
|
|
|
+ default (which is the queues in :setting:`CELERY_QUEUES`).
|
|
|
+ This list can also contain new :class:`~kombu.entity.Queue` instances.
|
|
|
:keyword exchange: Default destination exchange.
|
|
|
:keyword routing_key: Default destination routing key.
|
|
|
+ :keyword limit: Limit number of messages to filter.
|
|
|
+ :keyword callback: Callback called after message moved,
|
|
|
+ with signature ``(state, body, message)``.
|
|
|
+ :keyword transform: Optional function to transform the return
|
|
|
+ value (destination) of the filter function.
|
|
|
|
|
|
Also supports the same keyword arguments as :func:`start_filter`.
|
|
|
|
|
@@ -115,10 +154,19 @@ def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
|
|
|
|
|
|
def is_wanted_task(body, message):
|
|
|
if body['id'] == wanted_id:
|
|
|
- return True
|
|
|
+ return Queue('foo', exchange=Exchange('foo'),
|
|
|
+ routing_key='foo')
|
|
|
+
|
|
|
+ move(is_wanted_task)
|
|
|
+
|
|
|
+ or with a transform:
|
|
|
|
|
|
- move(is_wanted_task, conn, exchange, routing_key)
|
|
|
+ def transform(value):
|
|
|
+ if isinstance(value, basestring):
|
|
|
+ return Queue(value, Exchange(value), value)
|
|
|
+ return value
|
|
|
|
|
|
+ move(is_wanted_task, transform=transform)
|
|
|
|
|
|
The predicate may also return a tuple of ``(exchange, routing_key)``
|
|
|
to specify the destination to where the task should be moved,
|
|
@@ -128,20 +176,32 @@ def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
|
|
|
|
|
|
"""
|
|
|
app = app_or_default(app)
|
|
|
- producer = app.amqp.TaskProducer(conn)
|
|
|
-
|
|
|
- def on_task(body, message):
|
|
|
- ret = predicate(body, message)
|
|
|
- if ret:
|
|
|
- if isinstance(ret, Queue):
|
|
|
- ex, rk = ret.exchange.name, ret.routing_key
|
|
|
- else:
|
|
|
- ex, rk = expand_dest(ret, exchange, routing_key)
|
|
|
- republish(producer, message,
|
|
|
- exchange=ex, routing_key=rk)
|
|
|
- message.ack()
|
|
|
-
|
|
|
- return start_filter(app, conn, on_task, **kwargs)
|
|
|
+ queues = [_maybe_queue(app, queue) for queue in source or []] or None
|
|
|
+ with app.default_connection(connection, pool=False) as conn:
|
|
|
+ producer = app.amqp.TaskProducer(conn)
|
|
|
+ state = State()
|
|
|
+
|
|
|
+ def on_task(body, message):
|
|
|
+ ret = predicate(body, message)
|
|
|
+ if ret:
|
|
|
+ if transform:
|
|
|
+ ret = transform(ret)
|
|
|
+ if isinstance(ret, Queue):
|
|
|
+ maybe_declare(ret, conn.default_channel)
|
|
|
+ ex, rk = ret.exchange.name, ret.routing_key
|
|
|
+ else:
|
|
|
+ ex, rk = expand_dest(ret, exchange, routing_key)
|
|
|
+ republish(producer, message,
|
|
|
+ exchange=ex, routing_key=rk)
|
|
|
+ message.ack()
|
|
|
+
|
|
|
+ state.filtered += 1
|
|
|
+ if callback:
|
|
|
+ callback(state, body, message)
|
|
|
+ if limit and state.filtered >= limit:
|
|
|
+ raise StopFiltering()
|
|
|
+
|
|
|
+ return start_filter(app, conn, on_task, consume_from=queues, **kwargs)
|
|
|
|
|
|
|
|
|
def expand_dest(ret, exchange, routing_key):
|
|
@@ -152,11 +212,6 @@ def expand_dest(ret, exchange, routing_key):
|
|
|
return ex, rk
|
|
|
|
|
|
|
|
|
-
|
|
|
-# XXX Deprecated (arguments rearranged)
|
|
|
-move_tasks = lambda conn, pred, *a, **kw: move(pred, conn, *a, **kw)
|
|
|
-
|
|
|
-
|
|
|
def task_id_eq(task_id, body, message):
|
|
|
return body['id'] == task_id
|
|
|
|
|
@@ -165,21 +220,6 @@ def task_id_in(ids, body, message):
|
|
|
return body['id'] in ids
|
|
|
|
|
|
|
|
|
-def move_task_by_id(conn, task_id, exchange, routing_key, **kwargs):
|
|
|
- """Find a task by id and move it to another queue.
|
|
|
-
|
|
|
- :param conn: Connection to use.
|
|
|
- :param task_id: Id of task to move.
|
|
|
- :param exchange: Destination exchange.
|
|
|
- :param exchange: Destination routing key.
|
|
|
-
|
|
|
- Also supports the same keyword arguments as :func:`start_filter`.
|
|
|
-
|
|
|
- """
|
|
|
- return move(conn, partial(task_id_eq, task_id),
|
|
|
- exchange, routing_key, **kwargs)
|
|
|
-
|
|
|
-
|
|
|
def prepare_queues(queues):
|
|
|
if isinstance(queues, basestring):
|
|
|
queues = queues.split(',')
|
|
@@ -192,10 +232,10 @@ def prepare_queues(queues):
|
|
|
|
|
|
|
|
|
def start_filter(app, conn, filter, limit=None, timeout=1.0,
|
|
|
- ack_messages=False, migrate=migrate_task, tasks=None, queues=None,
|
|
|
+ ack_messages=False, tasks=None, queues=None,
|
|
|
callback=None, forever=False, on_declare_queue=None,
|
|
|
- consume_from=None, **kwargs):
|
|
|
- state = State()
|
|
|
+ consume_from=None, state=None, **kwargs):
|
|
|
+ state = state or State()
|
|
|
queues = prepare_queues(queues)
|
|
|
if isinstance(tasks, basestring):
|
|
|
tasks = set(tasks.split(','))
|
|
@@ -204,6 +244,8 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
|
|
|
|
|
|
def update_state(body, message):
|
|
|
state.count += 1
|
|
|
+ if limit and state.count >= limit:
|
|
|
+ raise StopFiltering()
|
|
|
|
|
|
def ack_message(body, message):
|
|
|
message.ack()
|
|
@@ -241,9 +283,74 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
|
|
|
# start migrating messages.
|
|
|
with consumer:
|
|
|
try:
|
|
|
- for _ in eventloop(conn, limit=limit, # pragma: no cover
|
|
|
+ for _ in eventloop(conn, # pragma: no cover
|
|
|
timeout=timeout, ignore_timeouts=forever):
|
|
|
pass
|
|
|
except socket.timeout:
|
|
|
pass
|
|
|
+ except StopFiltering:
|
|
|
+ pass
|
|
|
return state
|
|
|
+
|
|
|
+
|
|
|
+def move_task_by_id(task_id, dest, **kwargs):
|
|
|
+ """Find a task by id and move it to another queue.
|
|
|
+
|
|
|
+ :param task_id: Id of task to move.
|
|
|
+ :param dest: Destination queue.
|
|
|
+
|
|
|
+ Also supports the same keyword arguments as :func:`move`.
|
|
|
+
|
|
|
+ """
|
|
|
+ return move_by_idmap({task_id: dest}, **kwargs)
|
|
|
+
|
|
|
+
|
|
|
+def move_by_idmap(map, **kwargs):
|
|
|
+ """Moves tasks by matching from a ``task_id: queue`` mapping,
|
|
|
+ where ``queue`` is a queue to move the task to.
|
|
|
+
|
|
|
+ Example::
|
|
|
+
|
|
|
+ >>> reroute_idmap({
|
|
|
+ ... '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue(...),
|
|
|
+ ... 'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue(...),
|
|
|
+ ... '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue(...)},
|
|
|
+ ... queues=['hipri'])
|
|
|
+
|
|
|
+ """
|
|
|
+ def task_id_in_map(body, message):
|
|
|
+ return map.get(body['id'])
|
|
|
+
|
|
|
+ # adding the limit means that we don't have to consume any more
|
|
|
+ # when we've found everything.
|
|
|
+ return move(task_id_in_map, limit=len(map), **kwargs)
|
|
|
+
|
|
|
+
|
|
|
+def move_by_taskmap(map, **kwargs):
|
|
|
+ """Moves tasks by matching from a ``task_name: queue`` mapping,
|
|
|
+ where ``queue`` is the queue to move the task to.
|
|
|
+
|
|
|
+ Example::
|
|
|
+
|
|
|
+ >>> reroute_idmap({
|
|
|
+ ... 'tasks.add': Queue(...),
|
|
|
+ ... 'tasks.mul': Queue(...),
|
|
|
+ ... })
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ def task_name_in_map(body, message):
|
|
|
+ return map.get(body['task']) # <- name of task
|
|
|
+
|
|
|
+ return move(task_name_in_map, **kwargs)
|
|
|
+
|
|
|
+
|
|
|
+move_direct = partial(move, transform=worker_direct)
|
|
|
+move_direct_by_id = partial(move_task_by_id, transform=worker_direct)
|
|
|
+move_direct_by_idmap = partial(move_by_idmap, transform=worker_direct)
|
|
|
+move_direct_by_taskmap = partial(move_by_taskmap, transform=worker_direct)
|
|
|
+
|
|
|
+
|
|
|
+def filter_status(state, body, message):
|
|
|
+ print('Moving task %s/%s: %s[%s]' % (
|
|
|
+ state.filtered, state.strtotal, body['task'], body['id']))
|