Browse Source

Always pass app to maybe_signature

Ask Solem 11 years ago
parent
commit
99ebacd8fd

+ 8 - 7
celery/app/builtins.py

@@ -183,12 +183,11 @@ def add_group_task(app):
             return result
 
         def prepare(self, options, tasks, args, **kwargs):
-            AsyncResult = self.AsyncResult
             options['group_id'] = group_id = (
                 options.setdefault('task_id', uuid()))
 
             def prepare_member(task):
-                task = maybe_signature(task)
+                task = maybe_signature(task, app=app)
                 task.options['group_id'] = group_id
                 return task, task.freeze()
 
@@ -235,7 +234,7 @@ def add_chain_task(app):
             i = 0
             while steps:
                 # First task get partial args from chain.
-                task = maybe_signature(steps.popleft())
+                task = maybe_signature(steps.popleft(), app=app)
                 task = task.clone() if i else task.clone(args)
                 res = task.freeze()
                 i += 1
@@ -327,7 +326,9 @@ def add_chord_task(app):
 
             # - convert back to group if serialized
             tasks = header.tasks if isinstance(header, group) else header
-            header = group([maybe_signature(s).clone() for s in tasks])
+            header = group([
+                maybe_signature(s, app=app).clone() for s in tasks
+            ])
             # - eager applies the group inline
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)
@@ -363,8 +364,8 @@ def add_chord_task(app):
                 return self.apply(args, kwargs, **options)
             header = kwargs.pop('header')
             body = kwargs.pop('body')
-            header, body = (list(maybe_signature(header)),
-                            maybe_signature(body))
+            header, body = (list(maybe_signature(header, app=app)),
+                            maybe_signature(body, app=app))
             # forward certain options to body
             if chord is not None:
                 body.options['chord'] = chord
@@ -382,6 +383,6 @@ def add_chord_task(app):
             body = kwargs['body']
             res = super(Chord, self).apply(args, dict(kwargs, eager=True),
                                            **options)
-            return maybe_signature(body).apply(
+            return maybe_signature(body, app=app).apply(
                 args=(res.get(propagate=propagate).get(), ))
     return Chord

+ 2 - 2
celery/app/trace.py

@@ -233,7 +233,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     I = Info(FAILURE, exc)
                     state, retval = I.state, I.retval
                     R = I.handle_error_state(task, eager=eager)
-                    [signature(errback).apply_async((uuid, ))
+                    [signature(errback, app=app).apply_async((uuid, ))
                         for errback in task_request.errbacks or []]
                 except BaseException as exc:
                     raise
@@ -241,7 +241,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     # callback tasks must be applied before the result is
                     # stored, so that result.children is populated.
                     print('CALLBACKS: %r' % (task_request.callbacks, ))
-                    [signature(callback).apply_async((retval, ))
+                    [signature(callback, app=app).apply_async((retval, ))
                         for callback in task_request.callbacks or []]
                     if publish_result:
                         store_result(

+ 8 - 1
celery/backends/base.py

@@ -463,7 +463,14 @@ class KeyValueStoreBackend(BaseBackend):
         if not gid:
             return
         key = self.get_key_for_chord(gid)
-        deps = GroupResult.restore(gid, backend=task.backend)
+        try:
+            deps = GroupResult.restore(gid, backend=task.backend)
+        except Exception as exc:
+            callback = signature(task.request.chord)
+            return app._tasks[callback.task].backend.fail_from_current_stack(
+                callback.id,
+                exc=ChordError('Cannot restore group: {0!r}'.format(exc)),
+            )
         if deps is None:
             callback = signature(task.request.chord)
             return app._tasks[callback.task].backend.fail_from_current_stack(

+ 4 - 4
celery/canvas.py

@@ -488,7 +488,7 @@ class group(Signature):
             gid = opts['task_id'] = uuid()
         new_tasks, results = [], []
         for task in self.tasks:
-            task = maybe_signature(task).clone()
+            task = maybe_signature(task, app=self._app).clone()
             results.append(task._freeze())
             new_tasks.append(task)
         self.tasks = self.kwargs['tasks'] = new_tasks
@@ -520,7 +520,7 @@ class chord(Signature):
         Signature.__init__(
             self, task, args,
             dict(kwargs, header=_maybe_group(header),
-                 body=maybe_signature(body)), **options
+                 body=maybe_signature(body, app=self._app)), **options
         )
         self.subtask_type = 'chord'
 
@@ -596,8 +596,8 @@ def signature(varies, *args, **kwargs):
 subtask = signature   # XXX compat
 
 
-def maybe_signature(d):
+def maybe_signature(d, app=None):
     if d is not None and isinstance(d, dict) and not isinstance(d, Signature):
-        return signature(d)
+        return signature(d, app=app)
     return d
 maybe_subtask = maybe_signature  # XXX compat

+ 3 - 1
celery/task/sets.py

@@ -40,8 +40,10 @@ class TaskSet(list):
     app = None
 
     def __init__(self, tasks=None, app=None, Publisher=None):
-        super(TaskSet, self).__init__(maybe_signature(t) for t in tasks or [])
         self.app = app_or_default(app or self.app)
+        super(TaskSet, self).__init__(
+            maybe_signature(t, app=self.app) for t in tasks or []
+        )
         self.Publisher = Publisher or self.app.amqp.TaskProducer
         self.total = len(self)  # XXX compat
 

+ 5 - 3
celery/tests/tasks/test_canvas.py

@@ -321,11 +321,13 @@ class test_chord(CanvasCase):
 class test_maybe_signature(CanvasCase):
 
     def test_is_None(self):
-        self.assertIsNone(maybe_signature(None))
+        self.assertIsNone(maybe_signature(None, app=self.app))
 
     def test_is_dict(self):
-        self.assertIsInstance(maybe_signature(dict(self.add.s())), Signature)
+        self.assertIsInstance(
+            maybe_signature(dict(self.add.s()), app=self.app), Signature,
+        )
 
     def test_when_sig(self):
         s = self.add.s()
-        self.assertIs(maybe_signature(s), s)
+        self.assertIs(maybe_signature(s, app=self.app), s)