瀏覽代碼

Prefork pool: Safe repr of returned result

Ask Solem 10 年之前
父節點
當前提交
01b7abfeee
共有 1 個文件被更改,包括 20 次插入3 次删除
  1. 20 3
      celery/concurrency/asynpool.py

+ 20 - 3
celery/concurrency/asynpool.py

@@ -43,7 +43,7 @@ from kombu.serialization import pickle as _pickle
 from kombu.utils import fxrange
 from kombu.utils.compat import get_errno
 from kombu.utils.eventio import SELECT_BAD_FD
-from celery.five import Counter, items, values
+from celery.five import Counter, items, string_t, text_t, values
 from celery.utils.log import get_logger
 from celery.utils.text import truncate
 from celery.worker import state as worker_state
@@ -159,6 +159,23 @@ def _select(readers=None, writers=None, err=None, timeout=0):
             raise
 
 
+def _repr_result(obj):
+    try:
+        return repr(obj)
+    except Exception as orig_exc:
+        try:
+            return text_t(obj)
+        except UnicodeDecodeError:
+            if isinstance(obj, string_t):
+                try:
+                    return obj.decode('utf-8', errors='replace')
+                except Exception:
+                    pass
+        return '<Unrepresentable: {0!r} (o.__repr__ returns unicode?)>'.format(
+            orig_exc,
+        )
+
+
 class Worker(_pool.Worker):
     """Pool worker process."""
     dead = False
@@ -169,9 +186,9 @@ class Worker(_pool.Worker):
         # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
-    def prepare_result(self, result, RESULT_MAXLEN=RESULT_MAXLEN):
+    def prepare_result(self, result, maxlen=RESULT_MAXLEN, truncate=truncate):
         if not isinstance(result, ExceptionInfo):
-            return truncate(repr(result), RESULT_MAXLEN)
+            return truncate(_repr_result(result), maxlen)
         return result