sets.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.sets
  4. ~~~~~~~~~~~~~~~~
  5. Old ``group`` implementation, this module should
  6. not be used anymore use :func:`celery.group` instead.
  7. """
  8. from __future__ import absolute_import
  9. from __future__ import with_statement
  10. from celery.state import get_current_worker_task
  11. from celery.app import app_or_default
  12. from celery.canvas import subtask, maybe_subtask # noqa
  13. from celery.utils import uuid
  14. class TaskSet(list):
  15. """A task containing several subtasks, making it possible
  16. to track how many, or when all of the tasks have been completed.
  17. :param tasks: A list of :class:`subtask` instances.
  18. Example::
  19. >>> urls = ('http://cnn.com/rss', 'http://bbc.co.uk/rss')
  20. >>> s = TaskSet(refresh_feed.s(url) for url in urls)
  21. >>> taskset_result = s.apply_async()
  22. >>> list_of_return_values = taskset_result.join() # *expensive*
  23. """
  24. app = None
  25. def __init__(self, tasks=None, app=None, Publisher=None):
  26. super(TaskSet, self).__init__(maybe_subtask(t) for t in tasks or [])
  27. self.app = app_or_default(app or self.app)
  28. self.Publisher = Publisher or self.app.amqp.TaskProducer
  29. self.total = len(self) # XXX compat
  30. def apply_async(self, connection=None, connect_timeout=None,
  31. publisher=None, taskset_id=None):
  32. """Apply TaskSet."""
  33. app = self.app
  34. if app.conf.CELERY_ALWAYS_EAGER:
  35. return self.apply(taskset_id=taskset_id)
  36. with app.default_connection(connection, connect_timeout) as conn:
  37. setid = taskset_id or uuid()
  38. pub = publisher or self.Publisher(conn)
  39. results = self._async_results(setid, pub)
  40. result = app.TaskSetResult(setid, results)
  41. parent = get_current_worker_task()
  42. if parent:
  43. parent.request.children.append(result)
  44. return result
  45. def _async_results(self, taskset_id, publisher):
  46. return [task.apply_async(taskset_id=taskset_id, publisher=publisher)
  47. for task in self]
  48. def apply(self, taskset_id=None):
  49. """Applies the TaskSet locally by blocking until all tasks return."""
  50. setid = taskset_id or uuid()
  51. return self.app.TaskSetResult(setid, self._sync_results(setid))
  52. def _sync_results(self, taskset_id):
  53. return [task.apply(taskset_id=taskset_id) for task in self]
  54. def _get_tasks(self):
  55. return self
  56. def _set_tasks(self, tasks):
  57. self[:] = tasks
  58. tasks = property(_get_tasks, _set_tasks)