Przeglądaj źródła

itertools.chain.from_iterable new in Py2.6

Added celery.utils.compat.chain_from_iterable.

Thanks to gthb. Closes #133.
Ask Solem 15 lat temu
rodzic
commit
2a4659a067
2 zmienionych plików z 16 dodań i 3 usunięć
  1. 14 0
      celery/utils/compat.py
  2. 2 3
      celery/worker/buckets.py

+ 14 - 0
celery/utils/compat.py

@@ -353,3 +353,17 @@ except ImportError:
                 yield tup
         except IndexError:
             pass
+
+############## itertools.chain.from_iterable ################################
+from itertools import chain
+
+
+def _compat_chain_from_iterable(iterables):
+    for it in iterables:
+        for element in it:
+            yield element
+
+#try:
+#    chain_from_iterable = getattr(chain, "from_iterable")
+#except AttributeError:
+chain_from_iterable = _compat_chain_from_iterable

+ 2 - 3
celery/worker/buckets.py

@@ -1,12 +1,11 @@
 import time
 
 from collections import deque
-from itertools import chain
 from Queue import Queue, Empty as QueueEmpty
 
 from celery.utils import all
 from celery.utils import timeutils
-from celery.utils.compat import izip_longest
+from celery.utils.compat import izip_longest, chain_from_iterable
 
 
 class RateLimitExceeded(Exception):
@@ -180,7 +179,7 @@ class TaskBucket(object):
     def items(self):
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
-        return filter(None, chain.from_iterable(izip_longest(*[bucket.items
+        return filter(None, chain_from_iterable(izip_longest(*[bucket.items
                                     for bucket in self.buckets.values()])))