Explorar o código

Refactor celery.task. Split into three modules:

    * celery.task

        Contains apply_async, delay_task, discard_all, and task shortcuts
        plus imports objects from celery.task.base and celery.task.builtins

    * celery.task.base

        Contains task base classes: Task, PeriodicTask, TaskSet

    * celery.task.builtins

        Built-in tasks: PingTask, AsynchronousMapTask, ExecuteRemoteTask, ++
Ask Solem %!s(int64=16) %!d(string=hai) anos
pai
achega
0a4d9248a8
Modificáronse 3 ficheiros con 469 adicións e 450 borrados
  1. 9 450
      celery/task/__init__.py
  2. 388 0
      celery/task/base.py
  3. 72 0
      celery/task/builtins.py

+ 9 - 450
celery/task/__init__.py

@@ -5,16 +5,18 @@ Working with tasks and task sets.
 """
 from carrot.connection import DjangoAMQPConnection
 from celery.conf import AMQP_CONNECTION_TIMEOUT
-from celery.conf import STATISTICS_COLLECT_INTERVAL
-from celery.messaging import TaskPublisher, TaskConsumer
-from celery.log import setup_logger
+from celery.messaging import TaskPublisher
 from celery.registry import tasks
-from datetime import timedelta
 from celery.backends import default_backend
-from celery.result import AsyncResult, TaskSetResult
+from celery.result import AsyncResult
+from celery.task.base import Task, TaskSet, PeriodicTask
+from celery.task.builtins import AsynchronousMapTask, ExecuteRemoteTask
+from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
 from functools import partial as curry
-import uuid
-import pickle
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
 
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
@@ -137,340 +139,6 @@ def is_done(task_id):
     return default_backend.is_done(task_id)
 
 
-class Task(object):
-    """A task that can be delayed for execution by the ``celery`` daemon.
-
-    All subclasses of :class:`Task` must define the :meth:`run` method,
-    which is the actual method the ``celery`` daemon executes.
-
-    The :meth:`run` method supports both positional, and keyword arguments.
-
-    .. attribute:: name
-
-        *REQUIRED* All subclasses of :class:`Task` has to define the
-        :attr:`name` attribute. This is the name of the task, registered
-        in the task registry, and passed to :func:`delay_task`.
-
-    .. attribute:: type
-
-        The type of task, currently this can be ``regular``, or ``periodic``,
-        however if you want a periodic task, you should subclass
-        :class:`PeriodicTask` instead.
-
-    .. attribute:: routing_key
-
-        Override the global default ``routing_key`` for this task.
-
-    .. attribute:: mandatory
-
-        If set, the message has mandatory routing. By default the message
-        is silently dropped by the broker if it can't be routed to a queue.
-        However - If the message is mandatory, an exception will be raised
-        instead.
-
-    .. attribute:: immediate:
-            
-        Request immediate delivery. If the message cannot be routed to a
-        task worker immediately, an exception will be raised. This is
-        instead of the default behaviour, where the broker will accept and
-        queue the message, but with no guarantee that the message will ever
-        be consumed.
-
-    .. attribute:: priority:
-    
-        The message priority. A number from ``0`` to ``9``.
-
-    .. attribute:: ignore_result
-
-        Don't store the status and return value. This means you can't
-        use the :class:`celery.result.AsyncResult` to check if the task is
-        done, or get its return value. Only use if you need the performance
-        and is able live without these features. Any exceptions raised will
-        store the return value/status as usual.
-
-    .. attribute:: disable_error_emails
-
-        Disable all error e-mails for this task (only applicable if
-        ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
-
-    :raises NotImplementedError: if the :attr:`name` attribute is not set.
-
-    The resulting class is callable, which if called will apply the
-    :meth:`run` method.
-
-    Examples
-
-    This is a simple task just logging a message,
-
-        >>> from celery.task import tasks, Task
-        >>> class MyTask(Task):
-        ...     name = "mytask"
-        ...
-        ...     def run(self, some_arg=None, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Running MyTask with arg some_arg=%s" %
-        ...                     some_arg))
-        ...         return 42
-        ... tasks.register(MyTask)
-
-    You can delay the task using the classmethod :meth:`delay`...
-
-        >>> result = MyTask.delay(some_arg="foo")
-        >>> result.status # after some time
-        'DONE'
-        >>> result.result
-        42
-
-    ...or using the :func:`delay_task` function, by passing the name of
-    the task.
-
-        >>> from celery.task import delay_task
-        >>> result = delay_task(MyTask.name, some_arg="foo")
-
-
-    """
-    name = None
-    type = "regular"
-    routing_key = None
-    immediate = False
-    mandatory = False
-    priority = None
-    ignore_result = False
-    disable_error_emails = False
-
-    def __init__(self):
-        if not self.name:
-            raise NotImplementedError("Tasks must define a name attribute.")
-
-    def __call__(self, *args, **kwargs):
-        return self.run(*args, **kwargs)
-
-    def run(self, *args, **kwargs):
-        """*REQUIRED* The actual task.
-
-        All subclasses of :class:`Task` must define the run method.
-
-        :raises NotImplementedError: by default, so you have to override
-            this method in your subclass.
-
-        """
-        raise NotImplementedError("Tasks must define a run method.")
-
-    def get_logger(self, **kwargs):
-        """Get process-aware logger object.
-
-        See :func:`celery.log.setup_logger`.
-
-        """
-        return setup_logger(**kwargs)
-
-    def get_publisher(self):
-        """Get a celery task message publisher.
-
-        :rtype: :class:`celery.messaging.TaskPublisher`.
-
-        Please be sure to close the AMQP connection when you're done
-        with this object, i.e.:
-
-            >>> publisher = self.get_publisher()
-            >>> # do something with publisher
-            >>> publisher.connection.close()
-
-        """
-        return TaskPublisher(connection=DjangoAMQPConnection(
-                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
-
-    def get_consumer(self):
-        """Get a celery task message consumer.
-
-        :rtype: :class:`celery.messaging.TaskConsumer`.
-
-        Please be sure to close the AMQP connection when you're done
-        with this object. i.e.:
-
-            >>> consumer = self.get_consumer()
-            >>> # do something with consumer
-            >>> consumer.connection.close()
-
-        """
-        return TaskConsumer(connection=DjangoAMQPConnection(
-                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
-
-    @classmethod
-    def delay(cls, *args, **kwargs):
-        """Delay this task for execution by the ``celery`` daemon(s).
-
-        :param \*args: positional arguments passed on to the task.
-
-        :param \*\*kwargs: keyword arguments passed on to the task.
-
-        :rtype: :class:`celery.result.AsyncResult`
-
-        See :func:`delay_task`.
-
-        """
-        return apply_async(cls, args, kwargs)
-
-    @classmethod
-    def apply_async(cls, args=None, kwargs=None, **options):
-        """Delay this task for execution by the ``celery`` daemon(s).
-
-        :param args: positional arguments passed on to the task.
-
-        :param kwargs: keyword arguments passed on to the task.
-
-        :rtype: :class:`celery.result.AsyncResult`
-
-        See :func:`apply_async`.
-
-        """
-        return apply_async(cls, args, kwargs, **options)
-
-
-class TaskSet(object):
-    """A task containing several subtasks, making it possible
-    to track how many, or when all of the tasks has been completed.
-
-    :param task: The task class or name.
-        Can either be a fully qualified task name, or a task class.
-
-    :param args: A list of args, kwargs pairs.
-        e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
-
-
-    .. attribute:: task_name
-
-        The name of the task.
-
-    .. attribute:: arguments
-
-        The arguments, as passed to the task set constructor.
-
-    .. attribute:: total
-
-        Total number of tasks in this task set.
-
-    Example
-
-        >>> from djangofeeds.tasks import RefreshFeedTask
-        >>> taskset = TaskSet(RefreshFeedTask, args=[
-        ...                 [], {"feed_url": "http://cnn.com/rss"},
-        ...                 [], {"feed_url": "http://bbc.com/rss"},
-        ...                 [], {"feed_url": "http://xkcd.com/rss"}])
-
-        >>> taskset_result = taskset.run()
-        >>> list_of_return_values = taskset.join()
-
-
-    """
-
-    def __init__(self, task, args):
-        try:
-            task_name = task.name
-            task_obj = task
-        except AttributeError:
-            task_name = task
-            task_obj = tasks[task_name]
-
-        self.task = task_obj
-        self.task_name = task_name
-        self.arguments = args
-        self.total = len(args)
-
-    def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
-        """Run all tasks in the taskset.
-
-        :returns: A :class:`celery.result.TaskSetResult` instance.
-
-        Example
-
-            >>> ts = TaskSet(RefreshFeedTask, [
-            ...         ["http://foo.com/rss", {}],
-            ...         ["http://bar.com/rss", {}],
-            ... )
-            >>> result = ts.run()
-            >>> result.taskset_id
-            "d2c9b261-8eff-4bfb-8459-1e1b72063514"
-            >>> result.subtask_ids
-            ["b4996460-d959-49c8-aeb9-39c530dcde25",
-            "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
-            >>> result.waiting()
-            True
-            >>> time.sleep(10)
-            >>> result.ready()
-            True
-            >>> result.successful()
-            True
-            >>> result.failed()
-            False
-            >>> result.join()
-            [True, True]
-
-        """
-        taskset_id = str(uuid.uuid4())
-        conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
-        publisher = TaskPublisher(connection=conn)
-        subtasks = [apply_async(self.task, args, kwargs,
-                                taskset_id=taskset_id, publisher=publisher)
-                        for args, kwargs in self.arguments]
-        publisher.close()
-        conn.close()
-        return TaskSetResult(taskset_id, subtasks)
-
-    def iterate(self):
-        """Iterate over the results returned after calling :meth:`run`.
-
-        If any of the tasks raises an exception, the exception will
-        be re-raised.
-
-        """
-        return iter(self.run())
-
-    def join(self, timeout=None):
-        """Gather the results for all of the tasks in the taskset,
-        and return a list with them ordered by the order of which they
-        were called.
-
-        :keyword timeout: The time in seconds, how long
-            it will wait for results, before the operation times out.
-
-        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
-            and the operation takes longer than ``timeout`` seconds.
-
-        If any of the tasks raises an exception, the exception
-        will be reraised by :meth:`join`.
-
-        :returns: list of return values for all tasks in the taskset.
-
-        """
-        return self.run().join(timeout=timeout)
-
-    @classmethod
-    def remote_execute(cls, func, args):
-        """Apply ``args`` to function by distributing the args to the
-        celery server(s)."""
-        pickled = pickle.dumps(func)
-        arguments = [[[pickled, arg, {}], {}] for arg in args]
-        return cls(ExecuteRemoteTask, arguments)
-
-    @classmethod
-    def map(cls, func, args, timeout=None):
-        """Distribute processing of the arguments and collect the results."""
-        remote_task = cls.remote_execute(func, args)
-        return remote_task.join(timeout=timeout)
-
-    @classmethod
-    def map_async(cls, func, args, timeout=None):
-        """Distribute processing of the arguments and collect the results
-        asynchronously.
-
-        :returns: :class:`celery.result.AsyncResult` instance.
-
-        """
-        serfunc = pickle.dumps(func)
-        return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
-
-
 def dmap(func, args, timeout=None):
     """Distribute processing of the arguments and collect the results.
 
@@ -485,18 +153,6 @@ def dmap(func, args, timeout=None):
     return TaskSet.map(func, args, timeout=timeout)
 
 
-class AsynchronousMapTask(Task):
-    """Task used internally by :func:`dmap_async` and
-    :meth:`TaskSet.map_async`.  """
-    name = "celery.map_async"
-
-    def run(self, serfunc, args, **kwargs):
-        """The method run by ``celeryd``."""
-        timeout = kwargs.get("timeout")
-        return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
-tasks.register(AsynchronousMapTask)
-
-
 def dmap_async(func, args, timeout=None):
     """Distribute processing of the arguments and collect the results
     asynchronously.
@@ -519,75 +175,6 @@ def dmap_async(func, args, timeout=None):
     return TaskSet.map_async(func, args, timeout=timeout)
 
 
-class PeriodicTask(Task):
-    """A periodic task is a task that behaves like a :manpage:`cron` job.
-
-    .. attribute:: run_every
-
-        *REQUIRED* Defines how often the task is run (its interval),
-        it can be either a :class:`datetime.timedelta` object or an
-        integer specifying the time in seconds.
-
-    :raises NotImplementedError: if the :attr:`run_every` attribute is
-        not defined.
-
-    You have to register the periodic task in the task registry.
-
-    Example
-
-        >>> from celery.task import tasks, PeriodicTask
-        >>> from datetime import timedelta
-        >>> class MyPeriodicTask(PeriodicTask):
-        ...     name = "my_periodic_task"
-        ...     run_every = timedelta(seconds=30)
-        ...
-        ...     def run(self, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Running MyPeriodicTask")
-        >>> tasks.register(MyPeriodicTask)
-
-    """
-    run_every = timedelta(days=1)
-    type = "periodic"
-
-    def __init__(self):
-        if not self.run_every:
-            raise NotImplementedError(
-                    "Periodic tasks must have a run_every attribute")
-
-        # If run_every is a integer, convert it to timedelta seconds.
-        if isinstance(self.run_every, int):
-            self.run_every = timedelta(seconds=self.run_every)
-
-        super(PeriodicTask, self).__init__()
-
-
-class ExecuteRemoteTask(Task):
-    """Execute an arbitrary function or object.
-
-    *Note* You probably want :func:`execute_remote` instead, which this
-    is an internal component of.
-
-    The object must be pickleable, so you can't use lambdas or functions
-    defined in the REPL (that is the python shell, or ``ipython``).
-
-    """
-    name = "celery.execute_remote"
-
-    def run(self, ser_callable, fargs, fkwargs, **kwargs):
-        """
-        :param ser_callable: A pickled function or callable object.
-
-        :param fargs: Positional arguments to apply to the function.
-
-        :param fkwargs: Keyword arguments to apply to the function.
-
-        """
-        callable_ = pickle.loads(ser_callable)
-        return callable_(*fargs, **fkwargs)
-tasks.register(ExecuteRemoteTask)
-
-
 def execute_remote(func, *args, **kwargs):
     """Execute arbitrary function/object remotely.
 
@@ -606,34 +193,6 @@ def execute_remote(func, *args, **kwargs):
     return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
 
 
-class DeleteExpiredTaskMetaTask(PeriodicTask):
-    """A periodic task that deletes expired task metadata every day.
-
-    This runs the current backend's
-    :meth:`celery.backends.base.BaseBackend.cleanup` method.
-
-    """
-    name = "celery.delete_expired_task_meta"
-    run_every = timedelta(days=1)
-
-    def run(self, **kwargs):
-        """The method run by ``celeryd``."""
-        logger = self.get_logger(**kwargs)
-        logger.info("Deleting expired task meta objects...")
-        default_backend.cleanup()
-tasks.register(DeleteExpiredTaskMetaTask)
-
-
-class PingTask(Task):
-    """The task used by :func:`ping`."""
-    name = "celery.ping"
-
-    def run(self, **kwargs):
-        """:returns: the string ``"pong"``."""
-        return "pong"
-tasks.register(PingTask)
-
-
 def ping():
     """Test if the server is alive.
 

+ 388 - 0
celery/task/base.py

@@ -0,0 +1,388 @@
+from carrot.connection import DjangoAMQPConnection
+from celery.conf import AMQP_CONNECTION_TIMEOUT
+from celery.messaging import TaskPublisher, TaskConsumer
+from celery.log import setup_logger
+from celery.result import TaskSetResult
+from datetime import timedelta
+import uuid
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+
+class Task(object):
+    """A task that can be delayed for execution by the ``celery`` daemon.
+
+    All subclasses of :class:`Task` must define the :meth:`run` method,
+    which is the actual method the ``celery`` daemon executes.
+
+    The :meth:`run` method supports both positional, and keyword arguments.
+
+    .. attribute:: name
+
+        *REQUIRED* All subclasses of :class:`Task` has to define the
+        :attr:`name` attribute. This is the name of the task, registered
+        in the task registry, and passed to :func:`delay_task`.
+
+    .. attribute:: type
+
+        The type of task, currently this can be ``regular``, or ``periodic``,
+        however if you want a periodic task, you should subclass
+        :class:`PeriodicTask` instead.
+
+    .. attribute:: routing_key
+
+        Override the global default ``routing_key`` for this task.
+
+    .. attribute:: mandatory
+
+        If set, the message has mandatory routing. By default the message
+        is silently dropped by the broker if it can't be routed to a queue.
+        However - If the message is mandatory, an exception will be raised
+        instead.
+
+    .. attribute:: immediate:
+            
+        Request immediate delivery. If the message cannot be routed to a
+        task worker immediately, an exception will be raised. This is
+        instead of the default behaviour, where the broker will accept and
+        queue the message, but with no guarantee that the message will ever
+        be consumed.
+
+    .. attribute:: priority:
+    
+        The message priority. A number from ``0`` to ``9``.
+
+    .. attribute:: ignore_result
+
+        Don't store the status and return value. This means you can't
+        use the :class:`celery.result.AsyncResult` to check if the task is
+        done, or get its return value. Only use if you need the performance
+        and is able live without these features. Any exceptions raised will
+        store the return value/status as usual.
+
+    .. attribute:: disable_error_emails
+
+        Disable all error e-mails for this task (only applicable if
+        ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
+
+    :raises NotImplementedError: if the :attr:`name` attribute is not set.
+
+    The resulting class is callable, which if called will apply the
+    :meth:`run` method.
+
+    Examples
+
+    This is a simple task just logging a message,
+
+        >>> from celery.task import tasks, Task
+        >>> class MyTask(Task):
+        ...     name = "mytask"
+        ...
+        ...     def run(self, some_arg=None, **kwargs):
+        ...         logger = self.get_logger(**kwargs)
+        ...         logger.info("Running MyTask with arg some_arg=%s" %
+        ...                     some_arg))
+        ...         return 42
+        ... tasks.register(MyTask)
+
+    You can delay the task using the classmethod :meth:`delay`...
+
+        >>> result = MyTask.delay(some_arg="foo")
+        >>> result.status # after some time
+        'DONE'
+        >>> result.result
+        42
+
+    ...or using the :func:`delay_task` function, by passing the name of
+    the task.
+
+        >>> from celery.task import delay_task
+        >>> result = delay_task(MyTask.name, some_arg="foo")
+
+
+    """
+    name = None
+    type = "regular"
+    routing_key = None
+    immediate = False
+    mandatory = False
+    priority = None
+    ignore_result = False
+    disable_error_emails = False
+
+    def __init__(self):
+        if not self.name:
+            raise NotImplementedError("Tasks must define a name attribute.")
+
+    def __call__(self, *args, **kwargs):
+        return self.run(*args, **kwargs)
+
+    def run(self, *args, **kwargs):
+        """*REQUIRED* The actual task.
+
+        All subclasses of :class:`Task` must define the run method.
+
+        :raises NotImplementedError: by default, so you have to override
+            this method in your subclass.
+
+        """
+        raise NotImplementedError("Tasks must define a run method.")
+
+    def get_logger(self, **kwargs):
+        """Get process-aware logger object.
+
+        See :func:`celery.log.setup_logger`.
+
+        """
+        return setup_logger(**kwargs)
+
+    def get_publisher(self):
+        """Get a celery task message publisher.
+
+        :rtype: :class:`celery.messaging.TaskPublisher`.
+
+        Please be sure to close the AMQP connection when you're done
+        with this object, i.e.:
+
+            >>> publisher = self.get_publisher()
+            >>> # do something with publisher
+            >>> publisher.connection.close()
+
+        """
+        return TaskPublisher(connection=DjangoAMQPConnection(
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
+
+    def get_consumer(self):
+        """Get a celery task message consumer.
+
+        :rtype: :class:`celery.messaging.TaskConsumer`.
+
+        Please be sure to close the AMQP connection when you're done
+        with this object. i.e.:
+
+            >>> consumer = self.get_consumer()
+            >>> # do something with consumer
+            >>> consumer.connection.close()
+
+        """
+        return TaskConsumer(connection=DjangoAMQPConnection(
+                                connect_timeout=AMQP_CONNECTION_TIMEOUT))
+
+    @classmethod
+    def delay(cls, *args, **kwargs):
+        """Delay this task for execution by the ``celery`` daemon(s).
+
+        :param \*args: positional arguments passed on to the task.
+
+        :param \*\*kwargs: keyword arguments passed on to the task.
+
+        :rtype: :class:`celery.result.AsyncResult`
+
+        See :func:`delay_task`.
+
+        """
+        return apply_async(cls, args, kwargs)
+
+    @classmethod
+    def apply_async(cls, args=None, kwargs=None, **options):
+        """Delay this task for execution by the ``celery`` daemon(s).
+
+        :param args: positional arguments passed on to the task.
+
+        :param kwargs: keyword arguments passed on to the task.
+
+        :rtype: :class:`celery.result.AsyncResult`
+
+        See :func:`apply_async`.
+
+        """
+        return apply_async(cls, args, kwargs, **options)
+
+
+class TaskSet(object):
+    """A task containing several subtasks, making it possible
+    to track how many, or when all of the tasks has been completed.
+
+    :param task: The task class or name.
+        Can either be a fully qualified task name, or a task class.
+
+    :param args: A list of args, kwargs pairs.
+        e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
+
+
+    .. attribute:: task_name
+
+        The name of the task.
+
+    .. attribute:: arguments
+
+        The arguments, as passed to the task set constructor.
+
+    .. attribute:: total
+
+        Total number of tasks in this task set.
+
+    Example
+
+        >>> from djangofeeds.tasks import RefreshFeedTask
+        >>> taskset = TaskSet(RefreshFeedTask, args=[
+        ...                 [], {"feed_url": "http://cnn.com/rss"},
+        ...                 [], {"feed_url": "http://bbc.com/rss"},
+        ...                 [], {"feed_url": "http://xkcd.com/rss"}])
+
+        >>> taskset_result = taskset.run()
+        >>> list_of_return_values = taskset.join()
+
+
+    """
+
+    def __init__(self, task, args):
+        try:
+            task_name = task.name
+            task_obj = task
+        except AttributeError:
+            task_name = task
+            task_obj = tasks[task_name]
+
+        self.task = task_obj
+        self.task_name = task_name
+        self.arguments = args
+        self.total = len(args)
+
+    def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
+        """Run all tasks in the taskset.
+
+        :returns: A :class:`celery.result.TaskSetResult` instance.
+
+        Example
+
+            >>> ts = TaskSet(RefreshFeedTask, [
+            ...         ["http://foo.com/rss", {}],
+            ...         ["http://bar.com/rss", {}],
+            ... )
+            >>> result = ts.run()
+            >>> result.taskset_id
+            "d2c9b261-8eff-4bfb-8459-1e1b72063514"
+            >>> result.subtask_ids
+            ["b4996460-d959-49c8-aeb9-39c530dcde25",
+            "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
+            >>> result.waiting()
+            True
+            >>> time.sleep(10)
+            >>> result.ready()
+            True
+            >>> result.successful()
+            True
+            >>> result.failed()
+            False
+            >>> result.join()
+            [True, True]
+
+        """
+        taskset_id = str(uuid.uuid4())
+        conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
+        publisher = TaskPublisher(connection=conn)
+        subtasks = [apply_async(self.task, args, kwargs,
+                                taskset_id=taskset_id, publisher=publisher)
+                        for args, kwargs in self.arguments]
+        publisher.close()
+        conn.close()
+        return TaskSetResult(taskset_id, subtasks)
+
+    def iterate(self):
+        """Iterate over the results returned after calling :meth:`run`.
+
+        If any of the tasks raises an exception, the exception will
+        be re-raised.
+
+        """
+        return iter(self.run())
+
+    def join(self, timeout=None):
+        """Gather the results for all of the tasks in the taskset,
+        and return a list with them ordered by the order of which they
+        were called.
+
+        :keyword timeout: The time in seconds, how long
+            it will wait for results, before the operation times out.
+
+        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
+            and the operation takes longer than ``timeout`` seconds.
+
+        If any of the tasks raises an exception, the exception
+        will be reraised by :meth:`join`.
+
+        :returns: list of return values for all tasks in the taskset.
+
+        """
+        return self.run().join(timeout=timeout)
+
+    @classmethod
+    def remote_execute(cls, func, args):
+        """Apply ``args`` to function by distributing the args to the
+        celery server(s)."""
+        pickled = pickle.dumps(func)
+        arguments = [[[pickled, arg, {}], {}] for arg in args]
+        return cls(ExecuteRemoteTask, arguments)
+
+    @classmethod
+    def map(cls, func, args, timeout=None):
+        """Distribute processing of the arguments and collect the results."""
+        remote_task = cls.remote_execute(func, args)
+        return remote_task.join(timeout=timeout)
+
+    @classmethod
+    def map_async(cls, func, args, timeout=None):
+        """Distribute processing of the arguments and collect the results
+        asynchronously.
+
+        :returns: :class:`celery.result.AsyncResult` instance.
+
+        """
+        serfunc = pickle.dumps(func)
+        return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
+
+
+class PeriodicTask(Task):
+    """A periodic task is a task that behaves like a :manpage:`cron` job.
+
+    .. attribute:: run_every
+
+        *REQUIRED* Defines how often the task is run (its interval),
+        it can be either a :class:`datetime.timedelta` object or an
+        integer specifying the time in seconds.
+
+    :raises NotImplementedError: if the :attr:`run_every` attribute is
+        not defined.
+
+    You have to register the periodic task in the task registry.
+
+    Example
+
+        >>> from celery.task import tasks, PeriodicTask
+        >>> from datetime import timedelta
+        >>> class MyPeriodicTask(PeriodicTask):
+        ...     name = "my_periodic_task"
+        ...     run_every = timedelta(seconds=30)
+        ...
+        ...     def run(self, **kwargs):
+        ...         logger = self.get_logger(**kwargs)
+        ...         logger.info("Running MyPeriodicTask")
+        >>> tasks.register(MyPeriodicTask)
+
+    """
+    run_every = timedelta(days=1)
+    type = "periodic"
+
+    def __init__(self):
+        if not self.run_every:
+            raise NotImplementedError(
+                    "Periodic tasks must have a run_every attribute")
+
+        # If run_every is a integer, convert it to timedelta seconds.
+        if isinstance(self.run_every, int):
+            self.run_every = timedelta(seconds=self.run_every)
+
+        super(PeriodicTask, self).__init__()

+ 72 - 0
celery/task/builtins.py

@@ -0,0 +1,72 @@
+from celery.task.base import Task, TaskSet, PeriodicTask
+from datetime import timedelta
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+
+class AsynchronousMapTask(Task):
+    """Task used internally by :func:`dmap_async` and
+    :meth:`TaskSet.map_async`.  """
+    name = "celery.map_async"
+
+    def run(self, serfunc, args, **kwargs):
+        """The method run by ``celeryd``."""
+        timeout = kwargs.get("timeout")
+        return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
+tasks.register(AsynchronousMapTask)
+
+
+class ExecuteRemoteTask(Task):
+    """Execute an arbitrary function or object.
+
+    *Note* You probably want :func:`execute_remote` instead, which this
+    is an internal component of.
+
+    The object must be pickleable, so you can't use lambdas or functions
+    defined in the REPL (that is the python shell, or ``ipython``).
+
+    """
+    name = "celery.execute_remote"
+
+    def run(self, ser_callable, fargs, fkwargs, **kwargs):
+        """
+        :param ser_callable: A pickled function or callable object.
+
+        :param fargs: Positional arguments to apply to the function.
+
+        :param fkwargs: Keyword arguments to apply to the function.
+
+        """
+        callable_ = pickle.loads(ser_callable)
+        return callable_(*fargs, **fkwargs)
+tasks.register(ExecuteRemoteTask)
+
+
+class DeleteExpiredTaskMetaTask(PeriodicTask):
+    """A periodic task that deletes expired task metadata every day.
+
+    This runs the current backend's
+    :meth:`celery.backends.base.BaseBackend.cleanup` method.
+
+    """
+    name = "celery.delete_expired_task_meta"
+    run_every = timedelta(days=1)
+
+    def run(self, **kwargs):
+        """The method run by ``celeryd``."""
+        logger = self.get_logger(**kwargs)
+        logger.info("Deleting expired task meta objects...")
+        default_backend.cleanup()
+tasks.register(DeleteExpiredTaskMetaTask)
+
+
+class PingTask(Task):
+    """The task used by :func:`ping`."""
+    name = "celery.ping"
+
+    def run(self, **kwargs):
+        """:returns: the string ``"pong"``."""
+        return "pong"
+tasks.register(PingTask)