浏览代码

chord(group()) now works properly, and subtask proxies are now immutable

Ask Solem 13 年之前
父节点
当前提交
cea68a0e54
共有 3 个文件被更改,包括 34 次插入18 次删除
  1. 7 4
      celery/_state.py
  2. 1 1
      celery/app/builtins.py
  3. 26 13
      celery/canvas.py

+ 7 - 4
celery/_state.py

@@ -17,8 +17,12 @@ import weakref
 from celery.local import Proxy
 from celery.local import Proxy
 from celery.utils.threads import LocalStack
 from celery.utils.threads import LocalStack
 
 
+#: Global default app used when no current app.
 default_app = None
 default_app = None
 
 
+#: List of all app instances (weakrefs), must not be used directly.
+_apps = set()
+
 
 
 class _TLS(threading.local):
 class _TLS(threading.local):
     #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
     #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
@@ -61,12 +65,11 @@ def get_current_worker_task():
             return task
             return task
 
 
 
 
+#: Proxy to current app.
 current_app = Proxy(get_current_app)
 current_app = Proxy(get_current_app)
-current_task = Proxy(get_current_task)
 
 
-#: WeakSet does not seem to work properly,
-#: it doesn't recognize when objects go out of scope.
-_apps = set()
+#: Proxy to current task.
+current_task = Proxy(get_current_task)
 
 
 
 
 def _register_app(app):
 def _register_app(app):

+ 1 - 1
celery/app/builtins.py

@@ -254,7 +254,7 @@ def add_chord_task(app):
                 return self.apply(args, kwargs, **options)
                 return self.apply(args, kwargs, **options)
             group_id = options.pop('group_id', None)
             group_id = options.pop('group_id', None)
             chord = options.pop('chord', None)
             chord = options.pop('chord', None)
-            header, body = (list(kwargs['header']),
+            header, body = (list(maybe_subtask(kwargs['header'])),
                             maybe_subtask(kwargs['body']))
                             maybe_subtask(kwargs['body']))
             if group_id:
             if group_id:
                 body.set(group_id=group_id)
                 body.set(group_id=group_id)

+ 26 - 13
celery/canvas.py

@@ -201,7 +201,8 @@ class chain(Signature):
 
 
     def __init__(self, *tasks, **options):
     def __init__(self, *tasks, **options):
         tasks = tasks[0] if len(tasks) == 1 and is_list(tasks[0]) else tasks
         tasks = tasks[0] if len(tasks) == 1 and is_list(tasks[0]) else tasks
-        Signature.__init__(self, 'celery.chain', (), {'tasks': tasks}, options)
+        Signature.__init__(self, 'celery.chain', (), {'tasks': tasks},
+                           options, immutable=True)
         self.tasks = tasks
         self.tasks = tasks
         self.subtask_type = 'chain'
         self.subtask_type = 'chain'
 
 
@@ -223,7 +224,7 @@ class _basemap(Signature):
 
 
     def __init__(self, task, it, **options):
     def __init__(self, task, it, **options):
         Signature.__init__(self, self._task_name, (),
         Signature.__init__(self, self._task_name, (),
-                {'task': task, 'it': regen(it)}, **options)
+                {'task': task, 'it': regen(it)}, immutable=True, **options)
 
 
     def apply_async(self, args=(), kwargs={}, **opts):
     def apply_async(self, args=(), kwargs={}, **opts):
         # need to evaluate generators
         # need to evaluate generators
@@ -259,22 +260,21 @@ class chunks(Signature):
 
 
     def __init__(self, task, it, n, **options):
     def __init__(self, task, it, n, **options):
         Signature.__init__(self, 'celery.chunks', (),
         Signature.__init__(self, 'celery.chunks', (),
-                {'task': task, 'it': regen(it), 'n': n}, **options)
+                {'task': task, 'it': regen(it), 'n': n},
+                immutable=True, **options)
 
 
     @classmethod
     @classmethod
     def from_dict(self, d):
     def from_dict(self, d):
         return chunks(*self._unpack_args(d['kwargs']), **d['options'])
         return chunks(*self._unpack_args(d['kwargs']), **d['options'])
 
 
     def apply_async(self, args=(), kwargs={}, **opts):
     def apply_async(self, args=(), kwargs={}, **opts):
-        # need to evaluate generators
-        task, it, n = self._unpack_args(self.kwargs)
-        return self.type.apply_async((),
-                {'task': task, 'it': list(it), 'n': n}, **opts)
+        return self.group().apply_async(args, kwargs, **opts)
 
 
     def __call__(self, **options):
     def __call__(self, **options):
         return self.group()(**options)
         return self.group()(**options)
 
 
     def group(self):
     def group(self):
+        # need to evaluate generators
         task, it, n = self._unpack_args(self.kwargs)
         task, it, n = self._unpack_args(self.kwargs)
         return group(xstarmap(task, part) for part in _chunks(iter(it), n))
         return group(xstarmap(task, part) for part in _chunks(iter(it), n))
 
 
@@ -284,12 +284,21 @@ class chunks(Signature):
 Signature.register_type(chunks)
 Signature.register_type(chunks)
 
 
 
 
+def _maybe_group(tasks):
+    if isinstance(tasks, group):
+        tasks = list(tasks.tasks)
+    else:
+        tasks = regen(tasks if is_list(tasks) else tasks)
+    return tasks
+
+
 class group(Signature):
 class group(Signature):
 
 
     def __init__(self, *tasks, **options):
     def __init__(self, *tasks, **options):
-        tasks = regen(tasks[0] if len(tasks) == 1 and is_list(tasks[0])
-                               else tasks)
-        Signature.__init__(self, 'celery.group', (), {'tasks': tasks}, options)
+        if len(tasks) == 1:
+            tasks = _maybe_group(tasks[0])
+        Signature.__init__(self, 'celery.group', (),
+                {'tasks': tasks}, options, immutable=True)
         self.tasks, self.subtask_type = tasks, 'group'
         self.tasks, self.subtask_type = tasks, 'group'
 
 
     @classmethod
     @classmethod
@@ -307,6 +316,9 @@ class group(Signature):
             task.set(countdown=_next_skew())
             task.set(countdown=_next_skew())
         return self
         return self
 
 
+    def __iter__(self):
+        return iter(self.tasks)
+
     def __repr__(self):
     def __repr__(self):
         return repr(self.tasks)
         return repr(self.tasks)
 Signature.register_type(group)
 Signature.register_type(group)
@@ -317,8 +329,9 @@ class chord(Signature):
 
 
     def __init__(self, header, body=None, **options):
     def __init__(self, header, body=None, **options):
         Signature.__init__(self, 'celery.chord', (),
         Signature.__init__(self, 'celery.chord', (),
-                         {'header': regen(header),
-                          'body': maybe_subtask(body)}, options)
+                         {'header': _maybe_group(header),
+                          'body': maybe_subtask(body)},
+                         options, immutable=True)
         self.subtask_type = 'chord'
         self.subtask_type = 'chord'
 
 
     @classmethod
     @classmethod
@@ -329,7 +342,7 @@ class chord(Signature):
 
 
     def __call__(self, body=None, **options):
     def __call__(self, body=None, **options):
         _chord = self.Chord
         _chord = self.Chord
-        self.kwargs['body'] = body or self.kwargs['body']
+        body = self.kwargs['body'] = body or self.kwargs['body']
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply((), {}, **options)
             return self.apply((), {}, **options)
         callback_id = body.options.setdefault('task_id', uuid())
         callback_id = body.options.setdefault('task_id', uuid())