|
@@ -1,4 +1,5 @@
|
|
|
"""celery.worker"""
|
|
|
+from __future__ import with_statement
|
|
|
from carrot.connection import DjangoAMQPConnection
|
|
|
from celery.messaging import TaskConsumer
|
|
|
from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
|
|
@@ -319,10 +320,7 @@ class WorkController(object):
|
|
|
def connection_diagnostics(self):
|
|
|
"""Diagnose the AMQP connection, and reset connection if
|
|
|
necessary."""
|
|
|
- if hasattr(self.task_consumer, "backend"):
|
|
|
- connection = self.task_consumer.backend.channel.connection
|
|
|
- else:
|
|
|
- connection = self.task_consumer.channel.connection
|
|
|
+ connection = self.task_consumer.backend.channel.connection
|
|
|
|
|
|
if not connection:
|
|
|
self.logger.info(
|
|
@@ -338,9 +336,7 @@ class WorkController(object):
|
|
|
:rtype: :class:`carrot.messaging.Message` instance.
|
|
|
|
|
|
"""
|
|
|
- #self.connection_diagnostics()
|
|
|
- self.logger.debug("Trying to fetch message from broker...")
|
|
|
- message = next(self.task_consumer_it)
|
|
|
+ message = self.task_consumer_it.next()
|
|
|
if not message:
|
|
|
raise EmptyQueue()
|
|
|
return message
|
|
@@ -367,10 +363,8 @@ class WorkController(object):
|
|
|
Raises :exc:`EmptyQueue` exception if there is no message
|
|
|
waiting on the queue.
|
|
|
|
|
|
- :returns: :class:`TaskWrapper` instance.
|
|
|
-
|
|
|
"""
|
|
|
- return self.process_task(self.receive_message())
|
|
|
+ self.process_task(self.receive_message())
|
|
|
|
|
|
def run_periodic_tasks(self):
|
|
|
"""Schedule all waiting periodic tasks for execution.
|