|
@@ -94,6 +94,7 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
Producer = kombu.Producer
|
|
|
ResultConsumer = ResultConsumer
|
|
|
|
|
|
+ #: Exception raised when there are too many messages for a task id.
|
|
|
BacklogLimitExceeded = BacklogLimitExceeded
|
|
|
|
|
|
persistent = True
|
|
@@ -130,6 +131,7 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
register_after_fork(self, _on_after_fork_cleanup_backend)
|
|
|
|
|
|
def _after_fork(self):
|
|
|
+ # clear state for child processes.
|
|
|
self._pending_results.clear()
|
|
|
self.result_consumer._after_fork()
|
|
|
|
|
@@ -162,6 +164,10 @@ class BaseRPCBackend(base.Backend, AsyncBackendMixin):
|
|
|
}
|
|
|
|
|
|
def on_out_of_band_result(self, task_id, message):
|
|
|
+ # Callback called when a reply for a task is received,
|
|
|
+ # but we have no idea what do do with it.
|
|
|
+ # Since the result is not pending, we put it in a separate
|
|
|
+ # buffer: probably it will become pending later.
|
|
|
if self.result_consumer:
|
|
|
self.result_consumer.on_out_of_band_result(message)
|
|
|
self._out_of_band[task_id] = message
|
|
@@ -279,18 +285,26 @@ class RPCBackend(BaseRPCBackend):
|
|
|
return self.Exchange(None)
|
|
|
|
|
|
def _create_binding(self, task_id):
|
|
|
+ """Create new binding for task with id."""
|
|
|
+ # RPC backend caches the binding, as one queue is used for all tasks.
|
|
|
return self.binding
|
|
|
|
|
|
def on_task_call(self, producer, task_id):
|
|
|
+ # Called every time a task is sent when using this backend.
|
|
|
+ # We declare the queue we receive replies on in advance of sending
|
|
|
+ # the message, but we skip this if running in the prefork pool
|
|
|
+ # (task_join_will_block), as we know the queue is already declared.
|
|
|
if not task_join_will_block():
|
|
|
maybe_declare(self.binding(producer.channel), retry=True)
|
|
|
|
|
|
- def rkey(self, task_id):
|
|
|
- return task_id
|
|
|
-
|
|
|
def destination_for(self, task_id, request):
|
|
|
- # Request is a new argument for backends, so must still support
|
|
|
- # old code that rely on current_task
|
|
|
+ """Get the destination for result by task id.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
|
|
|
+ """
|
|
|
+ # Backends didn't always receive the `request`, so we must still
|
|
|
+ # support old code that relies on current_task.
|
|
|
try:
|
|
|
request = request or current_task.request
|
|
|
except AttributeError:
|
|
@@ -299,9 +313,14 @@ class RPCBackend(BaseRPCBackend):
|
|
|
return request.reply_to, request.correlation_id or task_id
|
|
|
|
|
|
def on_reply_declare(self, task_id):
|
|
|
+ # Return value here is used as the `declare=` argument
|
|
|
+ # for Producer.publish.
|
|
|
+ # By default we don't have to declare anything when sending a result.
|
|
|
pass
|
|
|
|
|
|
def on_result_fulfilled(self, result):
|
|
|
+ # This usually cancels the queue after the result is received,
|
|
|
+ # but we don't have to cancel since we have one queue per process.
|
|
|
pass
|
|
|
|
|
|
def as_uri(self, include_password=True):
|
|
@@ -318,4 +337,5 @@ class RPCBackend(BaseRPCBackend):
|
|
|
|
|
|
@cached_property
|
|
|
def oid(self):
|
|
|
+ # cached here is the app OID: name of queue we receive results on.
|
|
|
return self.app.oid
|