Forráskód Böngészése

Properly handle task messages with missing fields as documented:

Missing required field throws InvalidTaskError,
and missing args/kwargs is assumed to be empty.
Chris Chamberlin 13 éve
szülő
commit
6edd3f8a67
2 módosított fájl, 29 hozzáadás és 5 törlés
  1. 18 0
      celery/tests/test_worker/test_worker_job.py
  2. 11 5
      celery/worker/job.py

+ 18 - 0
celery/tests/test_worker/test_worker_job.py

@@ -537,6 +537,24 @@ class test_TaskRequest(unittest.TestCase):
         self.assertIsInstance(tw.kwargs.keys()[0], str)
         self.assertTrue(tw.logger)
 
+    def test_from_message_empty_args(self):
+        body = {"task" : mytask.name, "id": uuid()}
+        m = Message(None, body=anyjson.serialize(body), backend="foo",
+                          content_type="application/json",
+                          content_encoding="utf-8")
+        tw = TaskRequest.from_message(m, m.decode())
+        self.assertIsInstance(tw, TaskRequest)
+        self.assertEquals(tw.args, [])
+        self.assertEquals(tw.kwargs, {})
+
+    def test_from_message_missing_required_fields(self):
+        body = {}
+        m = Message(None, body=anyjson.serialize(body), backend="foo",
+                          content_type="application/json",
+                          content_encoding="utf-8")
+        with self.assertRaises(InvalidTaskError):
+            TaskRequest.from_message(m, m.decode())
+ 
     def test_from_message_nonexistant_task(self):
         body = {"task": "cu.mytask.doesnotexist", "id": uuid(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}

+ 11 - 5
celery/worker/job.py

@@ -276,14 +276,20 @@ class TaskRequest(object):
         delivery_info = dict((key, delivery_info.get(key))
                                 for key in WANTED_DELIVERY_INFO)
 
-        kwargs = body["kwargs"]
+        kwargs = body.get("kwargs", {})
         if not hasattr(kwargs, "items"):
             raise InvalidTaskError("Task keyword arguments is not a mapping.")
-
-        return cls(task_name=body["task"],
-                   task_id=body["id"],
+        try:
+            task_name = body["task"]
+            task_id = body["id"]
+        except KeyError, exc:
+            raise InvalidTaskError(
+                "Task message is missing required field %r" % (exc, ))
+
+        return cls(task_name=task_name,
+                   task_id=task_id,
                    taskset_id=body.get("taskset", None),
-                   args=body["args"],
+                   args=body.get("args", []),
                    kwargs=kwdict(kwargs),
                    chord=body.get("chord"),
                    retries=body.get("retries", 0),