Ask Solem hace 12 años
padre
commit
936dd9e66f
Se han modificado 2 ficheros con 31 adiciones y 13 borrados
  1. 28 13
      celery/_state.py
  2. 3 0
      celery/tests/app/test_app.py

+ 28 - 13
celery/_state.py

@@ -19,6 +19,31 @@ import weakref
 from celery.local import Proxy
 from celery.utils.threads import LocalStack
 
+try:
+    from weakref import WeakSet as AppSet
+except ImportError:  # XXX Py2.6
+
+    class AppSet(object):  # noqa
+
+        def __init__(self):
+            self._refs = set()
+
+        def add(self, app):
+            self._refs.add(weakref.ref(app))
+
+        def __iter__(self):
+            dirty = []
+            try:
+                for appref in self._refs:
+                    app = appref()
+                    if app is None:
+                        dirty.append(appref)
+                    else:
+                        yield app
+            finally:
+                while dirty:
+                    self._refs.discard(dirty.pop())
+
 __all__ = ['set_default_app', 'get_current_app', 'get_current_task',
            'get_current_worker_task', 'current_app', 'current_task']
 
@@ -26,7 +51,7 @@ __all__ = ['set_default_app', 'get_current_app', 'get_current_task',
 default_app = None
 
 #: List of all app instances (weakrefs), must not be used directly.
-_apps = set()
+_apps = AppSet()
 
 _task_join_will_block = False
 
@@ -104,18 +129,8 @@ current_task = Proxy(get_current_task)
 
 
 def _register_app(app):
-    _apps.add(weakref.ref(app))
+    _apps.add(app)
 
 
 def _get_active_apps():
-    dirty = []
-    try:
-        for appref in _apps:
-            app = appref()
-            if app is None:
-                dirty.append(appref)
-            else:
-                yield app
-    finally:
-        while dirty:
-            _apps.discard(dirty.pop())
+    return _apps

+ 3 - 0
celery/tests/app/test_app.py

@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import gc
 import os
 import itertools
 
@@ -426,6 +427,8 @@ class test_App(AppCase):
         app1.close()
         del(app1)
 
+        gc.collect()
+
         # weakref removed from list when app goes out of scope.
         with self.assertRaises(StopIteration):
             next(app for app in _state._get_active_apps() if id(app) == appid)