Browse Source

Fixes weird traceback issues with connection_or_acquire + producer_or_acquire

Ask Solem 11 năm trước cách đây
mục cha
commit
6c08d2acdf
2 tập tin đã thay đổi với 58 bổ sung19 xóa
  1. 13 19
      celery/app/base.py
  2. 45 0
      celery/utils/objects.py

+ 13 - 19
celery/app/base.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import
 
 import os
+import sys
 import threading
 import warnings
 
@@ -37,7 +38,7 @@ from celery.local import PromiseProxy, maybe_evaluate
 from celery.utils.dispatch import Signal
 from celery.utils.functional import first, maybe_list
 from celery.utils.imports import instantiate, symbol_by_name
-from celery.utils.objects import mro_lookup
+from celery.utils.objects import FallbackContext, mro_lookup
 
 from .annotations import prepare as prepare_annotations
 from .defaults import DEFAULTS, find_deprecated_settings
@@ -411,27 +412,20 @@ class Celery(object):
         )
     broker_connection = connection
 
-    @contextmanager
-    def connection_or_acquire(self, connection=None, pool=True,
-                              *args, **kwargs):
-        if connection:
-            yield connection
-        else:
-            if pool:
-                with self.pool.acquire(block=True) as connection:
-                    yield connection
-            else:
-                with self.connection() as connection:
-                    yield connection
+    def _acquire_connection(self, pool=True):
+        """Helper for :meth:`connection_or_acquire`."""
+        if pool:
+            return self.pool.acquire(block=True)
+        return self.connection()
+
+    def connection_or_acquire(self, connection=None, pool=True, *_, **__):
+        return FallbackContext(connection, self._acquire_connection, pool=pool)
     default_connection = connection_or_acquire  # XXX compat
 
-    @contextmanager
     def producer_or_acquire(self, producer=None):
-        if producer:
-            yield producer
-        else:
-            with self.amqp.producer_pool.acquire(block=True) as producer:
-                yield producer
+        return FallbackContext(
+            producer, self.amqp.producer_pool.acquire, block=True,
+        )
     default_producer = producer_or_acquire  # XXX compat
 
     def prepare_config(self, c):

+ 45 - 0
celery/utils/objects.py

@@ -35,3 +35,48 @@ def mro_lookup(cls, attr, stop=(), monkey_patched=[]):
             return
         if attr in node.__dict__:
             return node
+
+
+class FallbackContext(object):
+    """The built-in ``@contextmanager`` utility does not work well
+    when wrapping other contexts, as the traceback is wrong when
+    the wrapped context raises.
+
+    This solves this problem and can be used instead of ``@contextmanager``
+    in this example::
+
+        @contextmanager
+        def connection_or_default_connection(connection=None):
+            if connection:
+                # user already has a connection, should not close
+                # after use
+                yield connection
+            else:
+                # must have new connection, and also close the connection
+                # after the block returns
+                with create_new_connection() as connection:
+                    yield connection
+
+    This wrapper can be used instead for the above like this::
+
+        def connection_or_default_connection(connection=None):
+            return FallbackContext(connection, create_new_connection)
+
+    """
+
+    def __init__(self, provided, fallback, *fb_args, **fb_kwargs):
+        self.provided = provided
+        self.fallback = fallback
+        self.fb_args = fb_args
+        self.fb_kwargs = fb_kwargs
+        self._context = None
+
+    def __enter__(self):
+        if self.provided is not None:
+            return self.provided
+        context = self._context = self.fallback(*self.fb_args, **self.fb_kwargs).__enter__()
+        return context
+
+    def __exit__(self, *exc_info):
+        if self._context is not None:
+            return self._context.__exit__(*exc_info)