Kaynağa Gözat

Merge branch 'peplin/reraise_exceptions'

Ask Solem 14 yıl önce
ebeveyn
işleme
1276fcfd1c

+ 1 - 0
celery/app/defaults.py

@@ -69,6 +69,7 @@ NAMESPACES = {
         "DEFAULT_EXCHANGE_TYPE": Option("direct"),
         "DEFAULT_EXCHANGE_TYPE": Option("direct"),
         "DEFAULT_DELIVERY_MODE": Option(2, type="string"),
         "DEFAULT_DELIVERY_MODE": Option(2, type="string"),
         "EAGER_PROPAGATES_EXCEPTIONS": Option(False, type="bool"),
         "EAGER_PROPAGATES_EXCEPTIONS": Option(False, type="bool"),
+        "EAGER_RERAISES_EXCEPTIONS": Option(False, type="bool"),
         "EVENT_SERIALIZER": Option("json"),
         "EVENT_SERIALIZER": Option("json"),
         "IMPORTS": Option((), type="tuple"),
         "IMPORTS": Option((), type="tuple"),
         "IGNORE_RESULT": Option(False, type="bool"),
         "IGNORE_RESULT": Option(False, type="bool"),

+ 1 - 0
celery/conf.py

@@ -15,6 +15,7 @@ conf = app_or_default().conf
 
 
 ALWAYS_EAGER = conf.CELERY_ALWAYS_EAGER
 ALWAYS_EAGER = conf.CELERY_ALWAYS_EAGER
 EAGER_PROPAGATES_EXCEPTIONS = conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS
 EAGER_PROPAGATES_EXCEPTIONS = conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS
+EAGER_RERAISES_EXCEPTIONS = conf.CELERY_EAGER_RERAISES_EXCEPTIONS
 RESULT_BACKEND = conf.CELERY_RESULT_BACKEND
 RESULT_BACKEND = conf.CELERY_RESULT_BACKEND
 CACHE_BACKEND = conf.CELERY_CACHE_BACKEND
 CACHE_BACKEND = conf.CELERY_CACHE_BACKEND
 CACHE_BACKEND_OPTIONS = conf.CELERY_CACHE_BACKEND_OPTIONS
 CACHE_BACKEND_OPTIONS = conf.CELERY_CACHE_BACKEND_OPTIONS

+ 14 - 4
celery/execute/trace.py

@@ -22,9 +22,13 @@ class TraceInfo(object):
             self.strtb = "\n".join(traceback.format_exception(*exc_info))
             self.strtb = "\n".join(traceback.format_exception(*exc_info))
 
 
     @classmethod
     @classmethod
-    def trace(cls, fun, args, kwargs):
+    def trace(cls, fun, args, kwargs, propagate=False):
         """Trace the execution of a function, calling the appropiate callback
         """Trace the execution of a function, calling the appropiate callback
-        if the function raises retry, an failure or returned successfully."""
+        if the function raises retry, an failure or returned successfully.
+
+        :keyword propagate: If true, errors will propagate to the caller.
+
+        """
         try:
         try:
             return cls(states.SUCCESS, retval=fun(*args, **kwargs))
             return cls(states.SUCCESS, retval=fun(*args, **kwargs))
         except (SystemExit, KeyboardInterrupt):
         except (SystemExit, KeyboardInterrupt):
@@ -32,16 +36,20 @@ class TraceInfo(object):
         except RetryTaskError, exc:
         except RetryTaskError, exc:
             return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
             return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
         except Exception, exc:
         except Exception, exc:
+            if propagate:
+                raise
             return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
             return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
         except:
         except:
             # For Python2.4 where raising strings are still allowed.
             # For Python2.4 where raising strings are still allowed.
+            if propagate:
+                raise
             return cls(states.FAILURE, retval=None, exc_info=sys.exc_info())
             return cls(states.FAILURE, retval=None, exc_info=sys.exc_info())
 
 
 
 
 class TaskTrace(object):
 class TaskTrace(object):
 
 
     def __init__(self, task_name, task_id, args, kwargs, task=None,
     def __init__(self, task_name, task_id, args, kwargs, task=None,
-            request=None, **_):
+            request=None, propagate=None, **_):
         self.task_id = task_id
         self.task_id = task_id
         self.task_name = task_name
         self.task_name = task_name
         self.args = args
         self.args = args
@@ -50,6 +58,7 @@ class TaskTrace(object):
         self.request = request or {}
         self.request = request or {}
         self.status = states.PENDING
         self.status = states.PENDING
         self.strtb = None
         self.strtb = None
+        self.propagate = propagate
         self._trace_handlers = {states.FAILURE: self.handle_failure,
         self._trace_handlers = {states.FAILURE: self.handle_failure,
                                 states.RETRY: self.handle_retry,
                                 states.RETRY: self.handle_retry,
                                 states.SUCCESS: self.handle_success}
                                 states.SUCCESS: self.handle_success}
@@ -72,7 +81,8 @@ class TaskTrace(object):
         return retval
         return retval
 
 
     def _trace(self):
     def _trace(self):
-        trace = TraceInfo.trace(self.task, self.args, self.kwargs)
+        trace = TraceInfo.trace(self.task, self.args, self.kwargs,
+                propagate=self.propagate)
         self.status = trace.status
         self.status = trace.status
         self.strtb = trace.strtb
         self.strtb = trace.strtb
         self.handle_after_return(trace.status, trace.retval,
         self.handle_after_return(trace.status, trace.retval,

+ 1 - 3
celery/task/base.py

@@ -589,11 +589,9 @@ class BaseTask(object):
             kwargs.update(extend_with)
             kwargs.update(extend_with)
 
 
         trace = TaskTrace(task.name, task_id, args, kwargs,
         trace = TaskTrace(task.name, task_id, args, kwargs,
-                          task=task, request=request)
+                          task=task, request=request, propagate=throw)
         retval = trace.execute()
         retval = trace.execute()
         if isinstance(retval, ExceptionInfo):
         if isinstance(retval, ExceptionInfo):
-            if throw:
-                raise retval.exception
             retval = retval.exception
             retval = retval.exception
         return EagerResult(task_id, retval, trace.status,
         return EagerResult(task_id, retval, trace.status,
                            traceback=trace.strtb)
                            traceback=trace.strtb)

+ 7 - 0
celery/tests/test_task.py

@@ -440,6 +440,13 @@ class TestTaskApply(unittest.TestCase):
         finally:
         finally:
             RaisingTask.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
             RaisingTask.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
 
 
+    def test_apply_with_CELERY_EAGER_RERAISES_EXCEPTIONS(self):
+        RaisingTask.app.conf.CELERY_EAGER_RERAISES_EXCEPTIONS = True
+        try:
+            self.assertRaises(KeyError, RaisingTask.apply)
+        finally:
+            RaisingTask.app.conf.CELERY_EAGER_RERAISES_EXCEPTIONS = False
+
     def test_apply(self):
     def test_apply(self):
         IncrementCounterTask.count = 0
         IncrementCounterTask.count = 0
 
 

+ 11 - 0
docs/configuration.rst

@@ -622,6 +622,17 @@ If this is :const:`True`, eagerly executed tasks (using `.apply`, or with
 
 
 It's the same as always running `apply` with `throw=True`.
 It's the same as always running `apply` with `throw=True`.
 
 
+.. setting:: CELERY_EAGER_RERAISES_EXCEPTIONS
+
+CELERY_EAGER_RERAISES_EXCEPTIONS
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If this is :const:`True`, eagerly executed tasks (using `.apply`, or with
+:setting:`CELERY_ALWAYS_EAGER` on), will re-raise exceptions with a full
+application traceback.
+
+It's the same as always running `apply` with `reraise_exceptions=True`.
+
 .. setting:: CELERY_IGNORE_RESULT
 .. setting:: CELERY_IGNORE_RESULT
 
 
 CELERY_IGNORE_RESULT
 CELERY_IGNORE_RESULT