Procházet zdrojové kódy

apply(throw=True) / CELERY_EAGER_PROPAGATES_EXCEPTIONS: Makes eager execution re-raise task errors. Thanks to proppy. Closes #108.

Ask Solem před 15 roky
rodič
revize
760e94ec1b
3 změnil soubory, kde provedl 19 přidání a 0 odebrání
  1. 2 0
      celery/conf.py
  2. 6 0
      celery/execute/__init__.py
  3. 11 0
      celery/tests/test_task.py

+ 2 - 0
celery/conf.py

@@ -22,6 +22,7 @@ settings = load_settings()
 _DEFAULTS = {
     "CELERY_RESULT_BACKEND": "database",
     "CELERY_ALWAYS_EAGER": False,
+    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
     "CELERY_TASK_RESULT_EXPIRES": timedelta(days=5),
     "CELERY_SEND_EVENTS": False,
     "CELERY_IGNORE_RESULT": False,
@@ -92,6 +93,7 @@ def _get(name, default=None, compat=None):
 
 # <--- Task                                        <-   --   --- - ----- -- #
 ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
+EAGER_PROPAGATES_EXCEPTIONS = _get("CELERY_EAGER_PROPAGATES_EXCEPTIONS")
 RESULT_BACKEND = _get("CELERY_RESULT_BACKEND", compat=["CELERY_BACKEND"])
 CELERY_BACKEND = RESULT_BACKEND # FIXME Remove in 1.4
 CELERY_CACHE_BACKEND = _get("CELERY_CACHE_BACKEND")

+ 6 - 0
celery/execute/__init__.py

@@ -137,6 +137,9 @@ def delay_task(task_name, *args, **kwargs):
 def apply(task, args, kwargs, **options):
     """Apply the task locally.
 
+    :keyword throw: Re-raise task exceptions. Defaults to
+        the ``CELERY_EAGER_PROPAGATES_EXCEPTIONS`` setting.
+
     This will block until the task completes, and returns a
     :class:`celery.result.EagerResult` instance.
 
@@ -145,6 +148,7 @@ def apply(task, args, kwargs, **options):
     kwargs = kwargs or {}
     task_id = options.get("task_id", gen_unique_id())
     retries = options.get("retries", 0)
+    throw = options.pop("throw", conf.EAGER_PROPAGATES_EXCEPTIONS)
 
     task = tasks[task.name] # Make sure we get the instance, not class.
 
@@ -163,5 +167,7 @@ def apply(task, args, kwargs, **options):
     trace = TaskTrace(task.name, task_id, args, kwargs, task=task)
     retval = trace.execute()
     if isinstance(retval, ExceptionInfo):
+        if throw:
+            raise retval.exception
         retval = retval.exception
     return EagerResult(task_id, retval, trace.status, traceback=trace.strtb)

+ 11 - 0
celery/tests/test_task.py

@@ -4,6 +4,7 @@ from datetime import datetime, timedelta
 
 from billiard.utils.functional import wraps
 
+from celery import conf
 from celery import task
 from celery import messaging
 from celery.task.schedules import crontab
@@ -396,6 +397,16 @@ class TestTaskSet(unittest.TestCase):
 
 class TestTaskApply(unittest.TestCase):
 
+    def test_apply_throw(self):
+        self.assertRaises(KeyError, RaisingTask.apply, throw=True)
+
+    def test_apply_with_CELERY_EAGER_PROPAGATES_EXCEPTIONS(self):
+        conf.EAGER_PROPAGATES_EXCEPTIONS = True
+        try:
+            self.assertRaises(KeyError, RaisingTask.apply)
+        finally:
+            conf.EAGER_PROPAGATES_EXCEPTIONS = False
+
     def test_apply(self):
         IncrementCounterTask.count = 0