|
@@ -439,10 +439,17 @@ class Consumer(object):
|
|
|
|
|
|
def on_unknown_task(self, body, message, exc):
|
|
def on_unknown_task(self, body, message, exc):
|
|
error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
|
|
+ id_, name = message.headers['id'], message.headers['task']
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
- self.app.backend.mark_as_failure(
|
|
|
|
- message.headers['id'], NotRegistered(message.headers['task']))
|
|
|
|
- signals.task_unknown.send(sender=self, message=message, exc=exc)
|
|
|
|
|
|
+ self.app.backend.mark_as_failure(id_, NotRegistered(name))
|
|
|
|
+ if self.event_dispatcher:
|
|
|
|
+ self.event_dispatcher.send(
|
|
|
|
+ 'task-failed', uuid=id_,
|
|
|
|
+ exception='NotRegistered({0!r})'.format(name),
|
|
|
|
+ )
|
|
|
|
+ signals.task_unknown.send(
|
|
|
|
+ sender=self, message=message, exc=exc, name=name, id=id_,
|
|
|
|
+ )
|
|
|
|
|
|
def on_invalid_task(self, body, message, exc):
|
|
def on_invalid_task(self, body, message, exc):
|
|
error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|