Procházet zdrojové kódy

Only reject and requeue on non redelivered message to avoid infinite crash

Conflicts:
	celery/worker/request.py
Michael Permana před 9 roky
rodič
revize
01f921adee
3 změnil soubory, kde provedl 13 přidání a 3 odebrání
  1. 1 0
      celery/app/defaults.py
  2. 7 0
      celery/app/task.py
  3. 5 3
      celery/worker/request.py

+ 1 - 0
celery/app/defaults.py

@@ -132,6 +132,7 @@ NAMESPACES = {
         'REDIS_DB': Option(type='int', **_REDIS_OLD),
         'REDIS_PASSWORD': Option(type='string', **_REDIS_OLD),
         'REDIS_MAX_CONNECTIONS': Option(type='int'),
+        'REJECT_ON_WORKER_LOST': Option(type='bool'),
         'RESULT_BACKEND': Option(type='string'),
         'RESULT_DB_SHORT_LIVED_SESSIONS': Option(False, type='bool'),
         'RESULT_DB_TABLENAMES': Option(type='dict'),

+ 7 - 0
celery/app/task.py

@@ -220,6 +220,12 @@ class Task(object):
     #: :setting:`CELERY_ACKS_LATE` setting.
     acks_late = None
 
+    #: When CELERY_ACKS_LATE is set to True, the default behavior to
+    #: handle worker crash is to acknowledge the message. Setting
+    #: this to true allows the message to be rejected and requeued so
+    #: it will be executed again by another worker.
+    reject_on_worker_lost = None
+
     #: Tuple of expected exceptions.
     #:
     #: These are errors that are expected in normal operation
@@ -248,6 +254,7 @@ class Task(object):
         ('rate_limit', 'CELERY_DEFAULT_RATE_LIMIT'),
         ('track_started', 'CELERY_TRACK_STARTED'),
         ('acks_late', 'CELERY_ACKS_LATE'),
+        ('reject_on_worker_lost', 'CELERY_REJECT_ON_WORKER_LOST'),
         ('ignore_result', 'CELERY_IGNORE_RESULT'),
         ('store_errors_even_if_ignored',
             'CELERY_STORE_ERRORS_EVEN_IF_IGNORED'),

+ 5 - 3
celery/worker/request.py

@@ -326,7 +326,6 @@ class Request(object):
     def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
         """Handler called if the task raised an exception."""
         task_ready(self)
-
         if isinstance(exc_info.exception, MemoryError):
             raise MemoryError('Process got: %s' % (exc_info.exception,))
         elif isinstance(exc_info.exception, Reject):
@@ -352,8 +351,11 @@ class Request(object):
                 )
         # (acks_late) acknowledge after result stored.
         if self.task.acks_late:
-            if isinstance(exc, WorkerLostError):
-                self.reject(True)
+            reject_and_requeue = (self.task.reject_on_worker_lost and
+                isinstance(exc, WorkerLostError) and
+                not self.delivery_info.get('redelivered', False))
+            if reject_and_requeue:
+                self.reject(requeue=True)
             else:
                 self.acknowledge()