Browse Source

Merge pull request #2872 from mpermana/master

use reject+requeue=False when redelivered is not known,
Ask Solem Hoel 9 years ago
parent
commit
de2941195d
2 changed files with 20 additions and 7 deletions
  1. 15 1
      celery/tests/worker/test_request.py
  2. 5 6
      celery/worker/request.py

+ 15 - 1
celery/tests/worker/test_request.py

@@ -325,7 +325,7 @@ class test_Request(AppCase):
             req_logger, req.connection_errors, True,
         )
 
-    def test_on_failure_WrokerLostError_rejects_with_requeue(self):
+    def test_on_failure_WorkerLostError_rejects_with_requeue(self):
         einfo = None
         try:
             raise WorkerLostError()
@@ -339,6 +339,20 @@ class test_Request(AppCase):
         req.on_reject.assert_called_with(
             req_logger, req.connection_errors, True)
 
+    def test_on_failure_WorkerLostError_redelivered_None(self):
+        einfo = None
+        try:
+            raise WorkerLostError()
+        except:
+            einfo = ExceptionInfo(internal=True)
+        req = self.get_request(self.add.s(2, 2))
+        req.task.acks_late = True
+        req.task.reject_on_worker_lost = True
+        req.delivery_info['redelivered'] = None
+        req.on_failure(einfo)
+        req.on_reject.assert_called_with(
+            req_logger, req.connection_errors, False)
+
     def test_tzlocal_is_cached(self):
         req = self.get_request(self.add.s(2, 2))
         req._tzlocal = 'foo'

+ 5 - 6
celery/worker/request.py

@@ -355,12 +355,11 @@ class Request(object):
             )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
-            reject_and_requeue = (
-                self.task.reject_on_worker_lost and
-                isinstance(exc, WorkerLostError) and
-                self.delivery_info.get('redelivered', False) is False)
-            if reject_and_requeue:
-                self.reject(requeue=True)
+            requeue = self.delivery_info.get('redelivered', None) is False
+            reject = (self.task.reject_on_worker_lost and
+                isinstance(exc, WorkerLostError))
+            if reject:
+                self.reject(requeue=requeue)
             else:
                 self.acknowledge()