Jelajahi Sumber

[amqrpc] Fixing the rpc backend. Now also works with Redis

Ask Solem 12 tahun lalu
induk
melakukan
2019dd641f
5 mengubah file dengan 13 tambahan dan 21 penghapusan
  1. 1 1
      celery/app/task.py
  2. 4 1
      celery/backends/amqp.py
  3. 8 17
      celery/backends/amqrpc.py
  4. 0 1
      celery/beat.py
  5. 0 1
      celery/concurrency/gevent.py

+ 1 - 1
celery/app/task.py

@@ -474,7 +474,7 @@ class Task(object):
                 evd = app.events.Dispatcher(channel=P.channel,
                                             buffer_while_offline=False)
 
-            extra_properties = self.backend.on_task_call(producer, task_id)
+            extra_properties = self.backend.on_task_call(P, task_id)
             task_id = P.publish_task(self.name, args, kwargs,
                                      task_id=task_id,
                                      event_dispatcher=evd,

+ 4 - 1
celery/backends/amqp.py

@@ -115,9 +115,12 @@ class AMQPBackend(BaseBackend):
                             routing_key=self._routing_key(task_id),
                             serializer=self.serializer,
                             retry=True, retry_policy=self.retry_policy,
-                            declare=[self._create_binding(task_id)])
+                            declare=self.on_reply_declare(task_id))
         return result
 
+    def on_reply_declare(self, task_id):
+        return [self._create_binding(task_id)]
+
     def wait_for(self, task_id, timeout=None, cache=True, propagate=True,
             **kwargs):
         cached_meta = self._cache.get(task_id)

+ 8 - 17
celery/backends/amqrpc.py

@@ -7,26 +7,16 @@
 
 """
 from __future__ import absolute_import
-from __future__ import with_statement
 
 import kombu
-import os
-import uuid
 
 from threading import local
 
-from kombu.common import maybe_declare
-from celery.backends import amqp
-
-try:
-    from thread import get_ident            # noqa
-except ImportError:                         # pragma: no cover
-    try:
-        from dummy_thread import get_ident  # noqa
-    except ImportError:                     # pragma: no cover
-        from _thread import get_ident       # noqa
+from kombu.common import maybe_declare, oid_from
+from kombu.utils import cached_property
 
-_nodeid = uuid.getnode()
+from celery import current_task
+from celery.backends import amqp
 
 
 class AMQRPCBackend(amqp.AMQPBackend):
@@ -50,9 +40,11 @@ class AMQRPCBackend(amqp.AMQPBackend):
         return [self.binding]
 
     def _routing_key(self, task_id):
-        from celery import current_task
         return current_task.request.reply_to
 
+    def on_reply_declare(self, task_id):
+        pass
+
     @property
     def binding(self):
         return self.Queue(self.oid, self.exchange, self.oid,
@@ -63,6 +55,5 @@ class AMQRPCBackend(amqp.AMQPBackend):
         try:
             return self._tls.OID
         except AttributeError:
-            ent = '%x-%x-%x' % (_nodeid, os.getpid(), get_ident())
-            oid = self._tls.OID = str(uuid.uuid3(uuid.NAMESPACE_OID, ent))
+            oid = self._tls.OID = oid_from(self)
             return oid

+ 0 - 1
celery/beat.py

@@ -7,7 +7,6 @@
 
 """
 from __future__ import absolute_import
-from __future__ import with_statement
 
 import errno
 import os

+ 0 - 1
celery/concurrency/gevent.py

@@ -7,7 +7,6 @@
 
 """
 from __future__ import absolute_import
-from __future__ import with_statement
 
 import os