|
@@ -161,7 +161,7 @@ class TaskWrapper(object):
|
|
|
self.args, self.kwargs)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_message(cls, message, logger):
|
|
|
+ def from_message(cls, message, message_data, logger):
|
|
|
"""Create a :class:`TaskWrapper` from a task message sent by
|
|
|
:class:`celery.messaging.TaskPublisher`.
|
|
|
|
|
@@ -171,7 +171,6 @@ class TaskWrapper(object):
|
|
|
:returns: :class:`TaskWrapper` instance.
|
|
|
|
|
|
"""
|
|
|
- message_data = message.decode()
|
|
|
task_name = message_data["task"]
|
|
|
task_id = message_data["id"]
|
|
|
args = message_data["args"]
|
|
@@ -375,7 +374,7 @@ class WorkController(object):
|
|
|
def _message_callback(self, message_data, message):
|
|
|
try:
|
|
|
try:
|
|
|
- self.process_task(message)
|
|
|
+ self.process_task(message_data, message)
|
|
|
except ValueError:
|
|
|
# execute_next_task didn't return a r/name/id tuple,
|
|
|
# probably because it got an exception.
|
|
@@ -391,9 +390,10 @@ class WorkController(object):
|
|
|
except (SystemExit, KeyboardInterrupt):
|
|
|
self.shutdown()
|
|
|
|
|
|
- def process_task(self, message):
|
|
|
+ def process_task(self, message_data, message):
|
|
|
"""Process task message by passing it to the pool of workers."""
|
|
|
- task = TaskWrapper.from_message(message, logger=self.logger)
|
|
|
+ task = TaskWrapper.from_message(message, message_data,
|
|
|
+ logger=self.logger)
|
|
|
self.logger.info("Got task from broker: %s[%s]" % (
|
|
|
task.task_name, task.task_id))
|
|
|
self.logger.debug("Got a task: %s. Trying to execute it..." % task)
|