Explorar o código

Canvas: Unroll groups with only one item. Closes #1656

Ask Solem %!s(int64=11) %!d(string=hai) anos
pai
achega
531b7f342f
Modificáronse 3 ficheiros con 40 adicións e 0 borrados
  1. 19 0
      celery/app/builtins.py
  2. 18 0
      celery/canvas.py
  3. 3 0
      celery/utils/functional.py

+ 19 - 0
celery/app/builtins.py

@@ -21,6 +21,20 @@ __all__ = ['shared_task', 'load_shared_tasks']
 _shared_tasks = set()
 
 
+def maybe_unroll_group(g):
+    try:
+        size = len(g.tasks)
+    except TypeError:
+        try:
+            size = g.tasks.__length_hint__()
+        except (AttributeError, TypeError):
+            pass
+        else:
+            return list(g.tasks)[0] if size == 1 else g
+    else:
+        return g.tasks[0] if size == 1 else g
+
+
 def shared_task(constructor):
     """Decorator that specifies a function that generates a built-in task.
 
@@ -244,10 +258,13 @@ def add_chain_task(app):
                 res = task.freeze()
                 i += 1
 
+                if isinstance(task, group):
+                    task = maybe_unroll_group(task)
                 if isinstance(task, chain):
                     # splice the chain
                     steps.extendleft(reversed(task.tasks))
                     continue
+
                 elif isinstance(task, group) and steps and \
                         not isinstance(steps[0], group):
                     # automatically upgrade group(..) | s to chord(group, s)
@@ -271,6 +288,8 @@ def add_chain_task(app):
                     tasks.append(task)
                 prev_task, prev_res = task, res
 
+            print(tasks)
+
             return tasks, results
 
         def apply_async(self, args=(), kwargs={}, group_id=None, chord=None,

+ 18 - 0
celery/canvas.py

@@ -82,6 +82,22 @@ class _getitem_property(object):
         self._path(obj)[self.key] = value
 
 
+def maybe_unroll_group(g):
+    """Unroll group with only one member."""
+    # Issue #1656
+    try:
+        size = len(g.tasks)
+    except TypeError:
+        try:
+            size = g.tasks.__length_hint__()
+        except (AttributeError, TypeError):
+            pass
+        else:
+            return list(g.tasks)[0] if size == 1 else g
+    else:
+        return g.tasks[0] if size == 1 else g
+
+
 class Signature(dict):
     """Class that wraps the arguments and execution options
     for a single task invocation.
@@ -240,6 +256,8 @@ class Signature(dict):
         )))
 
     def __or__(self, other):
+        if isinstance(other, group):
+            other = maybe_unroll_group(other)
         if not isinstance(self, chain) and isinstance(other, chain):
             return chain((self, ) + other.tasks, app=self._app)
         elif isinstance(other, chain):

+ 3 - 0
celery/utils/functional.py

@@ -289,6 +289,9 @@ class _regen(UserList, list):
     def __reduce__(self):
         return list, (self.data, )
 
+    def __length_hint__(self):
+        return self.__it.__length_hint__()
+
     @cached_property
     def data(self):
         return list(self.__it)