Explorar o código

Ack and show error message when receiving invalid tasks.

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
94daaba72d
Modificáronse 2 ficheiros con 14 adicións e 2 borrados
  1. 7 0
      celery/worker/job.py
  2. 7 2
      celery/worker/listener.py

+ 7 - 0
celery/worker/job.py

@@ -38,6 +38,10 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
+class InvalidTaskError(Exception):
+    """The task has invalid data or is not properly constructed."""
+
+
 class AlreadyExecutedError(Exception):
     """Tasks can only be executed once, as they might change
     world-wide state."""
@@ -214,6 +218,9 @@ class TaskWrapper(object):
         kwargs = message_data["kwargs"]
         retries = message_data.get("retries", 0)
 
+        if not hasattr(kwargs, "items"):
+            raise InvalidTaskError("Task kwargs must be a dictionary.")
+
         # Convert any unicode keys in the keyword arguments to ascii.
         kwargs = dict((key.encode("utf-8"), value)
                         for key, value in kwargs.items())

+ 7 - 2
celery/worker/listener.py

@@ -7,7 +7,7 @@ from dateutil.parser import parse as parse_iso8601
 from celery import conf
 from celery import signals
 from celery.utils import retry_over_time
-from celery.worker.job import TaskWrapper
+from celery.worker.job import TaskWrapper, InvalidTaskError
 from celery.worker.revoke import revoked
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
@@ -134,7 +134,12 @@ class CarrotListener(object):
                                                 logger=self.logger,
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
-                self.logger.error("Unknown task ignored: %s" % (exc))
+                self.logger.error("Unknown task ignored: %s: %s" % (
+                        str(exc), message_data))
+                message.ack()
+            except InvalidTaskError, exc:
+                self.logger.error("Invalid task ignored: %s: %s" % (
+                        str(exc), message_data))
                 message.ack()
             else:
                 self.on_task(task, eta=message_data.get("eta"))