Ask Solem 16 лет назад
Родитель
Сommit
c4126fe626
4 измененных файлов с 286 добавлено и 265 удалено
  1. 4 2
      README.rst
  2. 205 0
      celery/task/__init__.py
  3. 5 263
      celery/task/base.py
  4. 72 0
      celery/task/builtins.py

+ 4 - 2
README.rst

@@ -237,7 +237,8 @@ advanced features of celery later.
 This is a task that basically does nothing but take some arguments,
 This is a task that basically does nothing but take some arguments,
 and return a value:
 and return a value:
 
 
-    >>> from celery.task import Task, tasks
+    >>> from celery.task import Task
+    >>> from celery.registry import tasks
     >>> class MyTask(Task):
     >>> class MyTask(Task):
     ...     name = "myapp.mytask"
     ...     name = "myapp.mytask"
     ...     def run(self, some_arg, **kwargs):
     ...     def run(self, some_arg, **kwargs):
@@ -296,7 +297,8 @@ Periodic Tasks
 Periodic tasks are tasks that are run every ``n`` seconds. 
 Periodic tasks are tasks that are run every ``n`` seconds. 
 Here's an example of a periodic task:
 Here's an example of a periodic task:
 
 
-    >>> from celery.task import tasks, PeriodicTask
+    >>> from celery.task import PeriodicTask
+    >>> from celery.registry import tasks
     >>> from datetime import timedelta
     >>> from datetime import timedelta
     >>> class MyPeriodicTask(PeriodicTask):
     >>> class MyPeriodicTask(PeriodicTask):
     ...     name = "foo.my-periodic-task"
     ...     name = "foo.my-periodic-task"

+ 205 - 0
celery/task/__init__.py

@@ -0,0 +1,205 @@
+"""
+
+Working with tasks and task sets.
+
+"""
+from carrot.connection import DjangoAMQPConnection
+from celery.conf import AMQP_CONNECTION_TIMEOUT
+from celery.messaging import TaskPublisher
+from celery.registry import tasks
+from celery.backends import default_backend
+from celery.result import AsyncResult
+from celery.task.base import Task, TaskSet, PeriodicTask
+from celery.task.builtins import AsynchronousMapTask, ExecuteRemoteTask
+from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
+from functools import partial as curry
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+
+def apply_async(task, args=None, kwargs=None, routing_key=None,
+        immediate=None, mandatory=None, connection=None,
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None, **opts):
+    """Run a task asynchronously by the celery daemon(s).
+
+    :param task: The task to run (a callable object, or a :class:`Task`
+        instance
+
+    :param args: The positional arguments to pass on to the task (a ``list``).
+
+    :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
+
+
+    :keyword routing_key: The routing key used to route the task to a worker
+        server.
+
+    :keyword immediate: Request immediate delivery. Will raise an exception
+        if the task cannot be routed to a worker immediately.
+
+    :keyword mandatory: Mandatory routing. Raises an exception if there's
+        no running workers able to take on this task.
+
+    :keyword connection: Re-use existing AMQP connection.
+        The ``connect_timeout`` argument is not respected if this is set.
+
+    :keyword connect_timeout: The timeout in seconds, before we give up
+        on establishing a connection to the AMQP server.
+
+    :keyword priority: The task priority, a number between ``0`` and ``9``.
+
+    """
+    args = args or []
+    kwargs = kwargs or {}
+    routing_key = routing_key or getattr(task, "routing_key", None)
+    immediate = immediate or getattr(task, "immediate", None)
+    mandatory = mandatory or getattr(task, "mandatory", None)
+    priority = priority or getattr(task, "priority", None)
+    taskset_id = opts.get("taskset_id")
+    publisher = opts.get("publisher")
+
+    need_to_close_connection = False
+    if not publisher:
+        if not connection:
+            connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+            need_to_close_connection = True
+        publisher = TaskPublisher(connection=connection)
+
+    delay_task = publisher.delay_task
+    if taskset_id:
+        delay_task = curry(publisher.delay_task_in_set, taskset_id)
+        
+    task_id = delay_task(task.name, args, kwargs,
+                         routing_key=routing_key, mandatory=mandatory,
+                         immediate=immediate, priority=priority)
+
+    if need_to_close_connection:
+        publisher.close()
+        connection.close()
+
+    return AsyncResult(task_id)
+
+
+def delay_task(task_name, *args, **kwargs):
+    """Delay a task for execution by the ``celery`` daemon.
+
+    :param task_name: the name of a task registered in the task registry.
+
+    :param \*args: positional arguments to pass on to the task.
+
+    :param \*\*kwargs: keyword arguments to pass on to the task.
+
+    :raises celery.registry.NotRegistered: exception if no such task
+        has been registered in the task registry.
+
+    :rtype: :class:`celery.result.AsyncResult`.
+
+    Example
+
+        >>> r = delay_task("update_record", name="George Constanza", age=32)
+        >>> r.ready()
+        True
+        >>> r.result
+        "Record was updated"
+
+    """
+    if task_name not in tasks:
+        raise tasks.NotRegistered(
+                "Task with name %s not registered in the task registry." % (
+                    task_name))
+    task = tasks[task_name]
+    return apply_async(task, args, kwargs)
+
+
+def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
+    """Discard all waiting tasks.
+
+    This will ignore all tasks waiting for execution, and they will
+    be deleted from the messaging server.
+
+    :returns: the number of tasks discarded.
+
+    :rtype: int
+
+    """
+    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
+    consumer = TaskConsumer(connection=amqp_connection)
+    discarded_count = consumer.discard_all()
+    amqp_connection.close()
+    return discarded_count
+
+
+def is_done(task_id):
+    """Returns ``True`` if task with ``task_id`` has been executed.
+
+    :rtype: bool
+
+    """
+    return default_backend.is_done(task_id)
+
+
+def dmap(func, args, timeout=None):
+    """Distribute processing of the arguments and collect the results.
+
+    Example
+
+        >>> from celery.task import map
+        >>> import operator
+        >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
+        [4, 8, 16]
+
+    """
+    return TaskSet.map(func, args, timeout=timeout)
+
+
+def dmap_async(func, args, timeout=None):
+    """Distribute processing of the arguments and collect the results
+    asynchronously.
+
+    :returns: :class:`celery.result.AsyncResult` object.
+
+    Example
+
+        >>> from celery.task import dmap_async
+        >>> import operator
+        >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
+        >>> presult
+        <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
+        >>> presult.status
+        'DONE'
+        >>> presult.result
+        [4, 8, 16]
+
+    """
+    return TaskSet.map_async(func, args, timeout=timeout)
+
+
+def execute_remote(func, *args, **kwargs):
+    """Execute arbitrary function/object remotely.
+
+    :param func: A callable function or object.
+
+    :param \*args: Positional arguments to apply to the function.
+
+    :param \*\*kwargs: Keyword arguments to apply to the function.
+
+    The object must be picklable, so you can't use lambdas or functions
+    defined in the REPL (the objects must have an associated module).
+
+    :returns: class:`celery.result.AsyncResult`.
+
+    """
+    return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
+
+
+def ping():
+    """Test if the server is alive.
+
+    Example:
+
+        >>> from celery.task import ping
+        >>> ping()
+        'pong'
+    """
+    return PingTask.apply_async().get()

+ 5 - 263
celery/task.py → celery/task/base.py

@@ -1,140 +1,14 @@
-"""
-
-Working with tasks and task sets.
-
-"""
 from carrot.connection import DjangoAMQPConnection
 from carrot.connection import DjangoAMQPConnection
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.conf import AMQP_CONNECTION_TIMEOUT
-from celery.conf import STATISTICS_COLLECT_INTERVAL
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.log import setup_logger
-from celery.registry import tasks
+from celery.result import TaskSetResult
 from datetime import timedelta
 from datetime import timedelta
-from celery.backends import default_backend
-from celery.result import AsyncResult, TaskSetResult
-from functools import partial as curry
 import uuid
 import uuid
-import pickle
-
-
-def apply_async(task, args=None, kwargs=None, routing_key=None,
-        immediate=None, mandatory=None, connection=None,
-        connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None, **opts):
-    """Run a task asynchronously by the celery daemon(s).
-
-    :param task: The task to run (a callable object, or a :class:`Task`
-        instance
-
-    :param args: The positional arguments to pass on to the task (a ``list``).
-
-    :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
-
-
-    :keyword routing_key: The routing key used to route the task to a worker
-        server.
-
-    :keyword immediate: Request immediate delivery. Will raise an exception
-        if the task cannot be routed to a worker immediately.
-
-    :keyword mandatory: Mandatory routing. Raises an exception if there's
-        no running workers able to take on this task.
-
-    :keyword connection: Re-use existing AMQP connection.
-        The ``connect_timeout`` argument is not respected if this is set.
-
-    :keyword connect_timeout: The timeout in seconds, before we give up
-        on establishing a connection to the AMQP server.
-
-    :keyword priority: The task priority, a number between ``0`` and ``9``.
-
-    """
-    args = args or []
-    kwargs = kwargs or {}
-    routing_key = routing_key or getattr(task, "routing_key", None)
-    immediate = immediate or getattr(task, "immediate", None)
-    mandatory = mandatory or getattr(task, "mandatory", None)
-    priority = priority or getattr(task, "priority", None)
-    taskset_id = opts.get("taskset_id")
-    publisher = opts.get("publisher")
-
-    need_to_close_connection = False
-    if not publisher:
-        if not connection:
-            connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
-            need_to_close_connection = True
-        publisher = TaskPublisher(connection=connection)
-
-    delay_task = publisher.delay_task
-    if taskset_id:
-        delay_task = curry(publisher.delay_task_in_set, taskset_id)
-        
-    task_id = delay_task(task.name, args, kwargs,
-                         routing_key=routing_key, mandatory=mandatory,
-                         immediate=immediate, priority=priority)
-
-    if need_to_close_connection:
-        publisher.close()
-        connection.close()
-
-    return AsyncResult(task_id)
-
-
-def delay_task(task_name, *args, **kwargs):
-    """Delay a task for execution by the ``celery`` daemon.
-
-    :param task_name: the name of a task registered in the task registry.
-
-    :param \*args: positional arguments to pass on to the task.
-
-    :param \*\*kwargs: keyword arguments to pass on to the task.
-
-    :raises celery.registry.NotRegistered: exception if no such task
-        has been registered in the task registry.
-
-    :rtype: :class:`celery.result.AsyncResult`.
-
-    Example
-
-        >>> r = delay_task("update_record", name="George Constanza", age=32)
-        >>> r.ready()
-        True
-        >>> r.result
-        "Record was updated"
-
-    """
-    if task_name not in tasks:
-        raise tasks.NotRegistered(
-                "Task with name %s not registered in the task registry." % (
-                    task_name))
-    task = tasks[task_name]
-    return apply_async(task, args, kwargs)
-
-
-def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
-    """Discard all waiting tasks.
-
-    This will ignore all tasks waiting for execution, and they will
-    be deleted from the messaging server.
-
-    :returns: the number of tasks discarded.
-
-    :rtype: int
-
-    """
-    amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
-    consumer = TaskConsumer(connection=amqp_connection)
-    discarded_count = consumer.discard_all()
-    amqp_connection.close()
-    return discarded_count
-
-
-def is_done(task_id):
-    """Returns ``True`` if task with ``task_id`` has been executed.
-
-    :rtype: bool
-
-    """
-    return default_backend.is_done(task_id)
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
 
 
 
 
 class Task(object):
 class Task(object):
@@ -471,54 +345,6 @@ class TaskSet(object):
         return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
         return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
 
 
 
 
-def dmap(func, args, timeout=None):
-    """Distribute processing of the arguments and collect the results.
-
-    Example
-
-        >>> from celery.task import map
-        >>> import operator
-        >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
-        [4, 8, 16]
-
-    """
-    return TaskSet.map(func, args, timeout=timeout)
-
-
-class AsynchronousMapTask(Task):
-    """Task used internally by :func:`dmap_async` and
-    :meth:`TaskSet.map_async`.  """
-    name = "celery.map_async"
-
-    def run(self, serfunc, args, **kwargs):
-        """The method run by ``celeryd``."""
-        timeout = kwargs.get("timeout")
-        return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
-tasks.register(AsynchronousMapTask)
-
-
-def dmap_async(func, args, timeout=None):
-    """Distribute processing of the arguments and collect the results
-    asynchronously.
-
-    :returns: :class:`celery.result.AsyncResult` object.
-
-    Example
-
-        >>> from celery.task import dmap_async
-        >>> import operator
-        >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
-        >>> presult
-        <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
-        >>> presult.status
-        'DONE'
-        >>> presult.result
-        [4, 8, 16]
-
-    """
-    return TaskSet.map_async(func, args, timeout=timeout)
-
-
 class PeriodicTask(Task):
 class PeriodicTask(Task):
     """A periodic task is a task that behaves like a :manpage:`cron` job.
     """A periodic task is a task that behaves like a :manpage:`cron` job.
 
 
@@ -560,87 +386,3 @@ class PeriodicTask(Task):
             self.run_every = timedelta(seconds=self.run_every)
             self.run_every = timedelta(seconds=self.run_every)
 
 
         super(PeriodicTask, self).__init__()
         super(PeriodicTask, self).__init__()
-
-
-class ExecuteRemoteTask(Task):
-    """Execute an arbitrary function or object.
-
-    *Note* You probably want :func:`execute_remote` instead, which this
-    is an internal component of.
-
-    The object must be pickleable, so you can't use lambdas or functions
-    defined in the REPL (that is the python shell, or ``ipython``).
-
-    """
-    name = "celery.execute_remote"
-
-    def run(self, ser_callable, fargs, fkwargs, **kwargs):
-        """
-        :param ser_callable: A pickled function or callable object.
-
-        :param fargs: Positional arguments to apply to the function.
-
-        :param fkwargs: Keyword arguments to apply to the function.
-
-        """
-        callable_ = pickle.loads(ser_callable)
-        return callable_(*fargs, **fkwargs)
-tasks.register(ExecuteRemoteTask)
-
-
-def execute_remote(func, *args, **kwargs):
-    """Execute arbitrary function/object remotely.
-
-    :param func: A callable function or object.
-
-    :param \*args: Positional arguments to apply to the function.
-
-    :param \*\*kwargs: Keyword arguments to apply to the function.
-
-    The object must be picklable, so you can't use lambdas or functions
-    defined in the REPL (the objects must have an associated module).
-
-    :returns: class:`celery.result.AsyncResult`.
-
-    """
-    return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
-
-
-class DeleteExpiredTaskMetaTask(PeriodicTask):
-    """A periodic task that deletes expired task metadata every day.
-
-    This runs the current backend's
-    :meth:`celery.backends.base.BaseBackend.cleanup` method.
-
-    """
-    name = "celery.delete_expired_task_meta"
-    run_every = timedelta(days=1)
-
-    def run(self, **kwargs):
-        """The method run by ``celeryd``."""
-        logger = self.get_logger(**kwargs)
-        logger.info("Deleting expired task meta objects...")
-        default_backend.cleanup()
-tasks.register(DeleteExpiredTaskMetaTask)
-
-
-class PingTask(Task):
-    """The task used by :func:`ping`."""
-    name = "celery.ping"
-
-    def run(self, **kwargs):
-        """:returns: the string ``"pong"``."""
-        return "pong"
-tasks.register(PingTask)
-
-
-def ping():
-    """Test if the server is alive.
-
-    Example:
-
-        >>> from celery.task import ping
-        >>> ping()
-        'pong'
-    """
-    return PingTask.apply_async().get()

+ 72 - 0
celery/task/builtins.py

@@ -0,0 +1,72 @@
+from celery.task.base import Task, TaskSet, PeriodicTask
+from datetime import timedelta
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+
+
+class AsynchronousMapTask(Task):
+    """Task used internally by :func:`dmap_async` and
+    :meth:`TaskSet.map_async`.  """
+    name = "celery.map_async"
+
+    def run(self, serfunc, args, **kwargs):
+        """The method run by ``celeryd``."""
+        timeout = kwargs.get("timeout")
+        return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
+tasks.register(AsynchronousMapTask)
+
+
+class ExecuteRemoteTask(Task):
+    """Execute an arbitrary function or object.
+
+    *Note* You probably want :func:`execute_remote` instead, which this
+    is an internal component of.
+
+    The object must be pickleable, so you can't use lambdas or functions
+    defined in the REPL (that is the python shell, or ``ipython``).
+
+    """
+    name = "celery.execute_remote"
+
+    def run(self, ser_callable, fargs, fkwargs, **kwargs):
+        """
+        :param ser_callable: A pickled function or callable object.
+
+        :param fargs: Positional arguments to apply to the function.
+
+        :param fkwargs: Keyword arguments to apply to the function.
+
+        """
+        callable_ = pickle.loads(ser_callable)
+        return callable_(*fargs, **fkwargs)
+tasks.register(ExecuteRemoteTask)
+
+
+class DeleteExpiredTaskMetaTask(PeriodicTask):
+    """A periodic task that deletes expired task metadata every day.
+
+    This runs the current backend's
+    :meth:`celery.backends.base.BaseBackend.cleanup` method.
+
+    """
+    name = "celery.delete_expired_task_meta"
+    run_every = timedelta(days=1)
+
+    def run(self, **kwargs):
+        """The method run by ``celeryd``."""
+        logger = self.get_logger(**kwargs)
+        logger.info("Deleting expired task meta objects...")
+        default_backend.cleanup()
+tasks.register(DeleteExpiredTaskMetaTask)
+
+
+class PingTask(Task):
+    """The task used by :func:`ping`."""
+    name = "celery.ping"
+
+    def run(self, **kwargs):
+        """:returns: the string ``"pong"``."""
+        return "pong"
+tasks.register(PingTask)