Kaynağa Gözat

Eventlet/gevent/solo/threads now properly handles BaseException in tasks

Ask Solem 11 yıl önce
ebeveyn
işleme
eb16bb7822
1 değiştirilmiş dosya ile 19 ekleme ve 4 silme
  1. 19 4
      celery/concurrency/base.py

+ 19 - 4
celery/concurrency/base.py

@@ -10,10 +10,13 @@ from __future__ import absolute_import
 
 import logging
 import os
+import sys
 
+from billiard.einfo import ExceptionInfo
+from billiard.exceptions import WorkerLostError
 from kombu.utils.encoding import safe_repr
 
-from celery.five import monotonic
+from celery.five import monotonic, reraise
 from celery.utils import timer2
 from celery.utils.log import get_logger
 
@@ -23,10 +26,22 @@ logger = get_logger('celery.pool')
 
 
 def apply_target(target, args=(), kwargs={}, callback=None,
-                 accept_callback=None, pid=None, **_):
+                 accept_callback=None, pid=None, getpid=os.getpid,
+                 monotonic=monotonic, **_):
     if accept_callback:
-        accept_callback(pid or os.getpid(), monotonic())
-    callback(target(*args, **kwargs))
+        accept_callback(pid or getpid(), monotonic())
+    try:
+        ret = target(*args, **kwargs)
+    except Exception:
+        raise
+    except BaseException as exc:
+        try:
+            reraise(WorkerLostError, WorkerLostError(repr(exc)),
+                    sys.exc_info()[2])
+        except WorkerLostError:
+            callback(ExceptionInfo())
+    else:
+        callback(ret)
 
 
 class BasePool(object):