Browse Source

Recoverable task connection errors now raises Task.OperationalError. Closes #3283

Ask Solem 8 years ago
parent
commit
e542a94005
2 changed files with 5 additions and 2 deletions
  1. 3 2
      celery/app/base.py
  2. 2 0
      celery/app/task.py

+ 3 - 2
celery/app/base.py

@@ -657,8 +657,9 @@ class Celery(object):
         if connection:
             producer = amqp.Producer(connection)
         with self.producer_or_acquire(producer) as P:
-            self.backend.on_task_call(P, task_id)
-            amqp.send_task_message(P, name, message, **options)
+            with P.connection._reraise_as_library_errors():
+                self.backend.on_task_call(P, task_id)
+                amqp.send_task_message(P, name, message, **options)
         result = (result_cls or self.AsyncResult)(task_id)
         if add_to_parent:
             if not have_parent:

+ 2 - 0
celery/app/task.py

@@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals
 import sys
 
 from billiard.einfo import ExceptionInfo
+from kombu.exceptions import OperationalError
 from kombu.utils.uuid import uuid
 
 from celery import current_app, group
@@ -147,6 +148,7 @@ class Task(object):
     __v2_compat__ = False  # set by old base in celery.task.base
 
     MaxRetriesExceededError = MaxRetriesExceededError
+    OperationalError = OperationalError
 
     #: Execution strategy used, or the qualified name of one.
     Strategy = 'celery.worker.strategy:default'