Prechádzať zdrojové kódy

Split eager run in TaskSet.run into TaskSet.run_eager

Ask Solem 15 rokov pred
rodič
commit
5fac4f99c7
1 zmenil súbory, kde vykonal 11 pridanie a 10 odobranie
  1. 11 10
      celery/task/base.py

+ 11 - 10
celery/task/base.py

@@ -326,8 +326,7 @@ class Task(object):
         if kwargs.get("task_is_eager", False):
             result = self.apply(args=args, kwargs=kwargs, **options)
             if isinstance(result, EagerResult):
-                # get() propogates any exceptions.
-                return result.get()
+                return result.get() # propogates exceptions.
             return result
 
         self.apply_async(args=args, kwargs=kwargs, **options)
@@ -369,7 +368,7 @@ class Task(object):
     def on_success(self, retval, task_id, args, kwargs):
         """Success handler.
 
-        This is run by the worker when the task executed successfully.
+        Run by the worker if the task executes successfully.
 
         :param retval: The return value of the task.
         :param task_id: Unique id of the executed task.
@@ -387,9 +386,7 @@ class Task(object):
         has finished executing.
 
         :param args: positional arguments passed on to the task.
-
         :param kwargs: keyword arguments passed on to the task.
-
         :rtype: :class:`celery.result.EagerResult`
 
         See :func:`celery.execute.apply`.
@@ -517,13 +514,11 @@ class TaskSet(object):
             [True, True]
 
         """
-        taskset_id = gen_unique_id()
-
         from celery.conf import ALWAYS_EAGER
         if ALWAYS_EAGER:
-            subtasks = [apply(self.task, args, kwargs)
-                            for args, kwargs in self.arguments]
-            return TaskSetResult(taskset_id, subtasks)
+            return self.run_eager()
+
+        taskset_id = gen_unique_id()
 
         conn = self.task.establish_connection(connect_timeout=connect_timeout)
         publisher = self.task.get_publisher(connection=conn)
@@ -536,6 +531,12 @@ class TaskSet(object):
             conn.close()
         return TaskSetResult(taskset_id, subtasks)
 
+    def run_eager(self):
+        taskset_id = gen_unique_id()
+        subtasks = [apply(self.task, args, kwargs)
+                        for args, kwargs in self.arguments]
+        return TaskSetResult(taskset_id, subtasks)
+
     @classmethod
     def remote_execute(cls, func, args):
         """Apply ``args`` to function by distributing the args to the