Browse Source

New magic kwarg: delivery_info. Used by retry() to resent the task to it's
original destination by using the same exchange/routing_key. Thanks to Jesper Noehr

Ask Solem 15 years ago
parent
commit
ff572bbdd9

+ 1 - 0
celery/execute/__init__.py

@@ -151,6 +151,7 @@ def apply(task, args, kwargs, **options):
                       "task_retries": retries,
                       "task_retries": retries,
                       "task_is_eager": True,
                       "task_is_eager": True,
                       "logfile": None,
                       "logfile": None,
+                      "delivery_info": {"is_eager": True},
                       "loglevel": 0}
                       "loglevel": 0}
     supported_keys = fun_takes_kwargs(task.run, default_kwargs)
     supported_keys = fun_takes_kwargs(task.run, default_kwargs)
     extend_with = dict((key, val) for key, val in default_kwargs.items()
     extend_with = dict((key, val) for key, val in default_kwargs.items()

+ 6 - 0
celery/task/base.py

@@ -181,8 +181,10 @@ class Task(object):
             * task_id
             * task_id
             * task_name
             * task_name
             * task_retries
             * task_retries
+            * task_is_eager
             * logfile
             * logfile
             * loglevel
             * loglevel
+            * delivery_info
 
 
         Additional standard keyword arguments may be added in the future.
         Additional standard keyword arguments may be added in the future.
         To take these default arguments, the task can either list the ones
         To take these default arguments, the task can either list the ones
@@ -321,6 +323,10 @@ class Task(object):
             ...                        countdown=60 * 5, exc=exc)
             ...                        countdown=60 * 5, exc=exc)
 
 
         """
         """
+        delivery_info = kwargs.pop("delivery_info", {})
+        options.setdefault("exchange", delivery_info.get("exchange"))
+        options.setdefault("routing_key", delivery_info.get("routing_key"))
+
         options["retries"] = kwargs.pop("task_retries", 0) + 1
         options["retries"] = kwargs.pop("task_retries", 0) + 1
         options["task_id"] = kwargs.pop("task_id", None)
         options["task_id"] = kwargs.pop("task_id", None)
         options["countdown"] = options.get("countdown",
         options["countdown"] = options.get("countdown",

+ 2 - 0
celery/tests/test_worker_job.py

@@ -381,6 +381,8 @@ class TestTaskWrapper(unittest.TestCase):
             "loglevel": 10,
             "loglevel": 10,
             "task_id": tw.task_id,
             "task_id": tw.task_id,
             "task_retries": 0,
             "task_retries": 0,
+            "task_is_eager": False,
+            "delivery_info": {},
             "task_name": tw.task_name})
             "task_name": tw.task_name})
 
 
     def test_on_failure(self):
     def test_on_failure(self):

+ 18 - 6
celery/worker/job.py

@@ -38,6 +38,9 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
 
 
+WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )
+
+
 class InvalidTaskError(Exception):
 class InvalidTaskError(Exception):
     """The task has invalid data or is not properly constructed."""
     """The task has invalid data or is not properly constructed."""
 
 
@@ -179,13 +182,14 @@ class TaskWrapper(object):
     time_start = None
     time_start = None
 
 
     def __init__(self, task_name, task_id, args, kwargs,
     def __init__(self, task_name, task_id, args, kwargs,
-            on_ack=noop, retries=0, **opts):
+            on_ack=noop, retries=0, delivery_info=None, **opts):
         self.task_name = task_name
         self.task_name = task_name
         self.task_id = task_id
         self.task_id = task_id
         self.retries = retries
         self.retries = retries
         self.args = args
         self.args = args
         self.kwargs = kwargs
         self.kwargs = kwargs
         self.on_ack = on_ack
         self.on_ack = on_ack
+        self.delivery_info = delivery_info or {}
         self.task = tasks[self.task_name]
         self.task = tasks[self.task_name]
 
 
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
@@ -217,6 +221,11 @@ class TaskWrapper(object):
         kwargs = message_data["kwargs"]
         kwargs = message_data["kwargs"]
         retries = message_data.get("retries", 0)
         retries = message_data.get("retries", 0)
 
 
+        _delivery_info = getattr(message, "delivery_info", {})
+        delivery_info = dict((key, _delivery_info.get(key))
+                                for key in WANTED_DELIVERY_INFO)
+
+
         if not hasattr(kwargs, "items"):
         if not hasattr(kwargs, "items"):
             raise InvalidTaskError("Task kwargs must be a dictionary.")
             raise InvalidTaskError("Task kwargs must be a dictionary.")
 
 
@@ -226,23 +235,26 @@ class TaskWrapper(object):
 
 
         return cls(task_name, task_id, args, kwargs,
         return cls(task_name, task_id, args, kwargs,
                     retries=retries, on_ack=message.ack,
                     retries=retries, on_ack=message.ack,
+                    delivery_info=delivery_info,
                     logger=logger, eventer=eventer)
                     logger=logger, eventer=eventer)
 
 
     def extend_with_default_kwargs(self, loglevel, logfile):
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
         """Extend the tasks keyword arguments with standard task arguments.
 
 
         Currently these are ``logfile``, ``loglevel``, ``task_id``,
         Currently these are ``logfile``, ``loglevel``, ``task_id``,
-        ``task_name`` and ``task_retries``.
+        ``task_name``, ``task_retries``, and ``delivery_info``.
 
 
         See :meth:`celery.task.base.Task.run` for more information.
         See :meth:`celery.task.base.Task.run` for more information.
 
 
         """
         """
         kwargs = dict(self.kwargs)
         kwargs = dict(self.kwargs)
         default_kwargs = {"logfile": logfile,
         default_kwargs = {"logfile": logfile,
-                            "loglevel": loglevel,
-                            "task_id": self.task_id,
-                            "task_name": self.task_name,
-                            "task_retries": self.retries}
+                          "loglevel": loglevel,
+                          "task_id": self.task_id,
+                          "task_name": self.task_name,
+                          "task_retries": self.retries,
+                          "task_is_eager": False,
+                          "delivery_info": self.delivery_info}
         fun = self.task.run
         fun = self.task.run
         supported_keys = fun_takes_kwargs(fun, default_kwargs)
         supported_keys = fun_takes_kwargs(fun, default_kwargs)
         extend_with = dict((key, val) for key, val in default_kwargs.items()
         extend_with = dict((key, val) for key, val in default_kwargs.items()

+ 14 - 0
docs/userguide/tasks.rst

@@ -78,6 +78,20 @@ The current default keyword arguments are:
     How many times the current task has been retried.
     How many times the current task has been retried.
     An integer starting at ``0``.
     An integer starting at ``0``.
 
 
+* task_is_eager
+
+    Set to ``True`` if the task is executed locally in the client,
+    and not by a worker.
+
+* delivery_info
+
+  Additional message delivery information. This is a mapping containing
+  the exchange and routing key used to deliver this task. It's used
+  by e.g. :meth:`retry` to resend the task to the same destination queue.
+
+  **NOTE** As some messaging backends doesn't have advanced routing
+  capabilities, you can't trust the availability of keys in this mapping.
+
 
 
 Logging
 Logging
 =======
 =======