瀏覽代碼

Closes #2326 for master branch

Ask Solem 10 年之前
父節點
當前提交
fab0b11e54
共有 2 個文件被更改,包括 13 次插入9 次删除
  1. 12 8
      celery/backends/base.py
  2. 1 1
      celery/worker/autoscale.py

+ 12 - 8
celery/backends/base.py

@@ -432,17 +432,22 @@ class KeyValueStoreBackend(BaseBackend):
                 return bytes_to_str(key[len(prefix):])
                 return bytes_to_str(key[len(prefix):])
         return bytes_to_str(key)
         return bytes_to_str(key)
 
 
+    def _filter_ready(self, values, READY_STATES=states.READY_STATES):
+        for k, v in values:
+            if v is not None:
+                v = self.decode_result(v)
+                if v['status'] in READY_STATES:
+                    yield k, v
+
     def _mget_to_results(self, values, keys):
     def _mget_to_results(self, values, keys):
         if hasattr(values, 'items'):
         if hasattr(values, 'items'):
             # client returns dict so mapping preserved.
             # client returns dict so mapping preserved.
-            return dict((self._strip_prefix(k), self.decode_result(v))
-                        for k, v in items(values)
-                        if v is not None)
+            return dict((self._strip_prefix(k), v)
+                        for k, v in self._filter_ready(items(values)))
         else:
         else:
             # client returns list so need to recreate mapping.
             # client returns list so need to recreate mapping.
-            return dict((bytes_to_str(keys[i]), self.decode_result(value))
-                        for i, value in enumerate(values)
-                        if value is not None)
+            return dict((bytes_to_str(keys[i]), v)
+                        for i, v in self._filter_ready(enumerate(values)))
 
 
     def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
     def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
                  READY_STATES=states.READY_STATES):
                  READY_STATES=states.READY_STATES):
@@ -469,8 +474,7 @@ class KeyValueStoreBackend(BaseBackend):
             cache.update(r)
             cache.update(r)
             ids.difference_update(set(bytes_to_str(v) for v in r))
             ids.difference_update(set(bytes_to_str(v) for v in r))
             for key, value in items(r):
             for key, value in items(r):
-                if value['status'] in READY_STATES:
-                    yield bytes_to_str(key), value
+                yield bytes_to_str(key), value
             if timeout and iterations * interval >= timeout:
             if timeout and iterations * interval >= timeout:
                 raise TimeoutError('Operation timed out ({0})'.format(timeout))
                 raise TimeoutError('Operation timed out ({0})'.format(timeout))
             time.sleep(interval)  # don't busy loop.
             time.sleep(interval)  # don't busy loop.

+ 1 - 1
celery/worker/autoscale.py

@@ -81,7 +81,7 @@ class Autoscaler(bgThread):
             self.maybe_scale()
             self.maybe_scale()
         sleep(1.0)
         sleep(1.0)
 
 
-    def _maybe_scale(self):
+    def _maybe_scale(self, req=None):
         procs = self.processes
         procs = self.processes
         cur = min(self.qty, self.max_concurrency)
         cur = min(self.qty, self.max_concurrency)
         if cur > procs:
         if cur > procs: