Browse Source

Added support to bind=True task as link errors (#4545)

* Added support to bind=True task as link errors

Signed-off-by: Bartosz Rabiega <bartosz.rabiega@corp.ovh.com>

* Fixed linting errors

Signed-off-by: Bartosz Rabiega <bartosz.rabiega@corp.ovh.com>

* Added unittests for errbacks in mark_as_failure

Signed-off-by: Bartosz Rabiega <bartosz.rabiega@corp.ovh.com>
BR 7 years ago
parent
commit
bc366c887f
2 changed files with 40 additions and 1 deletions
  1. 7 1
      celery/backends/base.py
  2. 33 0
      t/unit/backends/test_base.py

+ 7 - 1
celery/backends/base.py

@@ -12,6 +12,7 @@ import sys
 import time
 from collections import namedtuple
 from datetime import timedelta
+from functools import partial
 from weakref import WeakValueDictionary
 
 from billiard.einfo import ExceptionInfo
@@ -163,7 +164,12 @@ class Backend(object):
         old_signature = []
         for errback in request.errbacks:
             errback = self.app.signature(errback)
-            if arity_greater(errback.type.__header__, 1):
+            if (
+                # workaround to support tasks with bind=True executed as
+                # link errors. Otherwise retries can't be used
+                not isinstance(errback.type.__header__, partial) and
+                arity_greater(errback.type.__header__, 1)
+            ):
                 errback(request, exc, traceback)
             else:
                 old_signature.append(errback)

+ 33 - 0
t/unit/backends/test_base.py

@@ -218,6 +218,17 @@ class test_BaseBackend_dict:
     def setup(self):
         self.b = DictBackend(app=self.app)
 
+        @self.app.task(shared=False, bind=True)
+        def bound_errback(self, result):
+            pass
+
+        @self.app.task(shared=False)
+        def errback(arg1, arg2):
+            errback.last_result = arg1 + arg2
+
+        self.bound_errback = bound_errback
+        self.errback = errback
+
     def test_delete_group(self):
         self.b.delete_group('can-delete')
         assert 'can-delete' not in self.b._data
@@ -303,6 +314,28 @@ class test_BaseBackend_dict:
         b.mark_as_done('id', 10, request=request)
         b.on_chord_part_return.assert_called_with(request, states.SUCCESS, 10)
 
+    def test_mark_as_failure__bound_errback(self):
+        b = BaseBackend(app=self.app)
+        b._store_result = Mock()
+        request = Mock(name='request')
+        request.errbacks = [
+            self.bound_errback.subtask(args=[1], immutable=True)]
+        exc = KeyError()
+        group = self.patching('celery.backends.base.group')
+        b.mark_as_failure('id', exc, request=request)
+        group.assert_called_with(request.errbacks, app=self.app)
+        group.return_value.apply_async.assert_called_with(
+            (request.id, ), parent_id=request.id, root_id=request.root_id)
+
+    def test_mark_as_failure__errback(self):
+        b = BaseBackend(app=self.app)
+        b._store_result = Mock()
+        request = Mock(name='request')
+        request.errbacks = [self.errback.subtask(args=[2, 3], immutable=True)]
+        exc = KeyError()
+        b.mark_as_failure('id', exc, request=request)
+        assert self.errback.last_result == 5
+
     def test_mark_as_failure__chord(self):
         b = BaseBackend(app=self.app)
         b._store_result = Mock()