Browse Source

Added Task.delivery_mode / CELERY_DEFAULT_DELIVERY_MODE, with this you can configure tasks to be non-persistent. Closes #69.

Ask Solem 15 years ago
parent
commit
012ca1239f
4 changed files with 17 additions and 2 deletions
  1. 2 0
      celery/conf.py
  2. 2 1
      celery/execute/__init__.py
  3. 1 1
      celery/messaging.py
  4. 12 0
      celery/task/base.py

+ 2 - 0
celery/conf.py

@@ -26,6 +26,7 @@ _DEFAULTS = {
     "CELERY_DEFAULT_QUEUE": "celery",
     "CELERY_DEFAULT_EXCHANGE": "celery",
     "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
+    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
     "CELERY_BROKER_CONNECTION_TIMEOUT": 4,
     "CELERY_BROKER_CONNECTION_RETRY": True,
     "CELERY_BROKER_CONNECTION_MAX_RETRIES": 100,
@@ -110,6 +111,7 @@ DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
 DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
 DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
 DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
+DEFAULT_DELIVERY_MODE = _get("CELERY_DEFAULT_DELIVERY_MODE")
 
 _DEPRECATIONS = {"CELERY_AMQP_CONSUMER_QUEUES": "CELERY_QUEUES",
                  "CELERY_AMQP_CONSUMER_QUEUE": "CELERY_QUEUES",

+ 2 - 1
celery/execute/__init__.py

@@ -8,7 +8,8 @@ from celery.messaging import TaskPublisher
 
 extract_exec_options = mattrgetter("routing_key", "exchange",
                                    "immediate", "mandatory",
-                                   "priority", "serializer")
+                                   "priority", "serializer",
+                                   "delivery_mode")
 
 
 @with_connection

+ 1 - 1
celery/messaging.py

@@ -17,7 +17,7 @@ from celery.utils import gen_unique_id, mitemgetter, noop
 
 MSG_OPTIONS = ("mandatory", "priority",
                "immediate", "routing_key",
-               "serializer")
+               "serializer", "delivery_mode")
 
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))

+ 12 - 0
celery/task/base.py

@@ -84,6 +84,17 @@ class Task(object):
 
         Override the global default ``exchange`` for this task.
 
+    .. attribute:: exchange_type
+
+        Override the global default exchange type for this task.
+
+    .. attribute:: delivery_mode
+
+        Override the global default delivery mode for this task.
+        By default this is set to ``2`` (persistent). You can change this
+        to ``1`` to get non-persistent behavior, which means the messages
+        are lost if the broker is restarted.
+
     .. attribute:: mandatory
 
         Mandatory message routing. An exception will be raised if the task
@@ -166,6 +177,7 @@ class Task(object):
     rate_limit_queue_type = Queue
     backend = default_backend
     exchange_type = conf.DEFAULT_EXCHANGE_TYPE
+    delivery_mode = conf.DEFAULT_DELIVERY_MODE
 
     MaxRetriesExceededError = MaxRetriesExceededError