| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 | # -*- coding: utf-8 -*-"""    celery.task.sets    ~~~~~~~~~~~~~~~~    Old ``group`` implementation, this module should    not be used anymore use :func:`celery.group` instead."""from __future__ import absolute_importfrom __future__ import with_statementfrom celery.state import get_current_worker_taskfrom celery.app import app_or_defaultfrom celery.canvas import subtask, maybe_subtask  # noqafrom celery.utils import uuidclass TaskSet(list):    """A task containing several subtasks, making it possible    to track how many, or when all of the tasks have been completed.    :param tasks: A list of :class:`subtask` instances.    Example::        >>> urls = ('http://cnn.com/rss', 'http://bbc.co.uk/rss')        >>> s = TaskSet(refresh_feed.s(url) for url in urls)        >>> taskset_result = s.apply_async()        >>> list_of_return_values = taskset_result.join()  # *expensive*    """    app = None    def __init__(self, tasks=None, app=None, Publisher=None):        super(TaskSet, self).__init__(maybe_subtask(t) for t in tasks or [])        self.app = app_or_default(app or self.app)        self.Publisher = Publisher or self.app.amqp.TaskProducer        self.total = len(self)  # XXX compat    def apply_async(self, connection=None, connect_timeout=None,            publisher=None, taskset_id=None):        """Apply TaskSet."""        app = self.app        if app.conf.CELERY_ALWAYS_EAGER:            return self.apply(taskset_id=taskset_id)        with app.default_connection(connection, connect_timeout) as conn:            setid = taskset_id or uuid()            pub = publisher or self.Publisher(conn)            results = self._async_results(setid, pub)            result = app.TaskSetResult(setid, results)            parent = get_current_worker_task()            if parent:                parent.request.children.append(result)            return result    def _async_results(self, taskset_id, publisher):        return [task.apply_async(taskset_id=taskset_id, publisher=publisher)                    for task in self]    def apply(self, taskset_id=None):        """Applies the TaskSet locally by blocking until all tasks return."""        setid = taskset_id or uuid()        return self.app.TaskSetResult(setid, self._sync_results(setid))    def _sync_results(self, taskset_id):        return [task.apply(taskset_id=taskset_id) for task in self]    def _get_tasks(self):        return self    def _set_tasks(self, tasks):        self[:] = tasks    tasks = property(_get_tasks, _set_tasks)
 |