Kaynağa Gözat

Refactored .execute.apply_async: .messaging.TaskPublisher.send_task now
incorpoates all the functionality apply_async previously did (like converting
countdowns to eta), so apply_async is a front-end to .send_task, just using
the task classes default options. Also celery.execute.send_task has been
introduced, which can apply tasks using just the task name (perfect if the
client does not have the destination task in its task registry).

Ask Solem 15 yıl önce
ebeveyn
işleme
db933ed0ad
2 değiştirilmiş dosya ile 29 ekleme ve 23 silme
  1. 23 19
      celery/execute/__init__.py
  2. 6 4
      celery/messaging.py

+ 23 - 19
celery/execute/__init__.py

@@ -2,16 +2,18 @@ from datetime import datetime, timedelta
 
 from celery import conf
 from celery.utils import gen_unique_id, fun_takes_kwargs, mattrgetter
-from celery.result import EagerResult
+from celery.result import AsyncResult, EagerResult
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
 from celery.messaging import with_connection
+from celery.messaging import TaskPublisher
 
 extract_exec_options = mattrgetter("routing_key", "exchange",
                                    "immediate", "mandatory",
                                    "priority", "serializer")
 
 
+@with_connection
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
         **options):
@@ -68,34 +70,36 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     """
     if conf.ALWAYS_EAGER:
         return apply(task, args, kwargs, task_id=task_id)
-    return _apply_async(task, args=args, kwargs=kwargs, countdown=countdown,
-                        eta=eta, task_id=task_id, publisher=publisher,
-                        connection=connection,
-                        connect_timeout=connect_timeout, **options)
+
+    task = tasks[task.name] # get instance from registry
+    options = dict(extract_exec_options(task), **options)
+    exchange = options.get("exchange")
+
+    publish = publisher or task.get_publisher(connection, exchange=exchange)
+    try:
+        task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
+                                     countdown=countdown, eta=eta, **options)
+    finally:
+        publisher or publish.close()
+
+    return task.AsyncResult(task_id)
+
 
 
 @with_connection
-def _apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
+def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
         task_id=None, publisher=None, connection=None, connect_timeout=None,
-        **options):
+        result_cls=AsyncResult, **options):
 
-    task = tasks[task.name] # Get instance.
     exchange = options.get("exchange")
-    options = dict(extract_exec_options(task), **options)
-
-    if countdown: # Convert countdown to ETA.
-        eta = datetime.now() + timedelta(seconds=countdown)
-
-    publish = publisher or task.get_publisher(connection, exchange=exchange)
+    publish = publisher or TaskPublisher(connection, exchange=exchange)
     try:
-        task_id = publish.delay_task(task.name, args or [], kwargs or {},
-                                     task_id=task_id,
-                                     eta=eta,
-                                     **options)
+        task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
+                                     countdown=countdown, eta=eta, **options)
     finally:
         publisher or publish.close()
 
-    return task.AsyncResult(task_id)
+    return result_cls(task_id)
 
 
 def delay_task(task_name, *args, **kwargs):

+ 6 - 4
celery/messaging.py

@@ -4,6 +4,7 @@ Sending and Receiving Messages
 
 """
 import socket
+from datetime import datetime, timedelta
 
 from carrot.connection import DjangoBrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet
@@ -43,12 +44,13 @@ class TaskPublisher(Publisher):
             _queues_declared = True
 
     def delay_task(self, task_name, task_args=None, task_kwargs=None,
-            task_id=None, taskset_id=None, **kwargs):
+            countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
         """Delay task for execution by the celery nodes."""
 
         task_id = task_id or gen_unique_id()
-        eta = kwargs.get("eta")
-        eta = eta and eta.isoformat()
+
+        if countdown: # Convert countdown to ETA.
+            eta = datetime.now() + timedelta(seconds=countdown)
 
         message_data = {
             "task": task_name,
@@ -56,7 +58,7 @@ class TaskPublisher(Publisher):
             "args": task_args or [],
             "kwargs": task_kwargs or {},
             "retries": kwargs.get("retries", 0),
-            "eta": eta,
+            "eta": eta and eta.isoformat(),
         }
 
         if taskset_id: