messaging.py 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. """
  2. Sending and Receiving Messages
  3. """
  4. from celery.app import app_or_default
  5. default_app = app_or_default()
  6. TaskPublisher = default_app.amqp.TaskPublisher
  7. ConsumerSet = default_app.amqp.ConsumerSet
  8. TaskConsumer = default_app.amqp.TaskConsumer
  9. def establish_connection(**kwargs):
  10. """Establish a connection to the message broker."""
  11. # FIXME: # Deprecate
  12. app = app_or_default(kwargs.pop("app", None))
  13. return app.broker_connection(**kwargs)
  14. def with_connection(fun):
  15. """Decorator for providing default message broker connection for functions
  16. supporting the ``connection`` and ``connect_timeout`` keyword
  17. arguments."""
  18. # FIXME: Deprecate!
  19. return default_app.with_default_connection(fun)
  20. def get_consumer_set(connection, queues=None, **options):
  21. """Get the :class:`carrot.messaging.ConsumerSet`` for a queue
  22. configuration.
  23. Defaults to the queues in ``CELERY_QUEUES``.
  24. """
  25. # FIXME: Deprecate!
  26. return default_app.amqp.get_consumer_set(connection, queues, **options)