Ask Solem 11 rokov pred
rodič
commit
7625f8c8ed

+ 1 - 1
celery/app/builtins.py

@@ -106,7 +106,7 @@ def add_unlock_chord_task(app):
                 except StopIteration:
                     reason = repr(exc)
                 app.backend.chord_error_from_stack(callback,
-                                                    ChordError(reason))
+                                                   ChordError(reason))
             else:
                 try:
                     callback.delay(ret)

+ 4 - 4
celery/backends/base.py

@@ -508,7 +508,7 @@ class KeyValueStoreBackend(BaseBackend):
             return
         app = self.app
         if propagate is None:
-            propagate = self.app.conf.CELERY_CHORD_PROPAGATES
+            propagate = app.conf.CELERY_CHORD_PROPAGATES
         gid = task.request.group
         if not gid:
             return
@@ -516,7 +516,7 @@ class KeyValueStoreBackend(BaseBackend):
         try:
             deps = GroupResult.restore(gid, backend=task.backend)
         except Exception as exc:
-            callback = maybe_signature(task.request.chord, app=self.app)
+            callback = maybe_signature(task.request.chord, app=app)
             return self.chord_error_from_stack(
                 callback,
                 ChordError('Cannot restore group: {0!r}'.format(exc)),
@@ -525,14 +525,14 @@ class KeyValueStoreBackend(BaseBackend):
             try:
                 raise ValueError(gid)
             except ValueError as exc:
-                callback = maybe_signature(task.request.chord, app=self.app)
+                callback = maybe_signature(task.request.chord, app=app)
                 return self.chord_error_from_stack(
                     callback,
                     ChordError('GroupResult {0} no longer exists'.format(gid)),
                 )
         val = self.incr(key)
         if val >= len(deps):
-            callback = maybe_signature(task.request.chord, app=self.app)
+            callback = maybe_signature(task.request.chord, app=app)
             j = deps.join_native if deps.supports_native_join else deps.join
             try:
                 with allow_join_result():

+ 1 - 0
celery/tests/worker/test_consumer.py

@@ -439,6 +439,7 @@ class test_Gossip(AppCase):
         c.app.events.State.assert_called_with(
             on_node_join=g.on_node_join,
             on_node_leave=g.on_node_leave,
+            max_tasks_in_memory=1,
         )
         g.update_state = Mock()
         worker = Mock()

+ 2 - 2
celery/utils/dispatch/signal.py

@@ -45,8 +45,8 @@ class Signal(object):  # pragma: no cover
 
     def _connect_proxy(self, fun, sender, weak, dispatch_uid):
         return self.connect(
-            fun, sender=sender._get_current_object(), weak=weak,
-                dispatch_uid=dispatch_uid,
+            fun, sender=sender._get_current_object(),
+            weak=weak, dispatch_uid=dispatch_uid,
         )
 
     def connect(self, *args, **kwargs):

+ 2 - 2
celery/worker/job.py

@@ -288,8 +288,8 @@ class Request(object):
                         'hostname': self.hostname, 'is_eager': False,
                         'delivery_info': self.delivery_info})
         retval = trace_task(self.task, self.id, self.args, kwargs, request,
-                            **{'hostname': self.hostname,
-                               'loader': self.app.loader})
+                            hostname=self.hostname, loader=self.app.loader,
+                            app=self.app)
         self.acknowledge()
         return retval