|
@@ -47,9 +47,13 @@ from Queue import Empty, Queue
|
|
|
|
|
|
from celery.task import Task
|
|
|
from celery.utils import timer2
|
|
|
+from celery.utils.log import get_logger
|
|
|
from celery.worker import state
|
|
|
|
|
|
|
|
|
+logger = get_logger(__name__)
|
|
|
+
|
|
|
+
|
|
|
def consume_queue(queue):
|
|
|
"""Iterator yielding all immediately available items in a
|
|
|
:class:`Queue.Queue`.
|
|
@@ -78,9 +82,9 @@ def apply_batches_task(task, args, loglevel, logfile):
|
|
|
task.request.update({"loglevel": loglevel, "logfile": logfile})
|
|
|
try:
|
|
|
result = task(*args)
|
|
|
- except Exception, exp:
|
|
|
+ except Exception, exc:
|
|
|
result = None
|
|
|
- task.logger.error("There was an Exception: %s", exp, exc_info=True)
|
|
|
+ task.logger.error("Error: %r", exc, exc_info=True)
|
|
|
finally:
|
|
|
task.request.clear()
|
|
|
return result
|
|
@@ -161,15 +165,15 @@ class Batches(Task):
|
|
|
self._do_flush()
|
|
|
|
|
|
def _do_flush(self):
|
|
|
- self.debug("Wake-up to flush buffer...")
|
|
|
+ logger.debug("Batches: Wake-up to flush buffer...")
|
|
|
requests = None
|
|
|
if self._buffer.qsize():
|
|
|
requests = list(consume_queue(self._buffer))
|
|
|
if requests:
|
|
|
- self.debug("Buffer complete: %s", len(requests))
|
|
|
+ logger.debug("Batches: Buffer complete: %s", len(requests))
|
|
|
self.flush(requests)
|
|
|
if not requests:
|
|
|
- self.debug("Cancelling timer: Nothing in buffer.")
|
|
|
+ logger.debug("Batches: Cancelling timer: Nothing in buffer.")
|
|
|
self._tref.cancel() # cancel timer.
|
|
|
self._tref = None
|
|
|
|
|
@@ -189,6 +193,3 @@ class Batches(Task):
|
|
|
(self, args, loglevel, logfile),
|
|
|
accept_callback=on_accepted,
|
|
|
callback=acks_late[True] and on_return or None)
|
|
|
-
|
|
|
- def debug(self, msg):
|
|
|
- self.logger.debug("%s: %s", self.name, msg)
|