rpc.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.rpc
  4. ~~~~~~~~~~~~~~~~~~~
  5. RPC-style result backend, using reply-to and one queue per client.
  6. """
  7. from __future__ import absolute_import
  8. import kombu
  9. from kombu.common import maybe_declare
  10. from kombu.utils import cached_property
  11. from celery import current_task
  12. from celery.backends import amqp
  13. __all__ = ['RPCBackend']
  14. class RPCBackend(amqp.AMQPBackend):
  15. class Consumer(kombu.Consumer):
  16. auto_declare = False
  17. def _create_exchange(self, name, type='direct', persistent=False):
  18. return self.Exchange('c.rep', type=type, delivery_mode=1,
  19. durable=False, auto_delete=False)
  20. def on_task_call(self, producer, task_id):
  21. maybe_declare(self.binding(producer.channel), retry=True)
  22. def _create_binding(self, task_id):
  23. return self.binding
  24. def _many_bindings(self, ids):
  25. return [self.binding]
  26. def _routing_key(self, task_id):
  27. task = current_task._get_current_object()
  28. if task is not None:
  29. return task.request.reply_to
  30. def on_reply_declare(self, task_id):
  31. pass
  32. @property
  33. def binding(self):
  34. return self.Queue(self.oid, self.exchange, self.oid,
  35. durable=False, auto_delete=False)
  36. @cached_property
  37. def oid(self):
  38. return self.app.oid