Ver Fonte

Adds eager execution. celery.execute.apply|Task.apply executes the function locally by
blocking until the task is done, for API compatiblity it returns an
celery.result.EagerResult instance. You can configure celery to always run
tasks locally by setting the CELERY_ALWAYS_EAGER setting to True.

Ask Solem há 16 anos atrás
pai
commit
1a03757b78
4 ficheiros alterados com 105 adições e 5 exclusões
  1. 13 0
      celery/conf.py
  2. 36 2
      celery/execute.py
  3. 38 0
      celery/result.py
  4. 18 3
      celery/task/base.py

+ 13 - 0
celery/conf.py

@@ -15,6 +15,7 @@ DEFAULT_DAEMON_LOG_FILE = "celeryd.log"
 DEFAULT_AMQP_CONNECTION_TIMEOUT = 4
 DEFAULT_STATISTICS = False
 DEFAULT_STATISTICS_COLLECT_INTERVAL = 60 * 5
+DEFAULT_ALWAYS_EAGER = False
 
 """
 .. data:: LOG_LEVELS
@@ -164,3 +165,15 @@ SEND_CELERY_TASK_ERROR_EMAILS = getattr(settings,
 STATISTICS_COLLECT_INTERVAL = getattr(settings,
                                 "CELERY_STATISTICS_COLLECT_INTERVAL",
                                 DEFAULT_STATISTICS_COLLECT_INTERVAL)
+
+"""
+.. data:: ALWAYS_EAGER
+
+    If this is ``True``, all tasks will be executed locally by blocking
+    until it is finished. ``apply_async`` and ``delay_task`` will return
+    a :class:`celery.result.EagerResult` which emulates the behaviour of
+    an :class:`celery.result.AsyncResult`.
+
+"""
+ALWAYS_EAGER = getattr(settings, "CELERY_ALWAYS_EAGER",
+                       DEFAULT_ALWAYS_EAGER)

+ 36 - 2
celery/execute.py

@@ -1,10 +1,12 @@
 from carrot.connection import DjangoAMQPConnection
-from celery.conf import AMQP_CONNECTION_TIMEOUT
-from celery.result import AsyncResult
+from celery.conf import AMQP_CONNECTION_TIMEOUT, ALWAYS_EAGER
+from celery.result import AsyncResult, EagerResult
 from celery.messaging import TaskPublisher
 from celery.registry import tasks
+from celery.utils import gen_unique_id
 from functools import partial as curry
 from datetime import datetime, timedelta
+import inspect
 
 
 def apply_async(task, args=None, kwargs=None, routing_key=None,
@@ -48,6 +50,10 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
 
     :keyword priority: The task priority, a number between ``0`` and ``9``.
 
+    :keyword eager: Don't actually send the task to the worker servers,
+        but execute them locally at once. This will block until the execution
+        is finished and return an :class:`celery.result.EagerResult` instance.
+
     """
     args = args or []
     kwargs = kwargs or {}
@@ -60,6 +66,9 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
     if countdown:
         eta = datetime.now() + timedelta(seconds=countdown)
 
+    if ALWAYS_EAGER:
+        return apply(task, args, kwargs)
+
     need_to_close_connection = False
     if not publisher:
         if not connection:
@@ -112,3 +121,28 @@ def delay_task(task_name, *args, **kwargs):
                     task_name))
     task = tasks[task_name]
     return apply_async(task, args, kwargs)
+
+
+def apply(task, args, kwargs, **ignored):
+    """Apply the task locally.
+
+    This will block until the task completes, and returns a
+    :class:`celery.result.EagerResult` instance.
+
+    """
+    args = args or []
+    kwargs = kwargs or {}
+    task_id = gen_unique_id()
+
+    # If it's a Task class we need to have to instance
+    # for it to be callable.
+    task = inspect.isclass(task) and task() or task
+
+    try:
+        ret_value = task(*args, **kwargs)
+        status = "DONE"
+    except Exception, exc:
+        ret_value = exc
+        status = "FAILURE"
+
+    return EagerResult(task_id, ret_value, status)

+ 38 - 0
celery/result.py

@@ -294,3 +294,41 @@ class TaskSetResult(object):
     def total(self):
         """The total number of tasks in the :class:`celery.task.TaskSet`."""
         return len(self.subtasks)
+
+
+class EagerResult(BaseAsyncResult):
+    """Result that we know has already been executed.  """
+    TimeoutError = TimeoutError
+
+    def __init__(self, task_id, ret_value, status):
+        self.task_id = task_id
+        self._result = ret_value
+        self._status = status
+
+    def is_done(self):
+        """Returns ``True`` if the task executed without failure."""
+        return self.status == "DONE"
+
+    def is_ready(self):
+        """Returns ``True`` if the task has been executed."""
+        return True
+
+    def wait(self, timeout=None):
+        """Wait until the task has been executed and return its result."""
+        if self.status == "DONE":
+            return self.result
+        elif self.status == "FAILURE":
+            raise self.result
+
+    @property
+    def result(self):
+        """The tasks return value"""
+        return self._result
+
+    @property
+    def status(self):
+        """The tasks status"""
+        return self._status
+    
+    def __repr__(self):
+        return "<EagerResult: %s>" % self.task_id

+ 18 - 3
celery/task/base.py

@@ -3,7 +3,7 @@ from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.result import TaskSetResult
-from celery.execute import apply_async, delay_task
+from celery.execute import apply_async, delay_task, apply
 from celery.utils import gen_unique_id
 from datetime import timedelta
 
@@ -182,7 +182,7 @@ class Task(object):
 
         :rtype: :class:`celery.result.AsyncResult`
 
-        See :func:`delay_task`.
+        See :func:`celery.execute.delay_task`.
 
         """
         return apply_async(cls, args, kwargs)
@@ -197,11 +197,26 @@ class Task(object):
 
         :rtype: :class:`celery.result.AsyncResult`
 
-        See :func:`apply_async`.
+        See :func:`celery.execute.apply_async`.
 
         """
         return apply_async(cls, args, kwargs, **options)
 
+    @classmethod
+    def apply(cls, args=None, kwargs=None, **options):
+        """Execute this task at once, by blocking until the task
+        has finished executing.
+
+        :param args: positional arguments passed on to the task.
+
+        :param kwargs: keyword arguments passed on to the task.
+
+        :rtype: :class:`celery.result.EagerResult`
+
+        See :func:`celery.execute.apply`.
+
+        """
+        return apply(cls, args, kwargs, **options)
 
 class TaskSet(object):
     """A task containing several subtasks, making it possible