rpc.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.backends.rpc
  4. ~~~~~~~~~~~~~~~~~~~
  5. RPC-style result backend, using reply-to and one queue per client.
  6. """
  7. from __future__ import absolute_import
  8. from kombu import Consumer, Exchange
  9. from kombu.common import maybe_declare
  10. from kombu.utils import cached_property
  11. from celery import current_task
  12. from celery._state import task_join_will_block
  13. from celery.backends import amqp
  14. __all__ = ['RPCBackend']
  15. class RPCBackend(amqp.AMQPBackend):
  16. persistent = False
  17. class Consumer(Consumer):
  18. auto_declare = False
  19. def _create_exchange(self, name, type='direct', delivery_mode=2):
  20. # uses direct to queue routing (anon exchange).
  21. return Exchange(None)
  22. def on_task_call(self, producer, task_id):
  23. if not task_join_will_block():
  24. maybe_declare(self.binding(producer.channel), retry=True)
  25. def _create_binding(self, task_id):
  26. return self.binding
  27. def _many_bindings(self, ids):
  28. return [self.binding]
  29. def rkey(self, task_id):
  30. return task_id
  31. def destination_for(self, task_id, request):
  32. # Request is a new argument for backends, so must still support
  33. # old code that rely on current_task
  34. try:
  35. request = request or current_task.request
  36. except AttributeError:
  37. raise RuntimeError(
  38. 'RPC backend missing task request for {0!r}'.format(task_id),
  39. )
  40. return request.reply_to, request.correlation_id or task_id
  41. def on_reply_declare(self, task_id):
  42. pass
  43. def on_result_fulfilled(self, result):
  44. pass
  45. def as_uri(self, include_password=True):
  46. return 'rpc://'
  47. @property
  48. def binding(self):
  49. return self.Queue(self.oid, self.exchange, self.oid,
  50. durable=False, auto_delete=True)
  51. @cached_property
  52. def oid(self):
  53. return self.app.oid