|  | @@ -11,18 +11,25 @@ from __future__ import with_statement
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import socket
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -from functools import partial
 | 
	
		
			
				|  |  | +from functools import partial, wraps
 | 
	
		
			
				|  |  |  from itertools import cycle, islice
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from kombu import eventloop, Queue
 | 
	
		
			
				|  |  | +from kombu.common import maybe_declare
 | 
	
		
			
				|  |  |  from kombu.exceptions import StdChannelError
 | 
	
		
			
				|  |  |  from kombu.utils.encoding import ensure_bytes
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from celery.app import app_or_default
 | 
	
		
			
				|  |  | +from celery.utils import worker_direct
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class StopFiltering(Exception):
 | 
	
		
			
				|  |  | +    pass
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class State(object):
 | 
	
		
			
				|  |  |      count = 0
 | 
	
		
			
				|  |  | +    filtered = 0
 | 
	
		
			
				|  |  |      total_apx = 0
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      @property
 | 
	
	
		
			
				|  | @@ -32,6 +39,8 @@ class State(object):
 | 
	
		
			
				|  |  |          return unicode(self.total_apx)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __repr__(self):
 | 
	
		
			
				|  |  | +        if self.filtered:
 | 
	
		
			
				|  |  | +            return '^%s' % self.filtered
 | 
	
		
			
				|  |  |          return '%s/%s' % (self.count, self.strtotal)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -98,13 +107,43 @@ def migrate_tasks(source, dest, migrate=migrate_task, app=None,
 | 
	
		
			
				|  |  |                          on_declare_queue=on_declare_queue, **kwargs)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
 | 
	
		
			
				|  |  | -    """Find tasks by filtering them and move the tasks to a new queue.
 | 
	
		
			
				|  |  | +def _maybe_queue(app, q):
 | 
	
		
			
				|  |  | +    if isinstance(q, basestring):
 | 
	
		
			
				|  |  | +        return app.amqp.queues[q]
 | 
	
		
			
				|  |  | +    return q
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    :param predicate: Filter function with signature ``(body, message)``.
 | 
	
		
			
				|  |  | -    :param conn: Connection to use.
 | 
	
		
			
				|  |  | +def move(predicate, connection=None, exchange=None, routing_key=None,
 | 
	
		
			
				|  |  | +        source=None, app=None, callback=None, limit=None, transform=None,
 | 
	
		
			
				|  |  | +        **kwargs):
 | 
	
		
			
				|  |  | +    """Find tasks by filtering them and move the tasks to a new queue.
 | 
	
		
			
				|  |  | +    :param predicate: Predicate function used to filter messages to move.
 | 
	
		
			
				|  |  | +        Must accept the standard signature of ``(body, message)`` used
 | 
	
		
			
				|  |  | +        by Kombu consumer callbacks.  If the predicate wants the message
 | 
	
		
			
				|  |  | +        to be moved it should return the hostname of the worker to move it
 | 
	
		
			
				|  |  | +        to, otherwise it should return :const:`False`
 | 
	
		
			
				|  |  | +    :keyword queues: A list of queues to consume from, if not specified
 | 
	
		
			
				|  |  | +        it will consume from all configured task queues in ``CELERY_QUEUES``.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    :param predicate: Filter function used to decide which messages
 | 
	
		
			
				|  |  | +        to move.  Must accept the standard signature of ``(body, message)``
 | 
	
		
			
				|  |  | +        used by Kombu consumer callbacks. If the predicate wants the message
 | 
	
		
			
				|  |  | +        to be moved it must return either
 | 
	
		
			
				|  |  | +            1) a tuple of ``(exchange, routing_key)``, or
 | 
	
		
			
				|  |  | +            2) a :class:`~kombu.entity.Queue` instance, or
 | 
	
		
			
				|  |  | +            3) any other true value which means the specified
 | 
	
		
			
				|  |  | +               ``exchange`` and ``routing_key`` arguments will be used.
 | 
	
		
			
				|  |  | +    :keyword connection: Custom connection to use.
 | 
	
		
			
				|  |  | +    :keyword source: Optional list of source queues to use instead of the
 | 
	
		
			
				|  |  | +        default (which is the queues in :setting:`CELERY_QUEUES`).
 | 
	
		
			
				|  |  | +        This list can also contain new :class:`~kombu.entity.Queue` instances.
 | 
	
		
			
				|  |  |      :keyword exchange: Default destination exchange.
 | 
	
		
			
				|  |  |      :keyword routing_key: Default destination routing key.
 | 
	
		
			
				|  |  | +    :keyword limit: Limit number of messages to filter.
 | 
	
		
			
				|  |  | +    :keyword callback: Callback called after message moved,
 | 
	
		
			
				|  |  | +        with signature ``(state, body, message)``.
 | 
	
		
			
				|  |  | +    :keyword transform: Optional function to transform the return
 | 
	
		
			
				|  |  | +        value (destination) of the filter function.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      Also supports the same keyword arguments as :func:`start_filter`.
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -115,10 +154,19 @@ def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          def is_wanted_task(body, message):
 | 
	
		
			
				|  |  |              if body['id'] == wanted_id:
 | 
	
		
			
				|  |  | -                return True
 | 
	
		
			
				|  |  | +                return Queue('foo', exchange=Exchange('foo'),
 | 
	
		
			
				|  |  | +                             routing_key='foo')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        move(is_wanted_task)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    or with a transform:
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        move(is_wanted_task, conn, exchange, routing_key)
 | 
	
		
			
				|  |  | +        def transform(value):
 | 
	
		
			
				|  |  | +            if isinstance(value, basestring):
 | 
	
		
			
				|  |  | +                return Queue(value, Exchange(value), value)
 | 
	
		
			
				|  |  | +            return value
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        move(is_wanted_task, transform=transform)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      The predicate may also return a tuple of ``(exchange, routing_key)``
 | 
	
		
			
				|  |  |      to specify the destination to where the task should be moved,
 | 
	
	
		
			
				|  | @@ -128,20 +176,32 @@ def move(predicate, conn, exchange=None, routing_key=None, app=None, **kwargs):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  |      app = app_or_default(app)
 | 
	
		
			
				|  |  | -    producer = app.amqp.TaskProducer(conn)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def on_task(body, message):
 | 
	
		
			
				|  |  | -        ret = predicate(body, message)
 | 
	
		
			
				|  |  | -        if ret:
 | 
	
		
			
				|  |  | -            if isinstance(ret, Queue):
 | 
	
		
			
				|  |  | -                ex, rk = ret.exchange.name, ret.routing_key
 | 
	
		
			
				|  |  | -            else:
 | 
	
		
			
				|  |  | -                ex, rk = expand_dest(ret, exchange, routing_key)
 | 
	
		
			
				|  |  | -            republish(producer, message,
 | 
	
		
			
				|  |  | -                      exchange=ex, routing_key=rk)
 | 
	
		
			
				|  |  | -            message.ack()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    return start_filter(app, conn, on_task, **kwargs)
 | 
	
		
			
				|  |  | +    queues = [_maybe_queue(app, queue) for queue in source or []] or None
 | 
	
		
			
				|  |  | +    with app.default_connection(connection, pool=False) as conn:
 | 
	
		
			
				|  |  | +        producer = app.amqp.TaskProducer(conn)
 | 
	
		
			
				|  |  | +        state = State()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        def on_task(body, message):
 | 
	
		
			
				|  |  | +            ret = predicate(body, message)
 | 
	
		
			
				|  |  | +            if ret:
 | 
	
		
			
				|  |  | +                if transform:
 | 
	
		
			
				|  |  | +                    ret = transform(ret)
 | 
	
		
			
				|  |  | +                if isinstance(ret, Queue):
 | 
	
		
			
				|  |  | +                    maybe_declare(ret, conn.default_channel)
 | 
	
		
			
				|  |  | +                    ex, rk = ret.exchange.name, ret.routing_key
 | 
	
		
			
				|  |  | +                else:
 | 
	
		
			
				|  |  | +                    ex, rk = expand_dest(ret, exchange, routing_key)
 | 
	
		
			
				|  |  | +                republish(producer, message,
 | 
	
		
			
				|  |  | +                        exchange=ex, routing_key=rk)
 | 
	
		
			
				|  |  | +                message.ack()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                state.filtered += 1
 | 
	
		
			
				|  |  | +                if callback:
 | 
	
		
			
				|  |  | +                    callback(state, body, message)
 | 
	
		
			
				|  |  | +                if limit and state.filtered >= limit:
 | 
	
		
			
				|  |  | +                    raise StopFiltering()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        return start_filter(app, conn, on_task, consume_from=queues, **kwargs)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def expand_dest(ret, exchange, routing_key):
 | 
	
	
		
			
				|  | @@ -152,11 +212,6 @@ def expand_dest(ret, exchange, routing_key):
 | 
	
		
			
				|  |  |      return ex, rk
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -# XXX Deprecated (arguments rearranged)
 | 
	
		
			
				|  |  | -move_tasks = lambda conn, pred, *a, **kw: move(pred, conn, *a, **kw)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  def task_id_eq(task_id, body, message):
 | 
	
		
			
				|  |  |      return body['id'] == task_id
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -165,21 +220,6 @@ def task_id_in(ids, body, message):
 | 
	
		
			
				|  |  |      return body['id'] in ids
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -def move_task_by_id(conn, task_id, exchange, routing_key, **kwargs):
 | 
	
		
			
				|  |  | -    """Find a task by id and move it to another queue.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    :param conn: Connection to use.
 | 
	
		
			
				|  |  | -    :param task_id: Id of task to move.
 | 
	
		
			
				|  |  | -    :param exchange: Destination exchange.
 | 
	
		
			
				|  |  | -    :param exchange: Destination routing key.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    Also supports the same keyword arguments as :func:`start_filter`.
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    """
 | 
	
		
			
				|  |  | -    return move(conn, partial(task_id_eq, task_id),
 | 
	
		
			
				|  |  | -                exchange, routing_key, **kwargs)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  def prepare_queues(queues):
 | 
	
		
			
				|  |  |      if isinstance(queues, basestring):
 | 
	
		
			
				|  |  |          queues = queues.split(',')
 | 
	
	
		
			
				|  | @@ -192,10 +232,10 @@ def prepare_queues(queues):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def start_filter(app, conn, filter, limit=None, timeout=1.0,
 | 
	
		
			
				|  |  | -        ack_messages=False, migrate=migrate_task, tasks=None, queues=None,
 | 
	
		
			
				|  |  | +        ack_messages=False, tasks=None, queues=None,
 | 
	
		
			
				|  |  |          callback=None, forever=False, on_declare_queue=None,
 | 
	
		
			
				|  |  | -        consume_from=None, **kwargs):
 | 
	
		
			
				|  |  | -    state = State()
 | 
	
		
			
				|  |  | +        consume_from=None, state=None, **kwargs):
 | 
	
		
			
				|  |  | +    state = state or State()
 | 
	
		
			
				|  |  |      queues = prepare_queues(queues)
 | 
	
		
			
				|  |  |      if isinstance(tasks, basestring):
 | 
	
		
			
				|  |  |          tasks = set(tasks.split(','))
 | 
	
	
		
			
				|  | @@ -204,6 +244,8 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def update_state(body, message):
 | 
	
		
			
				|  |  |          state.count += 1
 | 
	
		
			
				|  |  | +        if limit and state.count >= limit:
 | 
	
		
			
				|  |  | +            raise StopFiltering()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def ack_message(body, message):
 | 
	
		
			
				|  |  |          message.ack()
 | 
	
	
		
			
				|  | @@ -241,9 +283,74 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
 | 
	
		
			
				|  |  |      # start migrating messages.
 | 
	
		
			
				|  |  |      with consumer:
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  | -            for _ in eventloop(conn, limit=limit,  # pragma: no cover
 | 
	
		
			
				|  |  | +            for _ in eventloop(conn,  # pragma: no cover
 | 
	
		
			
				|  |  |                                 timeout=timeout, ignore_timeouts=forever):
 | 
	
		
			
				|  |  |                  pass
 | 
	
		
			
				|  |  |          except socket.timeout:
 | 
	
		
			
				|  |  |              pass
 | 
	
		
			
				|  |  | +        except StopFiltering:
 | 
	
		
			
				|  |  | +            pass
 | 
	
		
			
				|  |  |      return state
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def move_task_by_id(task_id, dest, **kwargs):
 | 
	
		
			
				|  |  | +    """Find a task by id and move it to another queue.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    :param task_id: Id of task to move.
 | 
	
		
			
				|  |  | +    :param dest: Destination queue.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Also supports the same keyword arguments as :func:`move`.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    return move_by_idmap({task_id: dest}, **kwargs)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def move_by_idmap(map, **kwargs):
 | 
	
		
			
				|  |  | +    """Moves tasks by matching from a ``task_id: queue`` mapping,
 | 
	
		
			
				|  |  | +    where ``queue`` is a queue to move the task to.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Example::
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        >>> reroute_idmap({
 | 
	
		
			
				|  |  | +        ...     '5bee6e82-f4ac-468e-bd3d-13e8600250bc': Queue(...),
 | 
	
		
			
				|  |  | +        ...     'ada8652d-aef3-466b-abd2-becdaf1b82b3': Queue(...),
 | 
	
		
			
				|  |  | +        ...     '3a2b140d-7db1-41ba-ac90-c36a0ef4ab1f': Queue(...)},
 | 
	
		
			
				|  |  | +        ...   queues=['hipri'])
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    def task_id_in_map(body, message):
 | 
	
		
			
				|  |  | +        return map.get(body['id'])
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    # adding the limit means that we don't have to consume any more
 | 
	
		
			
				|  |  | +    # when we've found everything.
 | 
	
		
			
				|  |  | +    return move(task_id_in_map, limit=len(map), **kwargs)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def move_by_taskmap(map, **kwargs):
 | 
	
		
			
				|  |  | +    """Moves tasks by matching from a ``task_name: queue`` mapping,
 | 
	
		
			
				|  |  | +    where ``queue`` is the queue to move the task to.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Example::
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        >>> reroute_idmap({
 | 
	
		
			
				|  |  | +        ...     'tasks.add': Queue(...),
 | 
	
		
			
				|  |  | +        ...     'tasks.mul': Queue(...),
 | 
	
		
			
				|  |  | +        ... })
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def task_name_in_map(body, message):
 | 
	
		
			
				|  |  | +        return map.get(body['task'])  # <- name of task
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    return move(task_name_in_map, **kwargs)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +move_direct = partial(move, transform=worker_direct)
 | 
	
		
			
				|  |  | +move_direct_by_id = partial(move_task_by_id, transform=worker_direct)
 | 
	
		
			
				|  |  | +move_direct_by_idmap = partial(move_by_idmap, transform=worker_direct)
 | 
	
		
			
				|  |  | +move_direct_by_taskmap = partial(move_by_taskmap, transform=worker_direct)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def filter_status(state, body, message):
 | 
	
		
			
				|  |  | +    print('Moving task %s/%s: %s[%s]' % (
 | 
	
		
			
				|  |  | +            state.filtered, state.strtotal, body['task'], body['id']))
 |