|
@@ -63,7 +63,7 @@ class AMQPBackend(BaseBackend):
|
|
|
|
|
|
def __init__(self, app, connection=None, exchange=None, exchange_type=None,
|
|
|
persistent=None, serializer=None, auto_delete=True,
|
|
|
- **kwargs):
|
|
|
+ accept=None, **kwargs):
|
|
|
super(AMQPBackend, self).__init__(app, **kwargs)
|
|
|
conf = self.app.conf
|
|
|
self._connection = connection
|
|
@@ -74,6 +74,7 @@ class AMQPBackend(BaseBackend):
|
|
|
self.exchange = self._create_exchange(exchange, exchange_type,
|
|
|
self.persistent)
|
|
|
self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
|
|
|
+ self.accept = conf.CELERY_ACCEPT_CONTENT if accept is None else accept
|
|
|
self.auto_delete = auto_delete
|
|
|
|
|
|
self.expires = None
|
|
@@ -201,7 +202,8 @@ class AMQPBackend(BaseBackend):
|
|
|
wait = self.drain_events
|
|
|
with self.app.pool.acquire_channel(block=True) as (conn, channel):
|
|
|
binding = self._create_binding(task_id)
|
|
|
- with self.Consumer(channel, binding, no_ack=True) as consumer:
|
|
|
+ with self.Consumer(channel, binding,
|
|
|
+ no_ack=True, accept=self.accept) as consumer:
|
|
|
while 1:
|
|
|
try:
|
|
|
return wait(conn, consumer, timeout)[task_id]
|
|
@@ -240,8 +242,8 @@ class AMQPBackend(BaseBackend):
|
|
|
if uid in task_ids else push_cache(uid, body)
|
|
|
|
|
|
bindings = self._many_bindings(task_ids)
|
|
|
- with self.Consumer(channel, bindings,
|
|
|
- on_message=on_message, no_ack=True):
|
|
|
+ with self.Consumer(channel, bindings, on_message=on_message,
|
|
|
+ accept=self.accept, no_ack=True):
|
|
|
wait = conn.drain_events
|
|
|
popleft = results.popleft
|
|
|
while ids:
|