소스 검색

Add task.acks_on_failure_or_timeout option (#4970)

* Add task.acks_on_failure_or_timeout option

As shown in https://github.com/celery/celery/issues/4797, acknowledging
SQS messages on failure or timeout makes it hard to use dead letter
queues. This change introduces a new option,
acks_on_failure_or_timeout, so that we can fall back entirely on the
native SQS message lifecycle, using redeliveries for retries (in case of
slow processing or failure) and transitioning to the dead letter queue
after a defined number of attempts.

* Fix style
Mario Kostelac 6 년 전
부모
커밋
90da2b3bb0
3개의 변경된 파일53개의 추가작업 그리고 4개의 파일을 삭제
  1. 8 0
      celery/app/task.py
  2. 4 3
      celery/worker/request.py
  3. 41 1
      t/unit/worker/test_request.py

+ 8 - 0
celery/app/task.py

@@ -250,6 +250,13 @@ class Task(object):
     #: :setting:`task_acks_late` setting.
     acks_late = None
 
+    #: When enabled, messages for this task will be acknowledged even if it
+    #: fails or times out.
+    #:
+    #: The application default can be overridden with the
+    #: :setting:`task_acks_on_failure_or_timeout` setting.
+    acks_on_failure_or_timeout = True
+
     #: Even if :attr:`acks_late` is enabled, the worker will
     #: acknowledge tasks when the worker process executing them abruptly
     #: exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).
@@ -295,6 +302,7 @@ class Task(object):
         ('rate_limit', 'task_default_rate_limit'),
         ('track_started', 'task_track_started'),
         ('acks_late', 'task_acks_late'),
+        ('acks_on_failure_or_timeout', 'task_acks_on_failure_or_timeout'),
         ('reject_on_worker_lost', 'task_reject_on_worker_lost'),
         ('ignore_result', 'task_ignore_result'),
         ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'),

+ 4 - 3
celery/worker/request.py

@@ -315,7 +315,7 @@ class Request(object):
                 self.id, exc, request=self, store_result=self.store_errors,
             )
 
-            if self.task.acks_late:
+            if self.task.acks_late and self.task.acks_on_failure_or_timeout:
                 self.acknowledge()
 
     def on_success(self, failed__retval__runtime, **kwargs):
@@ -368,15 +368,16 @@ class Request(object):
             )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
-            requeue = not self.delivery_info.get('redelivered')
             reject = (
                 self.task.reject_on_worker_lost and
                 isinstance(exc, WorkerLostError)
             )
+            ack = self.task.acks_on_failure_or_timeout
             if reject:
+                requeue = not self.delivery_info.get('redelivered')
                 self.reject(requeue=requeue)
                 send_failed_event = False
-            else:
+            elif ack:
                 self.acknowledge()
 
         if send_failed_event:

+ 41 - 1
t/unit/worker/test_request.py

@@ -616,13 +616,25 @@ class test_Request(RequestCase):
             job.on_failure(exc_info)
             assert job.acknowledged
 
+    def test_on_failure_acks_on_failure_or_timeout(self):
+        job = self.xRequest()
+        job.time_start = 1
+        self.mytask.acks_late = True
+        self.mytask.acks_on_failure_or_timeout = False
+        try:
+            raise KeyError('foo')
+        except KeyError:
+            exc_info = ExceptionInfo()
+            job.on_failure(exc_info)
+            assert job.acknowledged is False
+
     def test_from_message_invalid_kwargs(self):
         m = self.TaskMessage(self.mytask.name, args=(), kwargs='foo')
         req = Request(m, app=self.app)
         with pytest.raises(InvalidTaskError):
             raise req.execute().exception
 
-    def test_on_hard_timeout(self, patching):
+    def test_on_hard_timeout_acks_late(self, patching):
         error = patching('celery.worker.request.error')
 
         job = self.xRequest()
@@ -639,6 +651,34 @@ class test_Request(RequestCase):
         job.on_timeout(soft=False, timeout=1335)
         job.acknowledge.assert_not_called()
 
+    def test_on_hard_timeout_acks_on_failure_or_timeout(self, patching):
+        error = patching('celery.worker.request.error')
+
+        job = self.xRequest()
+        job.acknowledge = Mock(name='ack')
+        job.task.acks_late = True
+        job.task.acks_on_failure_or_timeout = True
+        job.on_timeout(soft=False, timeout=1337)
+        assert 'Hard time limit' in error.call_args[0][0]
+        assert self.mytask.backend.get_status(job.id) == states.FAILURE
+        job.acknowledge.assert_called_with()
+
+        job = self.xRequest()
+        job.acknowledge = Mock(name='ack')
+        job.task.acks_late = True
+        job.task.acks_on_failure_or_timeout = False
+        job.on_timeout(soft=False, timeout=1337)
+        assert 'Hard time limit' in error.call_args[0][0]
+        assert self.mytask.backend.get_status(job.id) == states.FAILURE
+        job.acknowledge.assert_not_called()
+
+        job = self.xRequest()
+        job.acknowledge = Mock(name='ack')
+        job.task.acks_late = False
+        job.task.acks_on_failure_or_timeout = True
+        job.on_timeout(soft=False, timeout=1335)
+        job.acknowledge.assert_not_called()
+
     def test_on_soft_timeout(self, patching):
         warn = patching('celery.worker.request.warn')