Parcourir la source

Merge pull request #2840 from mpermana/master

Fix issue #1628
Omer Katz il y a 10 ans
Parent
commit
8e31670bb7
4 fichiers modifiés avec 29 ajouts et 2 suppressions
  1. 1 0
      celery/app/defaults.py
  2. 7 0
      celery/app/task.py
  3. 14 0
      celery/tests/worker/test_request.py
  4. 7 2
      celery/worker/request.py

+ 1 - 0
celery/app/defaults.py

@@ -132,6 +132,7 @@ NAMESPACES = {
         'REDIS_DB': Option(type='int', **_REDIS_OLD),
         'REDIS_PASSWORD': Option(type='string', **_REDIS_OLD),
         'REDIS_MAX_CONNECTIONS': Option(type='int'),
+        'REJECT_ON_WORKER_LOST': Option(type='bool'),
         'RESULT_BACKEND': Option(type='string'),
         'RESULT_DB_SHORT_LIVED_SESSIONS': Option(False, type='bool'),
         'RESULT_DB_TABLENAMES': Option(type='dict'),

+ 7 - 0
celery/app/task.py

@@ -220,6 +220,12 @@ class Task(object):
     #: :setting:`CELERY_ACKS_LATE` setting.
     acks_late = None
 
+    #: When CELERY_ACKS_LATE is set to True, the default behavior to
+    #: handle worker crash is to acknowledge the message. Setting
+    #: this to true allows the message to be rejected and requeued so
+    #: it will be executed again by another worker.
+    reject_on_worker_lost = None
+
     #: Tuple of expected exceptions.
     #:
     #: These are errors that are expected in normal operation
@@ -248,6 +254,7 @@ class Task(object):
         ('rate_limit', 'CELERY_DEFAULT_RATE_LIMIT'),
         ('track_started', 'CELERY_TRACK_STARTED'),
         ('acks_late', 'CELERY_ACKS_LATE'),
+        ('reject_on_worker_lost', 'CELERY_REJECT_ON_WORKER_LOST'),
         ('ignore_result', 'CELERY_IGNORE_RESULT'),
         ('store_errors_even_if_ignored',
             'CELERY_STORE_ERRORS_EVEN_IF_IGNORED'),

+ 14 - 0
celery/tests/worker/test_request.py

@@ -325,6 +325,20 @@ class test_Request(AppCase):
             req_logger, req.connection_errors, True,
         )
 
+    def test_on_failure_WrokerLostError_rejects_with_requeue(self):
+        einfo = None
+        try:
+            raise WorkerLostError()
+        except:
+            einfo = ExceptionInfo(internal=True)
+        req = self.get_request(self.add.s(2, 2))
+        req.task.acks_late = True
+        req.task.reject_on_worker_lost = True
+        req.delivery_info['redelivered'] = False
+        req.on_failure(einfo)
+        req.on_reject.assert_called_with(req_logger,
+            req.connection_errors, True)
+
     def test_tzlocal_is_cached(self):
         req = self.get_request(self.add.s(2, 2))
         req._tzlocal = 'foo'

+ 7 - 2
celery/worker/request.py

@@ -326,7 +326,6 @@ class Request(object):
     def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
         """Handler called if the task raised an exception."""
         task_ready(self)
-
         if isinstance(exc_info.exception, MemoryError):
             raise MemoryError('Process got: %s' % (exc_info.exception,))
         elif isinstance(exc_info.exception, Reject):
@@ -352,7 +351,13 @@ class Request(object):
                 )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
-            self.acknowledge()
+            reject_and_requeue = (self.task.reject_on_worker_lost and
+                isinstance(exc, WorkerLostError) and
+                self.delivery_info.get('redelivered', False) is False)
+            if reject_and_requeue:
+                self.reject(requeue=True)
+            else:
+                self.acknowledge()
 
         if send_failed_event:
             self.send_event(