Kaynağa Gözat

Run chord_unlock on same queue as chord body - fixes #4337 (#4448)

Alex Hill 7 yıl önce
ebeveyn
işleme
fde58ad677
2 değiştirilmiş dosya ile 35 ekleme ve 2 silme
  1. 4 1
      celery/backends/base.py
  2. 31 1
      t/unit/backends/test_base.py

+ 4 - 1
celery/backends/base.py

@@ -415,8 +415,11 @@ class Backend(object):
     def fallback_chord_unlock(self, header_result, body, countdown=1,
                               **kwargs):
         kwargs['result'] = [r.as_tuple() for r in header_result]
+        queue = body.options.get('queue', getattr(body.type, 'queue', None))
         self.app.tasks['celery.chord_unlock'].apply_async(
-            (header_result.id, body,), kwargs, countdown=countdown,
+            (header_result.id, body,), kwargs,
+            countdown=countdown,
+            queue=queue,
         )
 
     def ensure_chords_allowed(self):

+ 31 - 1
t/unit/backends/test_base.py

@@ -63,6 +63,12 @@ class test_BaseBackend_interface:
     def setup(self):
         self.b = BaseBackend(self.app)
 
+        @self.app.task(shared=False)
+        def callback(result):
+            pass
+
+        self.callback = callback
+
     def test__forget(self):
         with pytest.raises(NotImplementedError):
             self.b._forget('SOMExx-N0Nex1stant-IDxx-')
@@ -80,9 +86,33 @@ class test_BaseBackend_interface:
             uuid(),
             [self.app.AsyncResult(x) for x in range(3)],
         )
-        self.b.apply_chord(header_result, None)
+        self.b.apply_chord(header_result, self.callback.s())
         assert self.app.tasks[unlock].apply_async.call_count
 
+    def test_chord_unlock_queue(self, unlock='celery.chord_unlock'):
+        self.app.tasks[unlock] = Mock()
+        header_result = self.app.GroupResult(
+            uuid(),
+            [self.app.AsyncResult(x) for x in range(3)],
+        )
+        body = self.callback.s()
+
+        self.b.apply_chord(header_result, body)
+        called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
+        assert called_kwargs['queue'] is None
+
+        self.b.apply_chord(header_result, body.set(queue='test_queue'))
+        called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
+        assert called_kwargs['queue'] == 'test_queue'
+
+        @self.app.task(shared=False, queue='test_queue_two')
+        def callback_queue(result):
+            pass
+
+        self.b.apply_chord(header_result, callback_queue.s())
+        called_kwargs = self.app.tasks[unlock].apply_async.call_args[1]
+        assert called_kwargs['queue'] == 'test_queue_two'
+
 
 class test_exception_pickle: