|
@@ -177,11 +177,10 @@ class QoS(object):
|
|
|
if pcount != self.prev:
|
|
|
new_value = pcount
|
|
|
if pcount > PREFETCH_COUNT_MAX:
|
|
|
- self.logger.warning(
|
|
|
- "QoS: Disabled: prefetch_count exceeds %r" % (
|
|
|
- PREFETCH_COUNT_MAX, ))
|
|
|
+ self.logger.warning("QoS: Disabled: prefetch_count exceeds %r",
|
|
|
+ PREFETCH_COUNT_MAX)
|
|
|
new_value = 0
|
|
|
- self.logger.debug("basic.qos: prefetch_count->%s" % new_value)
|
|
|
+ self.logger.debug("basic.qos: prefetch_count->%s", new_value)
|
|
|
self.consumer.qos(prefetch_count=new_value)
|
|
|
self.prev = pcount
|
|
|
return pcount
|
|
@@ -226,7 +225,7 @@ class Consumer(object):
|
|
|
|
|
|
#: The thread that sends event heartbeats at regular intervals.
|
|
|
#: The heartbeats are used by monitors to detect that a worker
|
|
|
- #: went off-line/disappeared.
|
|
|
+ #: went offline/disappeared.
|
|
|
heart = None
|
|
|
|
|
|
#: The logger instance to use. Defaults to the default Celery logger.
|
|
@@ -334,7 +333,7 @@ class Consumer(object):
|
|
|
if task.revoked():
|
|
|
return
|
|
|
|
|
|
- self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
|
|
|
+ self.logger.info("Got task from broker: %s", task.shortinfo())
|
|
|
|
|
|
if self.event_dispatcher.enabled:
|
|
|
self.event_dispatcher.send("task-received", uuid=task.task_id,
|
|
@@ -348,8 +347,8 @@ class Consumer(object):
|
|
|
eta = timer2.to_timestamp(task.eta)
|
|
|
except OverflowError, exc:
|
|
|
self.logger.error(
|
|
|
- "Couldn't convert eta %s to time stamp: %r. Task: %r" % (
|
|
|
- task.eta, exc, task.info(safe=True)),
|
|
|
+ "Couldn't convert eta %s to timestamp: %r. Task: %r",
|
|
|
+ task.eta, exc, task.info(safe=True),
|
|
|
exc_info=sys.exc_info())
|
|
|
task.acknowledge()
|
|
|
else:
|
|
@@ -365,11 +364,11 @@ class Consumer(object):
|
|
|
try:
|
|
|
self.pidbox_node.handle_message(body, message)
|
|
|
except KeyError, exc:
|
|
|
- self.logger.error("No such control command: %s" % exc)
|
|
|
+ self.logger.error("No such control command: %s", exc)
|
|
|
except Exception, exc:
|
|
|
self.logger.error(
|
|
|
- "Error occurred while handling control command: %r\n%r" % (
|
|
|
- exc, traceback.format_exc()), exc_info=sys.exc_info())
|
|
|
+ "Error occurred while handling control command: %r\n%r",
|
|
|
+ exc, traceback.format_exc(), exc_info=sys.exc_info())
|
|
|
self.reset_pidbox_node()
|
|
|
|
|
|
def apply_eta_task(self, task):
|
|
@@ -392,15 +391,15 @@ class Consumer(object):
|
|
|
:param message: The kombu message object.
|
|
|
|
|
|
"""
|
|
|
- # need to guard against errors occurring while acking the message.
|
|
|
+ # need to guard against errors occuring while acking the message.
|
|
|
def ack():
|
|
|
try:
|
|
|
message.ack()
|
|
|
except self.connection_errors + (AttributeError, ), exc:
|
|
|
self.logger.critical(
|
|
|
- "Couldn't ack %r: %s reason:%r" % (
|
|
|
+ "Couldn't ack %r: %s reason:%r",
|
|
|
message.delivery_tag,
|
|
|
- self._message_report(body, message), exc))
|
|
|
+ self._message_report(body, message), exc)
|
|
|
|
|
|
try:
|
|
|
body["task"]
|
|
@@ -420,12 +419,12 @@ class Consumer(object):
|
|
|
eventer=self.event_dispatcher)
|
|
|
|
|
|
except NotRegistered, exc:
|
|
|
- self.logger.error(UNKNOWN_TASK_ERROR % (
|
|
|
- exc, safe_repr(body)), exc_info=sys.exc_info())
|
|
|
+ self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
|
|
|
+ exc_info=sys.exc_info())
|
|
|
ack()
|
|
|
except InvalidTaskError, exc:
|
|
|
- self.logger.error(INVALID_TASK_ERROR % (
|
|
|
- str(exc), safe_repr(body)), exc_info=sys.exc_info())
|
|
|
+ self.logger.error(INVALID_TASK_ERROR, str(exc), safe_repr(body),
|
|
|
+ exc_info=sys.exc_info())
|
|
|
ack()
|
|
|
else:
|
|
|
self.on_task(task)
|
|
@@ -500,9 +499,9 @@ class Consumer(object):
|
|
|
|
|
|
"""
|
|
|
self.logger.critical(
|
|
|
- "Can't decode message body: %r (type:%r encoding:%r raw:%r')" % (
|
|
|
+ "Can't decode message body: %r (type:%r encoding:%r raw:%r')",
|
|
|
exc, message.content_type, message.content_encoding,
|
|
|
- safe_repr(message.body)))
|
|
|
+ safe_repr(message.body))
|
|
|
message.ack()
|
|
|
|
|
|
def reset_pidbox_node(self):
|
|
@@ -558,7 +557,7 @@ class Consumer(object):
|
|
|
self.initial_prefetch_count, self.logger)
|
|
|
self.qos.update()
|
|
|
|
|
|
- # receive_message handles incoming messages.
|
|
|
+ # receive_message handles incomsing messages.
|
|
|
self.task_consumer.register_callback(self.receive_message)
|
|
|
|
|
|
# Setup the process mailbox.
|
|
@@ -600,8 +599,8 @@ class Consumer(object):
|
|
|
# Callback called for each retry while the connection
|
|
|
# can't be established.
|
|
|
def _error_handler(exc, interval):
|
|
|
- self.logger.error("Consumer: Connection Error: %s. " % exc
|
|
|
- + "Trying again in %d seconds..." % interval)
|
|
|
+ self.logger.error("Consumer: Connection Error: %s. "
|
|
|
+ "Trying again in %d seconds...", exc, interval)
|
|
|
|
|
|
# remember that the connection is lazy, it won't establish
|
|
|
# until it's needed.
|
|
@@ -643,4 +642,4 @@ class Consumer(object):
|
|
|
return {"broker": conninfo, "prefetch_count": self.qos.value}
|
|
|
|
|
|
def _debug(self, msg, **kwargs):
|
|
|
- self.logger.debug("Consumer: %s" % (msg, ), **kwargs)
|
|
|
+ self.logger.debug("Consumer: %s", msg, **kwargs)
|