Browse Source

[settings] Removes the CHORD_PROPAGATES setting

Ask Solem 9 years ago
parent
commit
5f019dfa6f
5 changed files with 13 additions and 38 deletions
  1. 5 14
      celery/app/builtins.py
  2. 0 5
      celery/app/defaults.py
  3. 3 5
      celery/backends/base.py
  4. 2 4
      celery/canvas.py
  5. 3 10
      celery/tests/backends/test_base.py

+ 5 - 14
celery/app/builtins.py

@@ -54,20 +54,12 @@ def add_unlock_chord_task(app):
     from celery.exceptions import ChordError
     from celery.result import allow_join_result, result_from_tuple
 
-    default_propagate = app.conf.chord_propagates
-
     @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
               default_retry_delay=1, ignore_result=True, lazy=False, bind=True)
-    def unlock_chord(self, group_id, callback, interval=None, propagate=None,
+    def unlock_chord(self, group_id, callback, interval=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
-                     result_from_tuple=result_from_tuple):
-        # if propagate is disabled exceptions raised by chord tasks
-        # will be sent as part of the result list to the chord callback.
-        # Since 3.1 propagate will be enabled by default, and instead
-        # the chord callback changes state to FAILURE with the
-        # exception set to ChordError.
-        propagate = default_propagate if propagate is None else propagate
+                     result_from_tuple=result_from_tuple, **kwargs):
         if interval is None:
             interval = self.default_retry_delay
 
@@ -93,7 +85,7 @@ def add_unlock_chord_task(app):
         callback = maybe_signature(callback, app=app)
         try:
             with allow_join_result():
-                ret = j(timeout=3.0, propagate=propagate)
+                ret = j(timeout=3.0, propagate=True)
         except Exception as exc:
             try:
                 culprit = next(deps._failed_join_report())
@@ -191,8 +183,7 @@ def add_chord_task(app):
     @app.task(name='celery.chord', bind=True, ignore_result=False,
               shared=False, lazy=False)
     def chord(self, header, body, partial_args=(), interval=None,
-              countdown=1, max_retries=None, propagate=None,
-              eager=False, **kwargs):
+              countdown=1, max_retries=None, eager=False, **kwargs):
         app = self.app
         # - convert back to group if serialized
         tasks = header.tasks if isinstance(header, group) else header
@@ -202,5 +193,5 @@ def add_chord_task(app):
         body = maybe_signature(body, app=app)
         ch = _chord(header, body)
         return ch.run(header, body, partial_args, app, interval,
-                      countdown, max_retries, propagate, **kwargs)
+                      countdown, max_retries, **kwargs)
     return chord

+ 0 - 5
celery/app/defaults.py

@@ -131,11 +131,6 @@ NAMESPACES = Namespace(
         table=Option(type='string'),
         write_consistency=Option(type='string'),
     ),
-    chord=Namespace(
-        __old__=old_ns('celery_chord'),
-
-        propagates=Option(True, type='bool'),
-    ),
     couchbase=Namespace(
         __old__=old_ns('celery_couchbase'),
 

+ 3 - 5
celery/backends/base.py

@@ -359,7 +359,7 @@ class BaseBackend(object):
     def add_to_chord(self, chord_id, result):
         raise NotImplementedError('Backend does not support add_to_chord')
 
-    def on_chord_part_return(self, request, state, result, propagate=False):
+    def on_chord_part_return(self, request, state, result, **kwargs):
         pass
 
     def fallback_chord_unlock(self, group_id, body, result=None,
@@ -553,12 +553,10 @@ class KeyValueStoreBackend(BaseBackend):
 
         return header(*partial_args, task_id=group_id, **fixed_options or {})
 
-    def on_chord_part_return(self, request, state, result, propagate=None):
+    def on_chord_part_return(self, request, state, result, **kwargs):
         if not self.implements_incr:
             return
         app = self.app
-        if propagate is None:
-            propagate = app.conf.chord_propagates
         gid = request.group
         if not gid:
             return
@@ -593,7 +591,7 @@ class KeyValueStoreBackend(BaseBackend):
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
                 with allow_join_result():
-                    ret = j(timeout=3.0, propagate=propagate)
+                    ret = j(timeout=3.0, propagate=True)
             except Exception as exc:
                 try:
                     culprit = next(deps._failed_join_report())

+ 2 - 4
celery/canvas.py

@@ -939,11 +939,9 @@ class chord(Signature):
         return sum(self._traverse_tasks(self.tasks, 1))
 
     def run(self, header, body, partial_args, app=None, interval=None,
-            countdown=1, max_retries=None, propagate=None, eager=False,
+            countdown=1, max_retries=None, eager=False,
             task_id=None, **options):
         app = app or self._get_app(body)
-        propagate = (app.conf.chord_propagates
-                     if propagate is None else propagate)
         group_id = uuid()
         root_id = body.options.get('root_id')
         body.chord_size = self.__length_hint__()
@@ -960,7 +958,7 @@ class chord(Signature):
             header, partial_args, group_id, body,
             interval=interval, countdown=countdown,
             options=options, max_retries=max_retries,
-            propagate=propagate, result=results)
+            result=results)
         bodyres.parent = parent
         return bodyres
 

+ 3 - 10
celery/tests/backends/test_base.py

@@ -328,24 +328,17 @@ class test_KeyValueStoreBackend(AppCase):
 
     def test_chord_part_return_propagate_set(self):
         with self._chord_part_context(self.b) as (task, deps, _):
-            self.b.on_chord_part_return(
-                task.request, 'SUCCESS', 10, propagate=True,
-            )
+            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
             deps.join_native.assert_called_with(propagate=True, timeout=3.0)
 
     def test_chord_part_return_propagate_default(self):
         with self._chord_part_context(self.b) as (task, deps, _):
-            self.b.on_chord_part_return(
-                task.request, 'SUCCESS', 10, propagate=None,
-            )
+            self.b.on_chord_part_return(task.request, 'SUCCESS', 10)
             self.assertFalse(self.b.expire.called)
             deps.delete.assert_called_with()
-            deps.join_native.assert_called_with(
-                propagate=self.b.app.conf.chord_propagates,
-                timeout=3.0,
-            )
+            deps.join_native.assert_called_with(propagate=True, timeout=3.0)
 
     def test_chord_part_return_join_raises_internal(self):
         with self._chord_part_context(self.b) as (task, deps, callback):