Browse Source

Deserialize all tasks in a chain (#4015)

When loading a chain that had been fully serialized to json, deserialization
happened only in part of the original task. Specifically, take a
json-serialized task that looks like the following:

    {
        ...,
        "kwargs": {
            "tasks": [
                {...}, {...}, {...}
            ]
        }
    }

After calling `celery.signature(that_thing_above)`, we get an object that
actually looks like this:

    {
        ...,
        "kwargs": {
            "tasks": [
                task_1, # an instance of celery.Signature
                {...}, {...} # same as before deserialization
            ]
        }
    }

The culprit was `chain.from_dict`, which was converting only the first subtask
of the chain to actual `Signature` instances. This commit changes that
behaviour and converts all subtasks to signatures instead. Without this, some
operations on the chain object, such as calling `chain.on_error(...)` would
cause errors of the form `'dict' object has no attribute 'xyz'`.
Felipe 7 years ago
parent
commit
1735336dad
2 changed files with 12 additions and 2 deletions
  1. 1 2
      celery/canvas.py
  2. 11 0
      t/unit/tasks/test_canvas.py

+ 1 - 2
celery/canvas.py

@@ -531,8 +531,7 @@ class _chain(Signature):
         if tasks:
             if isinstance(tasks, tuple):  # aaaargh
                 tasks = d['kwargs']['tasks'] = list(tasks)
-            # First task must be signature object to get app
-            tasks[0] = maybe_signature(tasks[0], app=app)
+            tasks = [maybe_signature(task, app=app) for task in tasks]
         return _upgrade(d, _chain(tasks, app=app, **d['options']))
 
     def __init__(self, *tasks, **options):

+ 11 - 0
t/unit/tasks/test_canvas.py

@@ -1,4 +1,5 @@
 from __future__ import absolute_import, unicode_literals
+import json
 import pytest
 from case import MagicMock, Mock
 from celery._state import _task_stack
@@ -260,6 +261,16 @@ class test_chain(CanvasCase):
     def test_from_dict_no_tasks(self):
         assert chain.from_dict(dict(chain(app=self.app)), app=self.app)
 
+    def test_from_dict_full_subtasks(self):
+        c = chain(self.add.si(1, 2), self.add.si(3, 4), self.add.si(5, 6))
+
+        serialized = json.loads(json.dumps(c))
+
+        deserialized = chain.from_dict(serialized)
+
+        for task in deserialized.tasks:
+            assert isinstance(task, Signature)
+
     @pytest.mark.usefixtures('depends_on_current_app')
     def test_app_falls_back_to_default(self):
         from celery._state import current_app