Kaynağa Gözat

Merge branch 'rpc-fix' into canvas_using_freeze

Ask Solem 12 yıl önce
ebeveyn
işleme
cbd8f56005
4 değiştirilmiş dosya ile 27 ekleme ve 17 silme
  1. 5 0
      celery/app/base.py
  2. 9 14
      celery/backends/rpc.py
  3. 11 3
      celery/canvas.py
  4. 2 0
      celery/task/trace.py

+ 5 - 0
celery/app/base.py

@@ -20,6 +20,7 @@ from operator import attrgetter
 
 from billiard.util import register_after_fork
 from kombu.clocks import LamportClock
+from kombu.common import oid_from
 from kombu.serialization import enable_insecure_serializers
 from kombu.utils import cached_property
 
@@ -545,6 +546,10 @@ class Celery(object):
     def current_task(self):
         return _task_stack.top
 
+    @cached_property
+    def oid(self):
+        return oid_from(self)
+
     @cached_property
     def amqp(self):
         return instantiate(self.amqp_cls, app=self)

+ 9 - 14
celery/backends/rpc.py

@@ -10,16 +10,14 @@ from __future__ import absolute_import
 
 import kombu
 
-from threading import local
-
-from kombu.common import maybe_declare, oid_from
+from kombu.common import maybe_declare
+from kombu.utils import cached_property
 
 from celery import current_task
 from celery.backends import amqp
 
 
 class RPCBackend(amqp.AMQPBackend):
-    _tls = local()
 
     class Consumer(kombu.Consumer):
         auto_declare = False
@@ -32,10 +30,6 @@ class RPCBackend(amqp.AMQPBackend):
         maybe_declare(self.binding(producer.channel), retry=True)
         return self.extra_properties
 
-    @property
-    def extra_properties(self):
-        return {'reply_to': self.oid}
-
     def _create_binding(self, task_id):
         return self.binding
 
@@ -53,10 +47,11 @@ class RPCBackend(amqp.AMQPBackend):
         return self.Queue(self.oid, self.exchange, self.oid,
                           durable=False, auto_delete=False)
 
-    @property
+    @cached_property
     def oid(self):
-        try:
-            return self._tls.OID
-        except AttributeError:
-            oid = self._tls.OID = oid_from(self)
-            return oid
+        return self.app.oid
+
+    @cached_property
+    def extra_properties(self):
+        return {'reply_to': self.oid}
+

+ 11 - 3
celery/canvas.py

@@ -18,7 +18,7 @@ from itertools import chain as _chain
 
 from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
-from celery._state import current_app
+from celery._state import current_app, get_current_worker_task
 from celery.result import AsyncResult, GroupResult
 from celery.utils.functional import (
     maybe_list, is_list, regen,
@@ -163,13 +163,20 @@ class Signature(dict):
         return s
     partial = clone
 
-    def _freeze(self, _id=None):
+    def freeze(self, _id=None):
         opts = self.options
         try:
             tid = opts['task_id']
         except KeyError:
             tid = opts['task_id'] = _id or uuid()
+        if 'reply_to' not in opts:
+            curtask = get_current_worker_task()
+            if curtask:
+                opts['repy_to'] = curtask.request.reply_to
+            else:
+                opts['reply_to'] = self.type.app.oid
         return self.AsyncResult(tid)
+    _freeze = freeze
 
     def replace(self, args=None, kwargs=None, options=None):
         s = self.clone()
@@ -423,7 +430,7 @@ class group(Signature):
         type = tasks[0].type.app.tasks[self['task']]
         return type(*type.prepare(options, tasks, partial_args))
 
-    def _freeze(self, _id=None):
+    def freeze(self, _id=None):
         opts = self.options
         try:
             gid = opts['group']
@@ -436,6 +443,7 @@ class group(Signature):
             new_tasks.append(task)
         self.tasks = self.kwargs['tasks'] = new_tasks
         return GroupResult(gid, results)
+    _freeze = freeze
 
     def skew(self, start=1.0, stop=None, step=1.0):
         it = fxrange(start, stop, step, repeatlast=True)

+ 2 - 0
celery/task/trace.py

@@ -227,6 +227,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 else:
                     # callback tasks must be applied before the result is
                     # stored, so that result.children is populated.
+                    print('CALLBACK OPTS: %r' % ([callback.options for
+                        callback in task_request.callbacks or []], ))
                     [subtask(callback).apply_async((retval, ))
                         for callback in task_request.callbacks or []]
                     if publish_result: