|
@@ -33,6 +33,7 @@ from kombu.utils.limits import TokenBucket
|
|
|
from celery import bootsteps
|
|
|
from celery.app import app_or_default
|
|
|
from celery.canvas import subtask
|
|
|
+from celery.exceptions import InvalidTaskError
|
|
|
from celery.five import items, values
|
|
|
from celery.task.trace import build_tracer
|
|
|
from celery.utils.functional import noop
|
|
@@ -153,7 +154,7 @@ class Consumer(object):
|
|
|
def shutdown(self, parent):
|
|
|
self.send_all(parent, 'shutdown')
|
|
|
|
|
|
- def __init__(self, handle_task,
|
|
|
+ def __init__(self, on_task,
|
|
|
init_callback=noop, hostname=None,
|
|
|
pool=None, app=None,
|
|
|
timer=None, controller=None, hub=None, amqheartbeat=None,
|
|
@@ -172,7 +173,7 @@ class Consumer(object):
|
|
|
self._restart_state = restart_state(maxR=5, maxT=1)
|
|
|
|
|
|
self._does_info = logger.isEnabledFor(logging.INFO)
|
|
|
- self.handle_task = handle_task
|
|
|
+ self.on_task = on_task
|
|
|
self.amqheartbeat_rate = self.app.conf.BROKER_HEARTBEAT_CHECKRATE
|
|
|
self.disable_rate_limits = disable_rate_limits
|
|
|
|
|
@@ -223,7 +224,7 @@ class Consumer(object):
|
|
|
)
|
|
|
else:
|
|
|
task_reserved(request)
|
|
|
- self.handle_task(request)
|
|
|
+ self.on_task(request)
|
|
|
|
|
|
def start(self):
|
|
|
blueprint, loop = self.blueprint, self.loop
|
|
@@ -264,9 +265,7 @@ class Consumer(object):
|
|
|
|
|
|
def loop_args(self):
|
|
|
return (self, self.connection, self.task_consumer,
|
|
|
- self.strategies, self.blueprint, self.hub, self.qos,
|
|
|
- self.amqheartbeat, self.handle_unknown_message,
|
|
|
- self.handle_unknown_task, self.handle_invalid_task,
|
|
|
+ self.blueprint, self.hub, self.qos, self.amqheartbeat,
|
|
|
self.app.clock, self.amqheartbeat_rate)
|
|
|
|
|
|
def on_poll_init(self, hub):
|
|
@@ -360,7 +359,7 @@ class Consumer(object):
|
|
|
"""Method called by the timer to apply a task with an
|
|
|
ETA/countdown."""
|
|
|
task_reserved(task)
|
|
|
- self.handle_task(task)
|
|
|
+ self.on_task(task)
|
|
|
self.qos.decrement_eventually()
|
|
|
|
|
|
def _message_report(self, body, message):
|
|
@@ -369,15 +368,15 @@ class Consumer(object):
|
|
|
safe_repr(message.content_encoding),
|
|
|
safe_repr(message.delivery_info))
|
|
|
|
|
|
- def handle_unknown_message(self, body, message):
|
|
|
+ def on_unknown_message(self, body, message):
|
|
|
warn(UNKNOWN_FORMAT, self._message_report(body, message))
|
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
|
|
|
|
- def handle_unknown_task(self, body, message, exc):
|
|
|
+ def on_unknown_task(self, body, message, exc):
|
|
|
error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
|
|
|
|
- def handle_invalid_task(self, body, message, exc):
|
|
|
+ def on_invalid_task(self, body, message, exc):
|
|
|
error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
|
|
|
@@ -387,8 +386,11 @@ class Consumer(object):
|
|
|
self.strategies[name] = task.start_strategy(self.app, self)
|
|
|
task.__trace__ = build_tracer(name, task, loader, self.hostname)
|
|
|
|
|
|
- def create_task_handler(self, strategies, callbacks,
|
|
|
- on_unknown_message, on_unknown_task, on_invalid_task):
|
|
|
+ def create_task_handler(self, callbacks):
|
|
|
+ strategies = self.strategies
|
|
|
+ on_unknown_message = self.on_unknown_message
|
|
|
+ on_unknown_task = self.on_unknown_task
|
|
|
+ on_invalid_task = self.on_invalid_task
|
|
|
|
|
|
def on_task_received(body, message):
|
|
|
if callbacks:
|