소스 검색

Group is now lazy until .apply_async, but having regen support lazy __getitem__ for iterators

Ask Solem 9 년 전
부모
커밋
9982773022
2개의 변경된 파일52개의 추가작업 그리고 28개의 파일을 삭제
  1. 25 23
      celery/canvas.py
  2. 27 5
      celery/utils/functional.py

+ 25 - 23
celery/canvas.py

@@ -27,7 +27,7 @@ from celery.local import try_import
 from celery.result import GroupResult
 from celery.utils import abstract
 from celery.utils.functional import (
-    maybe_list, is_list, regen, chunks as _chunks,
+    maybe_list, is_list, _regen, regen, chunks as _chunks,
 )
 from celery.utils.text import truncate
 
@@ -661,7 +661,7 @@ def _maybe_group(tasks, app):
     elif isinstance(tasks, abstract.CallableSignature):
         tasks = [tasks]
     else:
-        tasks = [signature(t, app=app) for t in regen(tasks)]
+        tasks = [signature(t, app=app) for t in tasks]
     return tasks
 
 
@@ -670,9 +670,12 @@ class group(Signature):
     tasks = _getitem_property('kwargs.tasks')
 
     def __init__(self, *tasks, **options):
-        app = options.get('app')
         if len(tasks) == 1:
-            tasks = _maybe_group(tasks[0], app)
+            tasks = tasks[0]
+            if isinstance(tasks, group):
+                tasks = tasks.tasks
+            if not isinstance(tasks, _regen):
+                tasks = regen(tasks)
         Signature.__init__(
             self, 'celery.group', (), {'tasks': tasks}, **options
         )
@@ -691,25 +694,24 @@ class group(Signature):
                   CallableSignature=abstract.CallableSignature,
                   from_dict=Signature.from_dict):
         for task in tasks:
-            if isinstance(task, dict):
-                if isinstance(task, CallableSignature):
-                    # local sigs are always of type Signature, and we
-                    # clone them to make sure we do not modify the originals.
-                    task = task.clone()
-                else:
-                    # serialized sigs must be converted to Signature.
-                    task = from_dict(task, app=app)
-                if isinstance(task, group):
-                    # needs yield_from :(
-                    unroll = task._prepared(
-                        task.tasks, partial_args, group_id, root_id, app,
-                    )
-                    for taskN, resN in unroll:
-                        yield taskN, resN
-                else:
-                    if partial_args and not task.immutable:
-                        task.args = tuple(partial_args) + tuple(task.args)
-                    yield task, task.freeze(group_id=group_id, root_id=root_id)
+            if isinstance(task, CallableSignature):
+                # local sigs are always of type Signature, and we
+                # clone them to make sure we do not modify the originals.
+                task = task.clone()
+            else:
+                # serialized sigs must be converted to Signature.
+                task = from_dict(task, app=app)
+            if isinstance(task, group):
+                # needs yield_from :(
+                unroll = task._prepared(
+                    task.tasks, partial_args, group_id, root_id, app,
+                )
+                for taskN, resN in unroll:
+                    yield taskN, resN
+            else:
+                if partial_args and not task.immutable:
+                    task.args = tuple(partial_args) + tuple(task.args)
+                yield task, task.freeze(group_id=group_id, root_id=root_id)
 
     def _apply_tasks(self, tasks, producer=None, app=None,
                      add_to_parent=None, **options):

+ 27 - 5
celery/utils/functional.py

@@ -14,10 +14,9 @@ import threading
 from collections import OrderedDict
 from functools import partial, wraps
 from inspect import getargspec, isfunction
-from itertools import islice
+from itertools import chain, islice
 
 from amqp import promise
-from kombu.utils import cached_property
 from kombu.utils.functional import lazy, maybe_evaluate, is_list, maybe_list
 
 from celery.five import UserDict, UserList, items, keys, range
@@ -320,6 +319,8 @@ class _regen(UserList, list):
     # must be subclass of list so that json can encode.
     def __init__(self, it):
         self.__it = it
+        self.__index = 0
+        self.__consumed = []
 
     def __reduce__(self):
         return list, (self.data,)
@@ -327,9 +328,30 @@ class _regen(UserList, list):
     def __length_hint__(self):
         return self.__it.__length_hint__()
 
-    @cached_property
+    def __iter__(self):
+        return chain(self.__consumed, self.__it)
+
+    def __getitem__(self, index):
+        if index < 0:
+            return self.data[index]
+        try:
+            return self.__consumed[index]
+        except IndexError:
+            try:
+                for i in range(self.__index, index + 1):
+                    self.__consumed.append(next(self.__it))
+            except StopIteration:
+                raise IndexError(index)
+            else:
+                return self.__consumed[index]
+
+    @property
     def data(self):
-        return list(self.__it)
+        try:
+            self.__consumed.extend(list(self.__it))
+        except StopIteration:
+            pass
+        return self.__consumed
 
 
 def dictfilter(d=None, **kw):
@@ -365,7 +387,7 @@ def head_from_fun(fun, bound=False, debug=False):
         fun_args=_argsfromspec(getargspec(fun)),
         fun_value=1,
     )
-    if debug:
+    if debug:  # pragma: no cover
         print(definition, file=sys.stderr)
     namespace = {'__name__': 'headof_{0}'.format(name)}
     exec(definition, namespace)