rpc.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
# -*- coding: utf-8 -*-
"""
    celery.backends.rpc
    ~~~~~~~~~~~~~~~~~~~

    RPC-style result backend, using reply-to and one queue per client.

"""
  7. from __future__ import absolute_import
  8. import kombu
  9. from kombu.common import maybe_declare
  10. from kombu.utils import cached_property
  11. from celery import current_task
  12. from celery.backends import amqp
  13. class RPCBackend(amqp.AMQPBackend):
  14. class Consumer(kombu.Consumer):
  15. auto_declare = False
  16. def _create_exchange(self, name, type='direct', persistent=False):
  17. return self.Exchange('c.rep', type=type, delivery_mode=1,
  18. durable=False, auto_delete=False)
  19. def on_task_call(self, producer, task_id):
  20. maybe_declare(self.binding(producer.channel), retry=True)
  21. def _create_binding(self, task_id):
  22. return self.binding
  23. def _many_bindings(self, ids):
  24. return [self.binding]
  25. def _routing_key(self, task_id):
  26. return current_task.request.reply_to
  27. def on_reply_declare(self, task_id):
  28. pass
  29. @property
  30. def binding(self):
  31. return self.Queue(self.oid, self.exchange, self.oid,
  32. durable=False, auto_delete=False)
  33. @cached_property
  34. def oid(self):
  35. return self.app.oid