Преглед на файлове

ResultSet.iterate did not actually yield results as they finish. This fixes that, but can result in strain when polling resources. It is about time we implemented iter_native for databases. Closes #459

Ask Solem преди 13 години
родител
ревизия
dc6caecdd3
променени са 3 файла, в които са добавени 17 реда и са изтрити 5 реда
  1. 4 0
      celery/backends/base.py
  2. 3 0
      celery/backends/database.py
  3. 10 5
      celery/result.py

+ 4 - 0
celery/backends/base.py

@@ -21,6 +21,10 @@ class BaseBackend(object):
 
     TimeoutError = TimeoutError
 
+    #: Time to sleep between polling each individual item in `ResultSet.iterate`.
+    #: as opposed to the `interval` argument which is for each pass.
+    subpolling_interval = None
+
     def __init__(self, *args, **kwargs):
         from celery.app import app_or_default
         self.app = app_or_default(kwargs.get("app"))

+ 3 - 0
celery/backends/database.py

@@ -21,6 +21,9 @@ _sqlalchemy_installed()
 
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
+    # ResultSet.iterate should sleep this much between each pool,
+    # to not bombard the database with queries.
+    subpolling_interval = 0.5
 
     def __init__(self, dburi=None, expires=None,
             engine_options=None, **kwargs):

+ 10 - 5
celery/result.py

@@ -11,6 +11,7 @@ from celery import states
 from celery.app import app_or_default
 from celery.exceptions import TimeoutError
 from celery.registry import _unpickle_task
+from celery.utils.compat import OrderedDict
 
 
 def _unpickle_result(task_id, task_name):
@@ -327,15 +328,19 @@ class ResultSet(object):
 
         """
         elapsed = 0.0
-        results = dict((result.task_id, copy(result))
-                            for result in self.results)
+        results = OrderedDict((result.task_id, copy(result))
+                                for result in self.results)
 
         while results:
             removed = set()
             for task_id, result in results.iteritems():
-                yield result.get(timeout=timeout and timeout - elapsed,
-                                 propagate=propagate, interval=0.0)
-                removed.add(task_id)
+                if result.ready():
+                    yield result.get(timeout=timeout and timeout - elapsed,
+                                     propagate=propagate)
+                    removed.add(task_id)
+                else:
+                    if result.backend.subpolling_interval:
+                        time.sleep(result.backend.subpolling_interval)
             for task_id in removed:
                 results.pop(task_id, None)
             time.sleep(interval)