Browse Source

request on_timeout now ignores soft time limit exception (fixes #4412) (#4473)

* request on_timeout now ignores soft time limit exception (closes #4412)

* fix quality
Alex Garel 7 years ago
parent
commit
0ffd36fbf9
6 changed files with 68 additions and 19 deletions
  1. 1 0
      CONTRIBUTORS.txt
  2. 9 0
      Changelog
  3. 8 9
      celery/worker/request.py
  4. 11 0
      t/integration/tasks.py
  5. 22 1
      t/integration/test_canvas.py
  6. 17 9
      t/unit/worker/test_request.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -254,3 +254,4 @@ Andrew Wong, 2017/09/07
 Arpan Shah, 2017/09/12
 Tobias 'rixx' Kunze, 2017/08/20
 Mikhail Wolfson, 2017/12/11
+Alex Garel, 2018/01/04

+ 9 - 0
Changelog

@@ -8,6 +8,15 @@ This document contains change notes for bugfix releases in
 the 4.1.x series (latentcall), please see :ref:`whatsnew-4.1` for
 an overview of what's new in Celery 4.1.
 
+Unreleased
+==========
+
+- **Canvas**: request on_timeout now ignores soft time limit exception
+  to give a chance for task to recover.
+
+  Contributed by **Alex Garel**
+
+
 .. _version-4.1.0:
 
 4.1.0

+ 8 - 9
celery/worker/request.py

@@ -18,8 +18,8 @@ from kombu.utils.objects import cached_property
 from celery import signals
 from celery.app.trace import trace_task, trace_task_ret
 from celery.exceptions import (Ignore, InvalidTaskError, Reject, Retry,
-                               SoftTimeLimitExceeded, TaskRevokedError,
-                               Terminated, TimeLimitExceeded, WorkerLostError)
+                               TaskRevokedError, Terminated,
+                               TimeLimitExceeded, WorkerLostError)
 from celery.five import python_2_unicode_compatible, string
 from celery.platforms import signals as _signals
 from celery.utils.functional import maybe, noop
@@ -299,22 +299,21 @@ class Request(object):
 
     def on_timeout(self, soft, timeout):
         """Handler called if the task times out."""
-        task_ready(self)
         if soft:
             warn('Soft time limit (%ss) exceeded for %s[%s]',
                  timeout, self.name, self.id)
-            exc = SoftTimeLimitExceeded(soft)
         else:
+            task_ready(self)
             error('Hard time limit (%ss) exceeded for %s[%s]',
                   timeout, self.name, self.id)
             exc = TimeLimitExceeded(timeout)
 
-        self.task.backend.mark_as_failure(
-            self.id, exc, request=self, store_result=self.store_errors,
-        )
+            self.task.backend.mark_as_failure(
+                self.id, exc, request=self, store_result=self.store_errors,
+            )
 
-        if self.task.acks_late:
-            self.acknowledge()
+            if self.task.acks_late:
+                self.acknowledge()
 
     def on_success(self, failed__retval__runtime, **kwargs):
         """Handler called if the task was successfully processed."""

+ 11 - 0
t/integration/tasks.py

@@ -4,6 +4,7 @@ from __future__ import absolute_import, unicode_literals
 from time import sleep
 
 from celery import chain, group, shared_task
+from celery.exceptions import SoftTimeLimitExceeded
 from celery.utils.log import get_task_logger
 
 logger = get_task_logger(__name__)
@@ -24,6 +25,16 @@ def delayed_sum(numbers, pause_time=1):
     return sum(numbers)
 
 
+@shared_task
+def delayed_sum_with_soft_guard(numbers, pause_time=1):
+    """Sum the iterable of numbers."""
+    try:
+        sleep(pause_time)
+        return sum(numbers)
+    except SoftTimeLimitExceeded:
+        return 0
+
+
 @shared_task(bind=True)
 def add_replaced(self, x, y):
     """Add two numbers (via the add task)."""

+ 22 - 1
t/integration/test_canvas.py

@@ -11,7 +11,8 @@ from celery.result import AsyncResult, GroupResult
 
 from .conftest import flaky
 from .tasks import (add, add_replaced, add_to_all, collect_ids, delayed_sum,
-                    ids, redis_echo, second_order_replace1)
+                    delayed_sum_with_soft_guard, ids, redis_echo,
+                    second_order_replace1)
 
 TIMEOUT = 120
 
@@ -110,6 +111,26 @@ class test_chain:
             node = node.parent
             i -= 1
 
+    def test_chord_soft_timeout_recuperation(self, manager):
+        """Test that if soft timeout happens in task but is managed by task,
+        chord still get results normally
+        """
+        if not manager.app.conf.result_backend.startswith('redis'):
+            raise pytest.skip('Requires redis result backend.')
+
+        c = chord([
+            # return 3
+            add.s(1, 2),
+            # return 0 after managing soft timeout
+            delayed_sum_with_soft_guard.s(
+                [100], pause_time=2
+            ).set(
+                soft_time_limit=1
+            ),
+        ])
+        result = c(delayed_sum.s(pause_time=0)).get()
+        assert result == 3
+
 
 class test_group:
 

+ 17 - 9
t/unit/worker/test_request.py

@@ -599,31 +599,39 @@ class test_Request(RequestCase):
         with pytest.raises(InvalidTaskError):
             raise req.execute().exception
 
-    def test_on_timeout(self, patching):
-        warn = patching('celery.worker.request.warn')
+    def test_on_hard_timeout(self, patching):
         error = patching('celery.worker.request.error')
 
         job = self.xRequest()
         job.acknowledge = Mock(name='ack')
         job.task.acks_late = True
-        job.on_timeout(soft=True, timeout=1337)
-        assert 'Soft time limit' in warn.call_args[0][0]
         job.on_timeout(soft=False, timeout=1337)
         assert 'Hard time limit' in error.call_args[0][0]
         assert self.mytask.backend.get_status(job.id) == states.FAILURE
         job.acknowledge.assert_called_with()
 
-        self.mytask.ignore_result = True
         job = self.xRequest()
-        job.on_timeout(soft=True, timeout=1336)
-        assert self.mytask.backend.get_status(job.id) == states.PENDING
+        job.acknowledge = Mock(name='ack')
+        job.task.acks_late = False
+        job.on_timeout(soft=False, timeout=1335)
+        job.acknowledge.assert_not_called()
+
+    def test_on_soft_timeout(self, patching):
+        warn = patching('celery.worker.request.warn')
 
         job = self.xRequest()
         job.acknowledge = Mock(name='ack')
-        job.task.acks_late = False
-        job.on_timeout(soft=True, timeout=1335)
+        job.task.acks_late = True
+        job.on_timeout(soft=True, timeout=1337)
+        assert 'Soft time limit' in warn.call_args[0][0]
+        assert self.mytask.backend.get_status(job.id) == states.PENDING
         job.acknowledge.assert_not_called()
 
+        self.mytask.ignore_result = True
+        job = self.xRequest()
+        job.on_timeout(soft=True, timeout=1336)
+        assert self.mytask.backend.get_status(job.id) == states.PENDING
+
     def test_fast_trace_task(self):
         from celery.app import trace
         setup_worker_optimizations(self.app)