Browse Source

apply_async: Added the ability to retry publishing the task message in the event of connection loss or failure.

**New settings**

* `CELERY_TASK_PUBLISH_RETRY`

    Decides if publishing task messages will be retried in the case
    of connection loss or other connection errors.
    Not enabled by default.

* `CELERY_TASK_PUBLISH_RETRY_POLICY`

    Defines the default policy when retrying publishing a task message in
    the case of connection loss or other connection errors.

    This is a mapping that must contain the following keys:

    * `max_retries`

        Maximum number of retries before giving up; in this case the
        exception that caused the retry to fail will be raised.

        A value of 0 or :const:`None` means it will retry forever.

        The default is to retry 3 times.

    * `interval_start`

        Defines the number of seconds (float or integer) to wait between
        retries.  Default is 0, which means the first retry will be
        instantaneous.

    * `interval_step`

        On each consecutive retry this number will be added to the retry
        delay (float or integer).  Default is 0.2.

    * `interval_max`

        Maximum number of seconds (float or integer) to wait between
        retries.  Default is 0.2.

With the default policy of::

    {"max_retries": 3,
     "interval_start": 0,
     "interval_step": 0.2,
     "interval_max": 0.2}

the maximum time spent retrying will be 0.4 seconds.  It is set relatively
short by default because a connection failure could lead to a retry pile-up
if the broker connection is down, e.g. many web server processes waiting
to retry, blocking other incoming requests.
Ask Solem 14 years ago
parent
commit
fb4ab44b7b
8 changed files with 113 additions and 14 deletions
  1. 10 0
      Changelog
  2. 24 8
      celery/app/amqp.py
  3. 9 3
      celery/app/defaults.py
  4. 1 1
      celery/backends/amqp.py
  5. 8 0
      celery/task/base.py
  6. 1 1
      contrib/requirements/default.txt
  7. 59 0
      docs/configuration.rst
  8. 1 1
      setup.py

+ 10 - 0
Changelog

@@ -144,6 +144,16 @@ Important Notes
 News
 ----
 
+* Added the ability to retry publishing the task message in
+  the event of connection loss or failure.
+
+    This is disabled by default but can be enabled using the
+    :setting:`CELERY_TASK_PUBLISH_RETRY` setting, and tweaked by
+    the :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY` setting.
+
+    In addition, the `retry` and `retry_policy` keyword arguments have
+    been added to `Task.apply_async`.
+
 * Added support for message compression using the
   :setting:`CELERY_MESSAGE_COMPRESSION` setting, or the `compression` argument
   to `apply_async`.  This can also be set using routers.

+ 24 - 8
celery/app/amqp.py

@@ -130,6 +130,14 @@ class Queues(UserDict):
 
 class TaskPublisher(messaging.Publisher):
     auto_declare = False
+    retry = False
+    retry_policy = None
+
+    def __init__(self, *args, **kwargs):
+        self.retry = kwargs.pop("retry", self.retry)
+        self.retry_policy = kwargs.pop("retry_policy",
+                                        self.retry_policy or {})
+        super(TaskPublisher, self).__init__(*args, **kwargs)
 
     def declare(self):
         if self.exchange.name not in _exchanges_declared:
@@ -139,7 +147,7 @@ class TaskPublisher(messaging.Publisher):
     def delay_task(self, task_name, task_args=None, task_kwargs=None,
             countdown=None, eta=None, task_id=None, taskset_id=None,
             expires=None, exchange=None, exchange_type=None,
-            event_dispatcher=None, **kwargs):
+            event_dispatcher=None, retry=None, retry_policy=None, **kwargs):
         """Send task message."""
 
         task_id = task_id or gen_unique_id()
@@ -163,7 +171,7 @@ class TaskPublisher(messaging.Publisher):
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 
-        message_data = {
+        body = {
             "task": task_name,
             "id": task_id,
             "args": task_args or [],
@@ -174,7 +182,7 @@ class TaskPublisher(messaging.Publisher):
         }
 
         if taskset_id:
-            message_data["taskset"] = taskset_id
+            body["taskset"] = taskset_id
 
         # custom exchange passed, need to declare it.
         if exchange and exchange not in _exchanges_declared:
@@ -184,10 +192,15 @@ class TaskPublisher(messaging.Publisher):
                                           durable=self.durable,
                                           auto_delete=self.auto_delete)
             _exchanges_declared.add(exchange)
-        self.send(message_data, exchange=exchange,
-                  **extract_msg_options(kwargs))
-        signals.task_sent.send(sender=task_name, **message_data)
 
+        send = self.send
+        if retry is None and self.retry or retry:
+            send = self.connection.ensure(self, self.send,
+                            **dict(self.retry_policy, **retry_policy or {}))
+
+        send(body, exchange=exchange, **extract_msg_options(kwargs))
+
+        signals.task_sent.send(sender=task_name, **body)
         if event_dispatcher:
             event_dispatcher.send("task-sent", uuid=task_id,
                                                name=task_name,
@@ -240,11 +253,14 @@ class AMQP(object):
         You should use `app.send_task` instead.
 
         """
+        conf = self.app.conf
         _, default_queue = self.get_default_queue()
         defaults = {"exchange": default_queue["exchange"],
                     "exchange_type": default_queue["exchange_type"],
-                    "routing_key": self.app.conf.CELERY_DEFAULT_ROUTING_KEY,
-                    "serializer": self.app.conf.CELERY_TASK_SERIALIZER}
+                    "routing_key": conf.CELERY_DEFAULT_ROUTING_KEY,
+                    "serializer": conf.CELERY_TASK_SERIALIZER,
+                    "retry": conf.CELERY_TASK_PUBLISH_RETRY,
+                    "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY}
         publisher = TaskPublisher(*args,
                                   **self.app.merge(defaults, kwargs))
 

+ 9 - 3
celery/app/defaults.py

@@ -57,6 +57,8 @@ NAMESPACES = {
     "CELERY": {
         "ACKS_LATE": Option(False, type="bool"),
         "ALWAYS_EAGER": Option(False, type="bool"),
+        "AMQP_TASK_RESULT_EXPIRES": Option(type="int"),
+        "AMQP_TASK_RESULT_CONNECTION_MAX": Option(type="int", default=1),
         "BROADCAST_QUEUE": Option("celeryctl"),
         "BROADCAST_EXCHANGE": Option("celeryctl"),
         "BROADCAST_EXCHANGE_TYPE": Option("fanout"),
@@ -87,10 +89,14 @@ NAMESPACES = {
         "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
         "SEND_TASK_SENT_EVENT": Option(False, type="bool"),
         "STORE_ERRORS_EVEN_IF_IGNORED": Option(False, type="bool"),
-        "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
-        "AMQP_TASK_RESULT_EXPIRES": Option(type="int"),
-        "AMQP_TASK_RESULT_CONNECTION_MAX": Option(type="int", default=1),
         "TASK_ERROR_WHITELIST": Option((), type="tuple"),
+        "TASK_PUBLISH_RETRY": Option(False, type="bool"),
+        "TASK_PUBLISH_RETRY_POLICY": Option({
+                "max_retries": 3,
+                "interval_start": 0,
+                "interval_max": 0.2,
+                "interval_step": 0.2}, type="dict"),
+        "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
         "TASK_SERIALIZER": Option("pickle"),
         "TRACK_STARTED": Option(False, type="bool"),
         "REDIRECT_STDOUTS": Option(True, type="bool"),

+ 1 - 1
celery/backends/amqp.py

@@ -113,7 +113,7 @@ class AMQPBackend(BaseDictBackend):
         pass
 
     def _store_result(self, task_id, result, status, traceback=None,
-            max_retries=20, interval_start=0.2, interval_step=1,
+            max_retries=20, interval_start=0, interval_step=1,
             interval_max=1):
         """Send task return value and status."""
         result = self.encode_result(result, status)

+ 8 - 0
celery/task/base.py

@@ -394,6 +394,14 @@ class BaseTask(object):
                                   on establishing a connection to the AMQP
                                   server.
 
+        :keyword retry: If enabled sending of the task message will be retried
+                        in the event of connection loss or failure.  Default
+                        is taken from the :setting:`CELERY_TASK_PUBLISH_RETRY`
+                        setting.
+
+        :keyword retry_policy:  Override the retry policy used.  See
+                                :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`.
+
         :keyword routing_key: The routing key used to route the task to a
                               worker server.  Defaults to the
                               :attr:`routing_key` attribute.

+ 1 - 1
contrib/requirements/default.txt

@@ -1,5 +1,5 @@
 python-dateutil
 sqlalchemy
 anyjson
-kombu>=0.9.1
+kombu>=1.0.0b4
 pyparsing>=1.5.0

+ 59 - 0
docs/configuration.rst

@@ -685,6 +685,65 @@ methods that have been registered with :mod:`kombu.serialization.registry`.
 
     :ref:`executing-serializers`.
 
+.. setting:: CELERY_TASK_PUBLISH_RETRY
+
+CELERY_TASK_PUBLISH_RETRY
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Decides if publishing task messages will be retried in the case
+of connection loss or other connection errors.
+See also :setting:`CELERY_TASK_PUBLISH_RETRY_POLICY`.
+
+Disabled by default.
+
+.. setting:: CELERY_TASK_PUBLISH_RETRY_POLICY
+
+Defines the default policy when retrying publishing a task message in
+the case of connection loss or other connection errors.
+
+This is a mapping that must contain the following keys:
+
+    * `max_retries`
+
+        Maximum number of retries before giving up; in this case the
+        exception that caused the retry to fail will be raised.
+
+        A value of 0 or :const:`None` means it will retry forever.
+
+        The default is to retry 3 times.
+
+    * `interval_start`
+
+        Defines the number of seconds (float or integer) to wait between
+        retries.  Default is 0, which means the first retry will be
+        instantaneous.
+
+    * `interval_step`
+
+        On each consecutive retry this number will be added to the retry
+        delay (float or integer).  Default is 0.2.
+
+    * `interval_max`
+
+        Maximum number of seconds (float or integer) to wait between
+        retries.  Default is 0.2.
+
+With the default policy of::
+
+    {"max_retries": 3,
+     "interval_start": 0,
+     "interval_step": 0.2,
+     "interval_max": 0.2}
+
+the maximum time spent retrying will be 0.4 seconds.  It is set relatively
+short by default because a connection failure could lead to a retry pile-up
+if the broker connection is down, e.g. many web server processes waiting
+to retry, blocking other incoming requests.
+
+
+CELERY_TASK_PUBLISH_RETRY_POLICY
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
 .. setting:: CELERY_DEFAULT_RATE_LIMIT
 
 CELERY_DEFAULT_RATE_LIMIT

+ 1 - 1
setup.py

@@ -141,7 +141,7 @@ except ImportError:
 install_requires.extend([
     "python-dateutil",
     "anyjson",
-    "kombu>=0.9.1",
+    "kombu>=1.0.0b4",
     "pyparsing>=1.5.0",
 ])