Преглед на файлове

All methods using connections now use the connection pool/publisher pool by default

Ask Solem преди 14 години
родител
ревизия
d97051dadc
променени са 5 файла, в които са добавени 55 реда и са изтрити 43 реда
  1. 23 9
      celery/app/amqp.py
  2. 23 2
      celery/app/base.py
  3. 3 3
      celery/app/defaults.py
  4. 4 24
      celery/backends/amqp.py
  5. 2 5
      celery/task/base.py

+ 23 - 9
celery/app/amqp.py

@@ -183,6 +183,7 @@ class TaskPublisher(messaging.Publisher):
             event_dispatcher=None, retry=None, retry_policy=None,
             queue=None, now=None, retries=0, **kwargs):
         """Send task message."""
+
         connection = self.connection
         _retry_policy = self.retry_policy
         if retry_policy:  # merge default and custom policy
@@ -248,14 +249,15 @@ class TaskPublisher(messaging.Publisher):
 
 class PublisherPool(Resource):
 
-    def __init__(self, limit=None, app=None):
+    def __init__(self, app=None):
         self.app = app
-        self.connections = self.app.broker_connection().Pool(limit=limit)
-        super(PublisherPool, self).__init__(limit=limit)
+        super(PublisherPool, self).__init__(limit=self.app.pool.limit)
 
     def create_publisher(self):
-        return self.app.amqp.TaskPublisher(self.connections.acquire(),
-                                           auto_declare=False)
+        conn = self.app.pool.acquire(block=True)
+        pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
+        conn._publisher_chan = pub.channel
+        return pub
 
     def new(self):
         return promise(self.create_publisher)
@@ -266,7 +268,18 @@ class PublisherPool(Resource):
                 self._resource.put_nowait(self.new())
 
     def prepare(self, publisher):
-        return maybe_promise(publisher)
+        pub = maybe_promise(publisher)
+        if not pub.connection:
+            pub.connection = self.app.pool.acquire(block=True)
+            if not getattr(pub.connection, "_publisher_chan", None):
+                pub.connection._publisher_chan = pub.connection.channel()
+            pub.revive(pub.connection._publisher_chan)
+        return pub
+
+    def release(self, resource):
+        resource.connection.release()
+        resource.connection = None
+        super(PublisherPool, self).release(resource)
 
 
 class AMQP(object):
@@ -327,9 +340,6 @@ class AMQP(object):
                     "app": self}
         return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
 
-    def PublisherPool(self, limit=None):
-        return PublisherPool(limit=limit, app=self.app)
-
     def get_task_consumer(self, connection, queues=None, **kwargs):
         """Return consumer configured to consume from all known task
         queues."""
@@ -353,3 +363,7 @@ class AMQP(object):
         if self._rtable is None:
             self.flush_routes()
         return self._rtable
+
+    @cached_property
+    def publisher_pool(self):
+        return PublisherPool(app=self.app)

+ 23 - 2
celery/app/base.py

@@ -8,6 +8,7 @@ Application Base Class.
 :license: BSD, see LICENSE for more details.
 
 """
+import os
 import platform as _platform
 
 from copy import deepcopy
@@ -33,6 +34,8 @@ class BaseApp(object):
     log_cls = "celery.log.Logging"
     control_cls = "celery.task.control.Control"
 
+    _pool = None
+
     def __init__(self, main=None, loader=None, backend=None,
             amqp=None, events=None, log=None, control=None,
             set_as_current=True, accept_magic_kwargs=False):
@@ -184,8 +187,8 @@ class BaseApp(object):
             connection = kwargs.get("connection")
             timeout = kwargs.get("connect_timeout")
             kwargs["connection"] = conn = connection or \
-                    self.broker_connection(connect_timeout=timeout)
-            close_connection = not connection and conn.close or None
+                    self.pool.acquire(block=True)
+            close_connection = not connection and conn.release or None
 
             try:
                 return fun(*args, **kwargs)
@@ -241,6 +244,24 @@ class BaseApp(object):
         return ConfigurationView({},
                 [self.prepare_config(self.loader.conf), deepcopy(DEFAULTS)])
 
+    def _set_pool(self):
+        self._pool = self.broker_connection().Pool(2)
+        self._pool.owner_pid = os.getpid()
+
+    def _reset_after_fork(self):
+        if self._pool:
+            self._pool.force_close_all()
+
+    @property
+    def pool(self):
+        if self._pool is None:
+            self._set_pool()
+        elif os.getpid() != self._pool.owner_pid:
+            print("-- RESET POOL AFTER FORK -- ")
+            self._reset_after_fork()
+            self._set_pool()
+        return self._pool
+
     @cached_property
     def amqp(self):
         """Sending/receiving messages.  See :class:`~celery.app.amqp.AMQP`."""

+ 3 - 3
celery/app/defaults.py

@@ -90,11 +90,11 @@ NAMESPACES = {
         "SEND_TASK_SENT_EVENT": Option(False, type="bool"),
         "STORE_ERRORS_EVEN_IF_IGNORED": Option(False, type="bool"),
         "TASK_ERROR_WHITELIST": Option((), type="tuple"),
-        "TASK_PUBLISH_RETRY": Option(False, type="bool"),
+        "TASK_PUBLISH_RETRY": Option(True, type="bool"),
         "TASK_PUBLISH_RETRY_POLICY": Option({
-                "max_retries": 3,
+                "max_retries": 100,
                 "interval_start": 0,
-                "interval_max": 0.2,
+                "interval_max": 1,
                 "interval_step": 0.2}, type="dict"),
         "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
         "TASK_SERIALIZER": Option("pickle"),

+ 4 - 24
celery/backends/amqp.py

@@ -36,9 +36,6 @@ class AMQPBackend(BaseDictBackend):
 
     BacklogLimitExceeded = BacklogLimitExceeded
 
-    _pool = None
-    _pool_owner_pid = None
-
     def __init__(self, connection=None, exchange=None, exchange_type=None,
             persistent=None, serializer=None, auto_delete=True,
             expires=None, connection_max=None, **kwargs):
@@ -110,7 +107,7 @@ class AMQPBackend(BaseDictBackend):
         """Send task return value and status."""
         self.mutex.acquire()
         try:
-            conn = self.pool.acquire(block=True)
+            conn = self.app.pool.acquire(block=True)
             try:
 
                 def errback(error, delay):
@@ -160,7 +157,7 @@ class AMQPBackend(BaseDictBackend):
             return self.wait_for(task_id, timeout, cache)
 
     def poll(self, task_id, backlog_limit=100):
-        conn = self.pool.acquire(block=True)
+        conn = self.app.pool.acquire(block=True)
         channel = conn.channel()
         try:
             binding = self._create_binding(task_id)(channel)
@@ -204,7 +201,7 @@ class AMQPBackend(BaseDictBackend):
         return results
 
     def consume(self, task_id, timeout=None):
-        conn = self.pool.acquire(block=True)
+        conn = self.app.pool.acquire(block=True)
         channel = conn.channel()
         try:
             binding = self._create_binding(task_id)
@@ -219,7 +216,7 @@ class AMQPBackend(BaseDictBackend):
             conn.release()
 
     def get_many(self, task_ids, timeout=None):
-        conn = self.pool.acquire(block=True)
+        conn = self.app.pool.acquire(block=True)
         channel = conn.channel()
         try:
             ids = set(task_ids)
@@ -274,20 +271,3 @@ class AMQPBackend(BaseDictBackend):
         """Get the result of a taskset."""
         raise NotImplementedError(
                 "restore_taskset is not supported by this backend.")
-
-    def _set_pool(self):
-        self._pool = self.app.broker_connection().Pool(self.connection_max)
-        self._pool_owner_pid = os.getpid()
-
-    def _reset_after_fork(self):
-        self._pool.force_close_all()
-
-    @property
-    def pool(self):
-        if self._pool is None:
-            self._set_pool()
-        elif os.getpid() != self._pool_owner_pid:
-            print("--- RESET POOL AFTER FORK --- ")
-            self._reset_after_fork()
-            self._set_pool()
-        return self._pool

+ 2 - 5
celery/task/base.py

@@ -436,9 +436,7 @@ class BaseTask(object):
         exchange_type = options.get("exchange_type")
         expires = expires or self.expires
 
-        publish = publisher or self.get_publisher(connection,
-                                                  exchange=exchange,
-                                                  exchange_type=exchange_type)
+        publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
         evd = None
         if conf.CELERY_SEND_TASK_SENT_EVENT:
             evd = self.app.events.Dispatcher(channel=publish.channel,
@@ -453,8 +451,7 @@ class BaseTask(object):
                                          **options)
         finally:
             if not publisher:
-                publish.close()
-                publish.connection.close()
+                publish.release()
 
         return self.AsyncResult(task_id)