Browse Source

apply_async now passes options to apply() when ALWAYS_EAGER

Ask Solem 13 years ago
parent
commit
b16397da96
2 changed files with 7 additions and 18 deletions
  1. 1 0
      celery/app/amqp.py
  2. 6 18
      celery/app/task/__init__.py

+ 1 - 0
celery/app/amqp.py

@@ -320,6 +320,7 @@ class AMQP(object):
                     "exchange_type": default_queue["exchange_type"],
                     "routing_key": conf.CELERY_DEFAULT_ROUTING_KEY,
                     "serializer": conf.CELERY_TASK_SERIALIZER,
+                    "compression": conf.CELERY_MESSAGE_COMPRESSION,
                     "retry": conf.CELERY_TASK_PUBLISH_RETRY,
                     "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
                     "enable_utc": conf.CELERY_ENABLE_UTC,

+ 6 - 18
celery/app/task/__init__.py

@@ -28,7 +28,7 @@ extract_exec_options = mattrgetter("queue", "routing_key",
                                    "exchange", "immediate",
                                    "mandatory", "priority",
                                    "serializer", "delivery_mode",
-                                   "compression")
+                                   "compression", "expires")
 
 
 class Context(threading.local):
@@ -370,10 +370,9 @@ class BaseTask(object):
         return self.apply_async(args, kwargs)
 
     @classmethod
-    def apply_async(self, args=None, kwargs=None, countdown=None,
-            eta=None, task_id=None, publisher=None, connection=None,
-            connect_timeout=None, router=None, expires=None, queues=None,
-            **options):
+    def apply_async(self, args=None, kwargs=None,
+            task_id=None, publisher=None, connection=None,
+            router=None, queues=None, **options):
         """Apply tasks asynchronously by sending a message.
 
         :keyword args: The positional arguments to pass on to the
@@ -400,12 +399,7 @@ class BaseTask(object):
                           executed after the expiration time.
 
         :keyword connection: Re-use existing broker connection instead
-                             of establishing a new one.  The `connect_timeout`
-                             argument is not respected if this is set.
-
-        :keyword connect_timeout: The timeout in seconds, before we give up
-                                  on establishing a connection to the AMQP
-                                  server.
+                             of establishing a new one.
 
         :keyword retry: If enabled sending of the task message will be retried
                         in the event of connection loss or failure.  Default
@@ -465,13 +459,9 @@ class BaseTask(object):
         conf = self.app.conf
 
         if conf.CELERY_ALWAYS_EAGER:
-            return self.apply(args, kwargs, task_id=task_id)
-
-        options.setdefault("compression",
-                           conf.CELERY_MESSAGE_COMPRESSION)
+            return self.apply(args, kwargs, task_id=task_id, **options)
         options = dict(extract_exec_options(self), **options)
         options = router.route(options, self.name, args, kwargs)
-        expires = expires or self.expires
 
         publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
         evd = None
@@ -482,8 +472,6 @@ class BaseTask(object):
         try:
             task_id = publish.delay_task(self.name, args, kwargs,
                                          task_id=task_id,
-                                         countdown=countdown,
-                                         eta=eta, expires=expires,
                                          event_dispatcher=evd,
                                          **options)
         finally: