|
@@ -20,17 +20,13 @@ from celery.five import items, range
|
|
|
from . import base
|
|
|
from .async import AsyncBackendMixin, BaseResultConsumer
|
|
|
|
|
|
-__all__ = ['BacklogLimitExceeded', 'BaseRPCBackend', 'RPCBackend']
|
|
|
+__all__ = ['BacklogLimitExceeded', 'RPCBackend']
|
|
|
|
|
|
|
|
|
class BacklogLimitExceeded(Exception):
|
|
|
"""Too much state history to fast-forward."""
|
|
|
|
|
|
|
|
|
-class NoCacheQueue(kombu.Queue):
|
|
|
- can_cache_declaration = False
|
|
|
-
|
|
|
-
|
|
|
def _on_after_fork_cleanup_backend(backend):
|
|
|
backend._after_fork()
|
|
|
|
|
@@ -85,12 +81,10 @@ class ResultConsumer(BaseResultConsumer):
|
|
|
self._consumer.cancel_by_queue(self._create_binding(task_id).name)
|
|
|
|
|
|
|
|
|
-class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
+class RPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
"""Base class for the RPC result backend."""
|
|
|
|
|
|
Exchange = kombu.Exchange
|
|
|
- Queue = NoCacheQueue
|
|
|
- Consumer = kombu.Consumer
|
|
|
Producer = kombu.Producer
|
|
|
ResultConsumer = ResultConsumer
|
|
|
|
|
@@ -108,9 +102,19 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
'interval_max': 1,
|
|
|
}
|
|
|
|
|
|
+ class Consumer(kombu.Consumer):
|
|
|
+ """Consumer that requires manual declaration of queues."""
|
|
|
+
|
|
|
+ auto_declare = False
|
|
|
+
|
|
|
+ class Queue(kombu.Queue):
|
|
|
+ """Queue that never caches declaration."""
|
|
|
+
|
|
|
+ can_cache_declaration = False
|
|
|
+
|
|
|
def __init__(self, app, connection=None, exchange=None, exchange_type=None,
|
|
|
persistent=None, serializer=None, auto_delete=True, **kwargs):
|
|
|
- super(BaseRPCBackend, self).__init__(app, **kwargs)
|
|
|
+ super(RPCBackend, self).__init__(app, **kwargs)
|
|
|
conf = self.app.conf
|
|
|
self._connection = connection
|
|
|
self._out_of_band = {}
|
|
@@ -135,6 +139,52 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
self._pending_results.clear()
|
|
|
self.result_consumer._after_fork()
|
|
|
|
|
|
+ def _create_exchange(self, name, type='direct', delivery_mode=2):
|
|
|
+ # uses direct to queue routing (anon exchange).
|
|
|
+ return self.Exchange(None)
|
|
|
+
|
|
|
+ def _create_binding(self, task_id):
|
|
|
+ """Create new binding for task with id."""
|
|
|
+ # RPC backend caches the binding, as one queue is used for all tasks.
|
|
|
+ return self.binding
|
|
|
+
|
|
|
+ def on_task_call(self, producer, task_id):
|
|
|
+ # Called every time a task is sent when using this backend.
|
|
|
+ # We declare the queue we receive replies on in advance of sending
|
|
|
+ # the message, but we skip this if running in the prefork pool
|
|
|
+ # (task_join_will_block), as we know the queue is already declared.
|
|
|
+ if not task_join_will_block():
|
|
|
+ maybe_declare(self.binding(producer.channel), retry=True)
|
|
|
+
|
|
|
+ def destination_for(self, task_id, request):
|
|
|
+ """Get the destination for result by task id.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
|
|
|
+ """
|
|
|
+ # Backends didn't always receive the `request`, so we must still
|
|
|
+ # support old code that relies on current_task.
|
|
|
+ try:
|
|
|
+ request = request or current_task.request
|
|
|
+ except AttributeError:
|
|
|
+ raise RuntimeError(
|
|
|
+ 'RPC backend missing task request for {0!r}'.format(task_id))
|
|
|
+ return request.reply_to, request.correlation_id or task_id
|
|
|
+
|
|
|
+ def on_reply_declare(self, task_id):
|
|
|
+ # Return value here is used as the `declare=` argument
|
|
|
+ # for Producer.publish.
|
|
|
+ # By default we don't have to declare anything when sending a result.
|
|
|
+ pass
|
|
|
+
|
|
|
+ def on_result_fulfilled(self, result):
|
|
|
+ # This usually cancels the queue after the result is received,
|
|
|
+ # but we don't have to cancel since we have one queue per process.
|
|
|
+ pass
|
|
|
+
|
|
|
+ def as_uri(self, include_password=True):
|
|
|
+ return 'rpc://'
|
|
|
+
|
|
|
def store_result(self, task_id, result, state,
|
|
|
traceback=None, request=None, **kwargs):
|
|
|
"""Send task return value and state."""
|
|
@@ -258,7 +308,7 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
'delete_group is not supported by this backend.')
|
|
|
|
|
|
def __reduce__(self, args=(), kwargs={}):
|
|
|
- return super(BaseRPCBackend, self).__reduce__(args, dict(
|
|
|
+ return super(RPCBackend, self).__reduce__(args, dict(
|
|
|
kwargs,
|
|
|
connection=self._connection,
|
|
|
exchange=self.exchange.name,
|
|
@@ -269,63 +319,6 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
expires=self.expires,
|
|
|
))
|
|
|
|
|
|
-
|
|
|
-class RPCBackend(BaseRPCBackend):
|
|
|
- """RPC result backend."""
|
|
|
-
|
|
|
- persistent = False
|
|
|
-
|
|
|
- class Consumer(kombu.Consumer):
|
|
|
- """Consumer that requires manual declaration of queues."""
|
|
|
-
|
|
|
- auto_declare = False
|
|
|
-
|
|
|
- def _create_exchange(self, name, type='direct', delivery_mode=2):
|
|
|
- # uses direct to queue routing (anon exchange).
|
|
|
- return self.Exchange(None)
|
|
|
-
|
|
|
- def _create_binding(self, task_id):
|
|
|
- """Create new binding for task with id."""
|
|
|
- # RPC backend caches the binding, as one queue is used for all tasks.
|
|
|
- return self.binding
|
|
|
-
|
|
|
- def on_task_call(self, producer, task_id):
|
|
|
- # Called every time a task is sent when using this backend.
|
|
|
- # We declare the queue we receive replies on in advance of sending
|
|
|
- # the message, but we skip this if running in the prefork pool
|
|
|
- # (task_join_will_block), as we know the queue is already declared.
|
|
|
- if not task_join_will_block():
|
|
|
- maybe_declare(self.binding(producer.channel), retry=True)
|
|
|
-
|
|
|
- def destination_for(self, task_id, request):
|
|
|
- """Get the destination for result by task id.
|
|
|
-
|
|
|
- Returns:
|
|
|
- Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
|
|
|
- """
|
|
|
- # Backends didn't always receive the `request`, so we must still
|
|
|
- # support old code that relies on current_task.
|
|
|
- try:
|
|
|
- request = request or current_task.request
|
|
|
- except AttributeError:
|
|
|
- raise RuntimeError(
|
|
|
- 'RPC backend missing task request for {0!r}'.format(task_id))
|
|
|
- return request.reply_to, request.correlation_id or task_id
|
|
|
-
|
|
|
- def on_reply_declare(self, task_id):
|
|
|
- # Return value here is used as the `declare=` argument
|
|
|
- # for Producer.publish.
|
|
|
- # By default we don't have to declare anything when sending a result.
|
|
|
- pass
|
|
|
-
|
|
|
- def on_result_fulfilled(self, result):
|
|
|
- # This usually cancels the queue after the result is received,
|
|
|
- # but we don't have to cancel since we have one queue per process.
|
|
|
- pass
|
|
|
-
|
|
|
- def as_uri(self, include_password=True):
|
|
|
- return 'rpc://'
|
|
|
-
|
|
|
@property
|
|
|
def binding(self):
|
|
|
return self.Queue(
|