Browse Source

RPC and disabled backend should raise if workflows with chords are used (Issue #3534)

Ask Solem 8 years ago
parent
commit
a883f6871b
5 changed files with 48 additions and 3 deletions
  1. 6 1
      celery/backends/base.py
  2. 12 0
      celery/backends/rpc.py
  3. 1 0
      celery/canvas.py
  4. 8 2
      t/unit/backends/test_base.py
  5. 21 0
      t/unit/backends/test_rpc.py

+ 6 - 1
celery/backends/base.py

@@ -417,8 +417,12 @@ class Backend(object):
             (group_id, body,), kwargs, countdown=countdown,
         )
 
+    def ensure_chords_allowed(self):
+        pass
+
     def apply_chord(self, header, partial_args, group_id, body,
                     options={}, **kwargs):
+        self.ensure_chords_allowed()
         fixed_options = {k: v for k, v in items(options) if k != 'task_id'}
         result = header(*partial_args, task_id=group_id, **fixed_options or {})
         self.fallback_chord_unlock(group_id, body, **kwargs)
@@ -677,6 +681,7 @@ class BaseKeyValueStoreBackend(Backend):
 
     def _apply_chord_incr(self, header, partial_args, group_id, body,
                           result=None, options={}, **kwargs):
+        self.ensure_chords_allowed()
         self.save_group(group_id, self.app.GroupResult(group_id, result))
 
         fixed_options = {k: v for k, v in items(options) if k != 'task_id'}
@@ -760,7 +765,7 @@ class DisabledBackend(BaseBackend):
     def store_result(self, *args, **kwargs):
         pass
 
-    def apply_chord(self, *args, **kwargs):
+    def ensure_chords_allowed(self):
         raise NotImplementedError(E_CHORD_NO_BACKEND.strip())
 
     def _is_disabled(self, *args, **kwargs):

+ 12 - 0
celery/backends/rpc.py

@@ -21,6 +21,15 @@ from .async import AsyncBackendMixin, BaseResultConsumer
 
 __all__ = ['BacklogLimitExceeded', 'RPCBackend']
 
+E_NO_CHORD_SUPPORT = """
+The "rpc" result backend does not support chords!
+
+Note that a group chained with a task is also upgraded to be a chord,
+as this pattern requires synchronization.
+
+Result backends that supports chords: Redis, Database, Memcached, and more.
+"""
+
 
 class BacklogLimitExceeded(Exception):
     """Too much state history to fast-forward."""
@@ -147,6 +156,9 @@ class RPCBackend(base.Backend, AsyncBackendMixin):
         # RPC backend caches the binding, as one queue is used for all tasks.
         return self.binding
 
+    def ensure_chords_allowed(self):
+        raise NotImplementedError(E_NO_CHORD_SUPPORT.strip())
+
     def on_task_call(self, producer, task_id):
         # Called every time a task is sent when using this backend.
         # We declare the queue we receive replies on in advance of sending

+ 1 - 0
celery/canvas.py

@@ -697,6 +697,7 @@ class _chain(Signature):
 
             prev_task, prev_res = task, res
             if isinstance(task, chord):
+                app.backend.ensure_chords_allowed()
                 # If the task is a chord, and the body is a chain
                 # the chain has already been prepared, and res is
                 # set to the last task in the callback chain.

+ 8 - 2
t/unit/backends/test_base.py

@@ -9,7 +9,7 @@ from contextlib import contextmanager
 from case import ANY, Mock, call, patch, skip
 
 from celery import states
-from celery import group, uuid
+from celery import chord, group, uuid
 from celery.backends.base import (
     BaseBackend,
     KeyValueStoreBackend,
@@ -590,10 +590,16 @@ class test_DisabledBackend:
 
     @pytest.mark.celery(result_backend='disabled')
     def test_chord_raises_error(self):
-        from celery import chord
         with pytest.raises(NotImplementedError):
             chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))
 
+    @pytest.mark.celery(result_backend='disabled')
+    def test_chain_with_chord_raises_error(self):
+        with pytest.raises(NotImplementedError):
+            (self.add.s(2, 2) |
+             group(self.add.s(2, 2),
+                   self.add.s(5, 6)) | self.add.s()).delay()
+
 
 class test_as_uri:
 

+ 21 - 0
t/unit/backends/test_rpc.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import, unicode_literals
 import pytest
 from case import Mock, patch
+from celery import chord, group
 from celery.backends.rpc import RPCBackend
 from celery._state import _task_stack
 
@@ -19,6 +20,26 @@ class test_RPCBackend:
     def test_interface(self):
         self.b.on_reply_declare('task_id')
 
+    def test_ensure_chords_allowed(self):
+        with pytest.raises(NotImplementedError):
+            self.b.ensure_chords_allowed()
+
+    def test_apply_chord(self):
+        with pytest.raises(NotImplementedError):
+            self.b.apply_chord([], (), 'gid', Mock(name='body'))
+
+    @pytest.mark.celery(result_backend='rpc')
+    def test_chord_raises_error(self):
+        with pytest.raises(NotImplementedError):
+            chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))
+
+    @pytest.mark.celery(result_backend='rpc')
+    def test_chain_with_chord_raises_error(self):
+        with pytest.raises(NotImplementedError):
+            (self.add.s(2, 2) |
+             group(self.add.s(2, 2),
+                   self.add.s(5, 6)) | self.add.s()).delay()
+
     def test_destination_for(self):
         req = Mock(name='request')
         req.reply_to = 'reply_to'