|
@@ -33,6 +33,7 @@ from celery.exceptions import InvalidTaskError, NotRegistered
|
|
from celery.utils import gethostname
|
|
from celery.utils import gethostname
|
|
from celery.utils.functional import noop
|
|
from celery.utils.functional import noop
|
|
from celery.utils.log import get_logger
|
|
from celery.utils.log import get_logger
|
|
|
|
+from celery.utils.objects import Bunch
|
|
from celery.utils.text import truncate
|
|
from celery.utils.text import truncate
|
|
from celery.utils.timeutils import humanize_seconds, rate
|
|
from celery.utils.timeutils import humanize_seconds, rate
|
|
|
|
|
|
@@ -451,10 +452,20 @@ class Consumer(object):
|
|
error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
try:
|
|
try:
|
|
id_, name = message.headers['id'], message.headers['task']
|
|
id_, name = message.headers['id'], message.headers['task']
|
|
|
|
+ root_id = message.headers['root_id']
|
|
except KeyError: # proto1
|
|
except KeyError: # proto1
|
|
id_, name = body['id'], body['task']
|
|
id_, name = body['id'], body['task']
|
|
|
|
+ root_id = None
|
|
|
|
+ request = Bunch(
|
|
|
|
+ name=name, chord=None, root_id=root_id,
|
|
|
|
+ correlation_id=message.properties.get('correlation_id'),
|
|
|
|
+ reply_to=message.properties.get('reply_to'),
|
|
|
|
+ errbacks=None,
|
|
|
|
+ )
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
- self.app.backend.mark_as_failure(id_, NotRegistered(name))
|
|
|
|
|
|
+ self.app.backend.mark_as_failure(
|
|
|
|
+ id_, NotRegistered(name), request=request,
|
|
|
|
+ )
|
|
if self.event_dispatcher:
|
|
if self.event_dispatcher:
|
|
self.event_dispatcher.send(
|
|
self.event_dispatcher.send(
|
|
'task-failed', uuid=id_,
|
|
'task-failed', uuid=id_,
|