rpc.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
# -*- coding: utf-8 -*-
"""
celery.backends.rpc
~~~~~~~~~~~~~~~~~~~

RPC-style result backend, using reply-to and one queue per client.
"""
  7. from __future__ import absolute_import
  8. import kombu
  9. from threading import local
  10. from kombu.common import maybe_declare, oid_from
  11. from celery import current_task
  12. from celery.backends import amqp
  13. class RPCBackend(amqp.AMQPBackend):
  14. _tls = local()
  15. class Consumer(kombu.Consumer):
  16. auto_declare = False
  17. def _create_exchange(self, name, type='direct', persistent=False):
  18. return self.Exchange('c.rep', type=type, delivery_mode=1,
  19. durable=False, auto_delete=False)
  20. def on_task_call(self, producer, task_id):
  21. maybe_declare(self.binding(producer.channel), retry=True)
  22. return self.extra_properties
  23. @property
  24. def extra_properties(self):
  25. return {'reply_to': self.oid}
  26. def _create_binding(self, task_id):
  27. return self.binding
  28. def _many_bindings(self, ids):
  29. return [self.binding]
  30. def _routing_key(self, task_id):
  31. return current_task.request.reply_to
  32. def on_reply_declare(self, task_id):
  33. pass
  34. @property
  35. def binding(self):
  36. return self.Queue(self.oid, self.exchange, self.oid,
  37. durable=False, auto_delete=False)
  38. @property
  39. def oid(self):
  40. try:
  41. return self._tls.OID
  42. except AttributeError:
  43. oid = self._tls.OID = oid_from(self)
  44. return oid