Explorar el Código

Removes the 'connect_timeout' argument to connection related functions, as use of the pool now makes it impossible to support

Ask Solem hace 13 años
padre
commit
b72faa29b6
Se han modificado 2 ficheros con 6 adiciones y 8 borrados
  1. 4 5
      celery/task/base.py
  2. 2 3
      celery/task/sets.py

+ 4 - 5
celery/task/base.py

@@ -73,7 +73,7 @@ class Task(BaseTask):
         return get_task_logger(self.name)
 
     @classmethod
-    def establish_connection(self, connect_timeout=None):
+    def establish_connection(self):
         """Deprecated method used to get a broker connection.
 
         Should be replaced with :meth:`@Celery.connection`
@@ -89,11 +89,10 @@ class Task(BaseTask):
             with celery.connection() as conn:
                 ...
         """
-        return self._get_app().connection(
-                connect_timeout=connect_timeout)
+        return self._get_app().connection()
 
     def get_publisher(self, connection=None, exchange=None,
-            connect_timeout=None, exchange_type=None, **options):
+            exchange_type=None, **options):
         """Deprecated method to get the task publisher (now called producer).
 
         Should be replaced with :class:`@amqp.TaskProducer`:
@@ -108,7 +107,7 @@ class Task(BaseTask):
         exchange = self.exchange if exchange is None else exchange
         if exchange_type is None:
             exchange_type = self.exchange_type
-        connection = connection or self.establish_connection(connect_timeout)
+        connection = connection or self.establish_connection()
         return self._get_app().amqp.TaskProducer(connection,
                 exchange=exchange and Exchange(exchange, exchange_type),
                 routing_key=self.routing_key, **options)

+ 2 - 3
celery/task/sets.py

@@ -37,15 +37,14 @@ class TaskSet(list):
         self.Publisher = Publisher or self.app.amqp.TaskProducer
         self.total = len(self)  # XXX compat
 
-    def apply_async(self, connection=None, connect_timeout=None,
-            publisher=None, taskset_id=None):
+    def apply_async(self, connection=None, publisher=None, taskset_id=None):
         """Apply TaskSet."""
         app = self.app
 
         if app.conf.CELERY_ALWAYS_EAGER:
             return self.apply(taskset_id=taskset_id)
 
-        with app.default_connection(connection, connect_timeout) as conn:
+        with app.default_connection(connection) as conn:
             setid = taskset_id or uuid()
             pub = publisher or self.Publisher(conn)
             results = self._async_results(setid, pub)