소스 검색

Use safe_repr to ensure repr doesn't crash the worker. Closes #298

Ask Solem 14 년 전
부모
커밋
1ab37bbf44
4개의 변경된 파일21개의 추가작업 그리고 14개의 파일을 삭제
  1. 5 0
      celery/utils/encoding.py
  2. 3 2
      celery/worker/consumer.py
  3. 2 1
      celery/worker/control/builtins.py
  4. 11 11
      celery/worker/job.py

+ 5 - 0
celery/utils/encoding.py

@@ -17,3 +17,8 @@ def safe_str(s, errors="replace"):
         return "<Unrepresentable %r: %r>" % (type(s), exc)
         return "<Unrepresentable %r: %r>" % (type(s), exc)
 
 
 
 
+def safe_repr(o, errors="replace"):
+    try:
+        return repr(o)
+    except Exception, exc:
+        return safe_str(o, errors)

+ 3 - 2
celery/worker/consumer.py

@@ -80,6 +80,7 @@ from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.datastructures import AttributeDict
 from celery.exceptions import NotRegistered
 from celery.exceptions import NotRegistered
 from celery.utils import noop
 from celery.utils import noop
+from celery.utils.encoding import safe_repr
 from celery.utils.timer2 import to_timestamp
 from celery.utils.timer2 import to_timestamp
 from celery.worker import state
 from celery.worker import state
 from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.job import TaskRequest, InvalidTaskError
@@ -293,8 +294,8 @@ class Consumer(object):
         self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
         self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
 
 
         self.event_dispatcher.send("task-received", uuid=task.task_id,
         self.event_dispatcher.send("task-received", uuid=task.task_id,
-                name=task.task_name, args=repr(task.args),
-                kwargs=repr(task.kwargs), retries=task.retries,
+                name=task.task_name, args=safe_repr(task.args),
+                kwargs=safe_repr(task.kwargs), retries=task.retries,
                 eta=task.eta and task.eta.isoformat(),
                 eta=task.eta and task.eta.isoformat(),
                 expires=task.expires and task.expires.isoformat())
                 expires=task.expires and task.expires.isoformat())
 
 

+ 2 - 1
celery/worker/control/builtins.py

@@ -8,6 +8,7 @@ from celery.utils import timeutils
 from celery.worker import state
 from celery.worker import state
 from celery.worker.state import revoked
 from celery.worker.state import revoked
 from celery.worker.control.registry import Panel
 from celery.worker.control.registry import Panel
+from celery.utils.encoding import safe_repr
 
 
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
 
 
@@ -128,7 +129,7 @@ def dump_reserved(panel, safe=False, **kwargs):
         panel.logger.info("--Empty queue--")
         panel.logger.info("--Empty queue--")
         return []
         return []
     panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
     panel.logger.info("* Dump of currently reserved tasks:\n%s" % (
-                            "\n".join(map(repr, reserved), )))
+                            "\n".join(map(safe_repr, reserved), )))
     return [request.info(safe=safe)
     return [request.info(safe=safe)
             for request in reserved]
             for request in reserved]
 
 

+ 11 - 11
celery/worker/job.py

@@ -17,7 +17,7 @@ from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils import truncate_text
 from celery.utils import truncate_text
 from celery.utils.compat import log_with_extra
 from celery.utils.compat import log_with_extra
-from celery.utils.encoding import safe_str
+from celery.utils.encoding import safe_repr, safe_str
 from celery.utils.timeutils import maybe_iso8601
 from celery.utils.timeutils import maybe_iso8601
 from celery.worker import state
 from celery.worker import state
 
 
@@ -451,7 +451,7 @@ class TaskRequest(object):
 
 
         runtime = self.time_start and (time.time() - self.time_start) or 0
         runtime = self.time_start and (time.time() - self.time_start) or 0
         self.send_event("task-succeeded", uuid=self.task_id,
         self.send_event("task-succeeded", uuid=self.task_id,
-                        result=repr(ret_value), runtime=runtime)
+                        result=safe_repr(ret_value), runtime=runtime)
 
 
         self.logger.info(self.success_msg.strip() % {
         self.logger.info(self.success_msg.strip() % {
                 "id": self.task_id,
                 "id": self.task_id,
@@ -462,13 +462,13 @@ class TaskRequest(object):
     def on_retry(self, exc_info):
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
         """Handler called if the task should be retried."""
         self.send_event("task-retried", uuid=self.task_id,
         self.send_event("task-retried", uuid=self.task_id,
-                                        exception=repr(exc_info.exception.exc),
-                                        traceback=repr(exc_info.traceback))
+                         exception=safe_repr(exc_info.exception.exc),
+                         traceback=safe_repr(exc_info.traceback))
 
 
         self.logger.info(self.retry_msg.strip() % {
         self.logger.info(self.retry_msg.strip() % {
                 "id": self.task_id,
                 "id": self.task_id,
                 "name": self.task_name,
                 "name": self.task_name,
-                "exc": repr(exc_info.exception.exc)})
+                "exc": safe_repr(exc_info.exception.exc)})
 
 
     def on_failure(self, exc_info):
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""
         """Handler called if the task raised an exception."""
@@ -488,13 +488,13 @@ class TaskRequest(object):
                                                   exc_info.exception)
                                                   exc_info.exception)
 
 
         self.send_event("task-failed", uuid=self.task_id,
         self.send_event("task-failed", uuid=self.task_id,
-                                       exception=repr(exc_info.exception),
-                                       traceback=exc_info.traceback)
+                         exception=safe_repr(exc_info.exception),
+                         traceback=safe_str(exc_info.traceback))
 
 
         context = {"hostname": self.hostname,
         context = {"hostname": self.hostname,
                    "id": self.task_id,
                    "id": self.task_id,
                    "name": self.task_name,
                    "name": self.task_name,
-                   "exc": safe_str(repr(exc_info.exception)),
+                   "exc": safe_repr(exc_info.exception),
                    "traceback": safe_str(exc_info.traceback),
                    "traceback": safe_str(exc_info.traceback),
                    "args": self.args,
                    "args": self.args,
                    "kwargs": self.kwargs}
                    "kwargs": self.kwargs}
@@ -530,14 +530,14 @@ class TaskRequest(object):
     def repr_result(self, result, maxlen=46):
     def repr_result(self, result, maxlen=46):
         # 46 is the length needed to fit
         # 46 is the length needed to fit
         #     "the quick brown fox jumps over the lazy dog" :)
         #     "the quick brown fox jumps over the lazy dog" :)
-        return truncate_text(repr(result), maxlen)
+        return truncate_text(safe_repr(result), maxlen)
 
 
     def info(self, safe=False):
     def info(self, safe=False):
         args = self.args
         args = self.args
         kwargs = self.kwargs
         kwargs = self.kwargs
         if not safe:
         if not safe:
-            args = repr(args)
-            kwargs = repr(self.kwargs)
+            args = safe_repr(args)
+            kwargs = safe_repr(self.kwargs)
 
 
         return {"id": self.task_id,
         return {"id": self.task_id,
                 "name": self.task_name,
                 "name": self.task_name,