|
@@ -99,6 +99,27 @@ def migrate_tasks(source, dest, migrate=migrate_task, app=None,
|
|
|
|
|
|
|
|
|
def move_tasks(conn, predicate, exchange, routing_key, app=None, **kwargs):
|
|
|
+ """Find tasks by filtering them and move the tasks to a new queue.
|
|
|
+
|
|
|
+ :param conn: Connection to use.
|
|
|
+ :param predicate: Filter function with signature ``(body, message)``.
|
|
|
+ :param exchange: Destination exchange.
|
|
|
+ :param routing_key: Destination routing key.
|
|
|
+
|
|
|
+ Also supports the same keyword arguments as :func:`start_filter`.
|
|
|
+
|
|
|
+ To demonstrate, the :func:`move_task_by_id` operation can be implemented
|
|
|
+ like this:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ def is_wanted_task(body, message):
|
|
|
+ if body['id'] == wanted_id:
|
|
|
+ return True
|
|
|
+
|
|
|
+ move_tasks(conn, is_wanted_task, exchange, routing_key)
|
|
|
+
|
|
|
+ """
|
|
|
app = app_or_default(app)
|
|
|
producer = app.amqp.TaskProducer(conn)
|
|
|
|
|
@@ -112,7 +133,16 @@ def move_tasks(conn, predicate, exchange, routing_key, app=None, **kwargs):
|
|
|
|
|
|
|
|
|
def move_task_by_id(conn, task_id, exchange, routing_key, **kwargs):
|
|
|
+ """Find a task by id and move it to another queue.
|
|
|
+
|
|
|
+ :param conn: Connection to use.
|
|
|
+ :param task_id: Id of task to move.
|
|
|
+ :param exchange: Destination exchange.
|
|
|
+ :param exchange: Destination routing key.
|
|
|
+
|
|
|
+ Also supports the same keyword arguments as :func:`start_filter`.
|
|
|
|
|
|
+ """
|
|
|
def predicate(body, message):
|
|
|
if body['id'] == task_id:
|
|
|
return True
|
|
@@ -128,6 +158,7 @@ def prepare_queues(queues):
|
|
|
for q in queues)
|
|
|
if queues is None:
|
|
|
queues = {}
|
|
|
+ return queues
|
|
|
|
|
|
|
|
|
def start_filter(app, conn, filter, limit=None, timeout=1.0,
|