Explorar o código

Merge branch 'master' of github.com:celery/celery

Ask Solem %!s(int64=9) %!d(string=hai) anos
pai
achega
4d5486dbef

+ 4 - 5
.travis.yml

@@ -1,4 +1,8 @@
 language: python
+sudo: false
+cache:
+  directories:
+    - $HOME/.cache/pip
 python: 2.7
 env:
   global:
@@ -8,11 +12,6 @@ env:
     - TOXENV=3.3
     - TOXENV=3.4
     - TOXENV=pypy
-before_install:
-  - |
-    python --version
-    uname -a
-    lsb_release -a
 install:
   - pip install tox
 script:

+ 1 - 0
celery/app/amqp.py

@@ -371,6 +371,7 @@ class AMQP(object):
                 'id': task_id,
                 'args': args,
                 'kwargs': kwargs,
+                'group': group_id,
                 'retries': retries,
                 'eta': eta,
                 'expires': expires,

+ 7 - 3
celery/backends/base.py

@@ -357,8 +357,8 @@ class BaseBackend(object):
 
     def apply_chord(self, header, partial_args, group_id, body,
                     options={}, **kwargs):
-        options['task_id'] = group_id
-        result = header(*partial_args, **options or {})
+        fixed_options = {k: v for k,v in options.items() if k!='task_id'}
+        result = header(*partial_args, task_id=group_id, **fixed_options or {})
         self.fallback_chord_unlock(group_id, body, **kwargs)
         return result
 
@@ -534,7 +534,11 @@ class KeyValueStoreBackend(BaseBackend):
     def _apply_chord_incr(self, header, partial_args, group_id, body,
                           result=None, options={}, **kwargs):
         self.save_group(group_id, self.app.GroupResult(group_id, result))
-        return header(*partial_args, task_id=group_id, **options or {})
+
+        fixed_options = {k: v for k,v in options.items() if k != 'task_id'}
+
+        return header(*partial_args, task_id=group_id, **fixed_options or {})
+
 
     def on_chord_part_return(self, task, state, result, propagate=None):
         if not self.implements_incr:

+ 4 - 4
celery/canvas.py

@@ -474,9 +474,9 @@ class chain(Signature):
             if link_error:
                 task.set(link_error=link_error)
 
-            if not isinstance(prev_task, chord):
-                results.append(res)
-                tasks.append(task)
+            tasks.append(task)
+            results.append(res)
+
             prev_task, prev_res = task, res
 
         return tasks, results
@@ -603,7 +603,7 @@ def _maybe_group(tasks):
     elif isinstance(tasks, Signature):
         tasks = [tasks]
     else:
-        tasks = regen(tasks)
+        tasks = map(signature, regen(tasks))
     return tasks
 
 

+ 1 - 1
celery/tests/utils/test_mail.py

@@ -46,7 +46,7 @@ class test_Mailer(Case):
         mailer = Mailer(use_ssl=False, use_tls=False)
         mailer._send(msg)
 
-        client.sendmail.assert_called_With(msg.sender, msg.to, str(msg))
+        client.sendmail.assert_called_with(msg.sender, msg.to, str(msg))
 
         client.quit.side_effect = SSLError()
         mailer._send(msg)

+ 3 - 3
celery/tests/worker/test_autoscale.py

@@ -107,7 +107,7 @@ class test_Autoscaler(AppCase):
         state.reserved_requests.clear()
         x.body()
         self.assertEqual(x.pool.num_processes, 10)
-        x._last_action = monotonic() - 10000
+        x._last_scale_up = monotonic() - 10000
         x.body()
         self.assertEqual(x.pool.num_processes, 3)
         self.assertTrue(worker.consumer._update_prefetch_count.called)
@@ -141,7 +141,7 @@ class test_Autoscaler(AppCase):
         worker = Mock(name='worker')
         x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         x.scale_up(3)
-        x._last_action = monotonic() - 10000
+        x._last_scale_up = monotonic() - 10000
         x.pool.shrink_raises_ValueError = True
         x.scale_down(1)
         self.assertTrue(debug.call_count)
@@ -156,7 +156,7 @@ class test_Autoscaler(AppCase):
         self.assertEqual(x.processes, 5)
         x.force_scale_down(3)
         self.assertEqual(x.processes, 2)
-        x.update(3, None)
+        x.update(None, 3)
         self.assertEqual(x.processes, 3)
         x.force_scale_down(1000)
         self.assertEqual(x.min_concurrency, 0)

+ 10 - 11
celery/worker/autoscale.py

@@ -71,7 +71,7 @@ class Autoscaler(bgThread):
         self.max_concurrency = max_concurrency
         self.min_concurrency = min_concurrency
         self.keepalive = keepalive
-        self._last_action = None
+        self._last_scale_up = None
         self.worker = worker
 
         assert self.keepalive, 'cannot scale down too fast.'
@@ -87,8 +87,9 @@ class Autoscaler(bgThread):
         if cur > procs:
             self.scale_up(cur - procs)
             return True
-        elif cur < procs:
-            self.scale_down((procs - cur) - self.min_concurrency)
+        cur = max(self.qty, self.min_concurrency)
+        if cur < procs:
+            self.scale_down(procs - cur)
             return True
 
     def maybe_scale(self, req=None):
@@ -98,12 +99,12 @@ class Autoscaler(bgThread):
     def update(self, max=None, min=None):
         with self.mutex:
             if max is not None:
-                if max < self.max_concurrency:
+                if max < self.processes:
                     self._shrink(self.processes - max)
                 self.max_concurrency = max
             if min is not None:
-                if min > self.min_concurrency:
-                    self._grow(min - self.min_concurrency)
+                if min > self.processes:
+                    self._grow(min - self.processes)
                 self.min_concurrency = min
             return self.max_concurrency, self.min_concurrency
 
@@ -112,7 +113,6 @@ class Autoscaler(bgThread):
             new = self.processes + n
             if new > self.max_concurrency:
                 self.max_concurrency = new
-            self.min_concurrency += 1
             self._grow(n)
 
     def force_scale_down(self, n):
@@ -123,13 +123,12 @@ class Autoscaler(bgThread):
             self._shrink(min(n, self.processes))
 
     def scale_up(self, n):
-        self._last_action = monotonic()
+        self._last_scale_up = monotonic()
         return self._grow(n)
 
     def scale_down(self, n):
-        if n and self._last_action and (
-                monotonic() - self._last_action > self.keepalive):
-            self._last_action = monotonic()
+        if self._last_scale_up and (
+                monotonic() - self._last_scale_up > self.keepalive):
             return self._shrink(n)
 
     def _grow(self, n):