Ver Fonte

Fix issue #1628

Michael Permana há 9 anos atrás
pai
commit
580f06be22
3 ficheiros alterados com 12 adições e 1 exclusões
  1. 1 0
      celery/app/defaults.py
  2. 7 0
      celery/app/task.py
  3. 4 1
      celery/worker/request.py

+ 1 - 0
celery/app/defaults.py

@@ -132,6 +132,7 @@ NAMESPACES = {
         'REDIS_DB': Option(type='int', **_REDIS_OLD),
         'REDIS_PASSWORD': Option(type='string', **_REDIS_OLD),
         'REDIS_MAX_CONNECTIONS': Option(type='int'),
+        'REJECT_ON_WORKER_LOST': Option(type='bool'),
         'RESULT_BACKEND': Option(type='string'),
         'RESULT_DB_SHORT_LIVED_SESSIONS': Option(False, type='bool'),
         'RESULT_DB_TABLENAMES': Option(type='dict'),

+ 7 - 0
celery/app/task.py

@@ -220,6 +220,12 @@ class Task(object):
     #: :setting:`CELERY_ACKS_LATE` setting.
     acks_late = None
 
+    #: When CELERY_ACKS_LATE is set to True, the default behavior to
+    #: handle worker crash is to acknowledge the message. Setting
+    #: this to true allows the message to be rejected and requeued so
+    #: it will be executed again by another worker.
+    reject_on_worker_lost = None
+
     #: Tuple of expected exceptions.
     #:
     #: These are errors that are expected in normal operation
@@ -248,6 +254,7 @@ class Task(object):
         ('rate_limit', 'CELERY_DEFAULT_RATE_LIMIT'),
         ('track_started', 'CELERY_TRACK_STARTED'),
         ('acks_late', 'CELERY_ACKS_LATE'),
+        ('reject_on_worker_lost', 'CELERY_REJECT_ON_WORKER_LOST'),
         ('ignore_result', 'CELERY_IGNORE_RESULT'),
         ('store_errors_even_if_ignored',
             'CELERY_STORE_ERRORS_EVEN_IF_IGNORED'),

+ 4 - 1
celery/worker/request.py

@@ -352,7 +352,10 @@ class Request(object):
                 )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
-            self.acknowledge()
+            if self.task.reject_on_worker_lost and isinstance(exc, WorkerLostError):
+                self.reject(True)
+            else:
+                self.acknowledge()
 
         if send_failed_event:
             self.send_event(