|
@@ -27,7 +27,15 @@ class UnknownTask(Exception):
|
|
|
|
|
|
def jail(task_id, callable_, args, kwargs):
|
|
|
"""Wraps the task in a jail which saves the status and result
|
|
|
- of the task execution to the task meta backend."""
|
|
|
+ of the task execution to the task meta backend.
|
|
|
+
|
|
|
+ If the call results in an exception, it saves the exception as the task
|
|
|
+ result, and sets the task status to ``FAILURE``.
|
|
|
+
|
|
|
+ If the call was successful, it saves the result to the task result
|
|
|
+ backend, and sets the task status to ``DONE``.
|
|
|
+
|
|
|
+ """
|
|
|
try:
|
|
|
result = callable_(*args, **kwargs)
|
|
|
mark_as_done(task_id, result)
|
|
@@ -39,6 +47,7 @@ def jail(task_id, callable_, args, kwargs):
|
|
|
|
|
|
class TaskWrapper(object):
|
|
|
"""Class defining a task to be run."""
|
|
|
+
|
|
|
def __init__(self, task_name, task_id, task_func, args, kwargs):
|
|
|
self.task_name = task_name
|
|
|
self.task_id = task_id
|
|
@@ -49,7 +58,14 @@ class TaskWrapper(object):
|
|
|
@classmethod
|
|
|
def from_message(cls, message):
|
|
|
"""Create a TaskWrapper from a message returned by
|
|
|
- :class:`celery.messaging.TaskConsumer`."""
|
|
|
+ :class:`celery.messaging.TaskConsumer`.
|
|
|
+
|
|
|
+ If the message is not a proper task it raises
|
|
|
+ :class:`UnknownTask` exception.
|
|
|
+
|
|
|
+ :rtype: :class:`TaskWrapper` instance.
|
|
|
+
|
|
|
+ """
|
|
|
message_data = simplejson.loads(message.body)
|
|
|
task_name = message_data["task"]
|
|
|
task_id = message_data["id"]
|
|
@@ -82,7 +98,10 @@ class TaskWrapper(object):
|
|
|
self.task_func, self.args, task_func_kwargs])
|
|
|
|
|
|
def execute_using_pool(self, pool, loglevel, logfile):
|
|
|
- """Like ``execute``, but using the ``multiprocessing`` pool."""
|
|
|
+ """Like ``execute``, but using the ``multiprocessing`` pool.
|
|
|
+
|
|
|
+ :rtype: :class:`multiprocessing.AsyncResult` instance.
|
|
|
+ """
|
|
|
task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
|
|
|
return pool.apply_async(jail, [self.task_id, self.task_func,
|
|
|
self.args, task_func_kwargs])
|
|
@@ -91,7 +110,43 @@ class TaskWrapper(object):
|
|
|
class TaskDaemon(object):
|
|
|
"""Executes tasks waiting in the task queue.
|
|
|
|
|
|
- ``concurrency`` is the number of simultaneous processes.
|
|
|
+ .. attribute:: concurrency
|
|
|
+
|
|
|
+ The number of simultaneous processes doing work (default:
|
|
|
+ ``celery.conf.DAEMON_CONCURRENCY``)
|
|
|
+
|
|
|
+ .. attribute:: loglevel
|
|
|
+
|
|
|
+ The loglevel used (default: ``logging.INFO``)
|
|
|
+
|
|
|
+ .. attribute:: logfile
|
|
|
+
|
|
|
+ The logfile used, if no logfile is specified it uses ``stderr``
|
|
|
+ (default: ``celery.conf.DAEMON_LOG_FILE``).
|
|
|
+
|
|
|
+ .. attribute:: queue_wakeup_after
|
|
|
+
|
|
|
+ The time it takes for the daemon to wake up after the queue is empty,
|
|
|
+ so it can check for more work
|
|
|
+ (default: ``celery.conf.QUEUE_WAKEUP_AFTER``).
|
|
|
+
|
|
|
+ .. attribute:: empty_msg_emit_every
|
|
|
+
|
|
|
+ How often the daemon emits the ``Waiting for queue`` message.
|
|
|
+ If this is ``None``, the message will never be logged.
|
|
|
+ (default: ``celery.conf.EMPTY_MSG_EMIT_EVERY``)
|
|
|
+
|
|
|
+ .. attribute:: logger
|
|
|
+
|
|
|
+ The :class:`logging.Logger` instance used for logging.
|
|
|
+
|
|
|
+ .. attribute:: pool
|
|
|
+ The :class:`multiprocessing.Pool` instance used.
|
|
|
+
|
|
|
+ .. attribute:: task_consumer
|
|
|
+
|
|
|
+ The :class:`celery.messaging.TaskConsumer` instance used.
|
|
|
+
|
|
|
"""
|
|
|
loglevel = logging.ERROR
|
|
|
concurrency = DAEMON_CONCURRENCY
|
|
@@ -112,18 +167,29 @@ class TaskDaemon(object):
|
|
|
self.reset_connection()
|
|
|
|
|
|
def reset_connection(self):
|
|
|
+ """Reset the AMQP connection, and reinitialize the
|
|
|
+ :class:`celery.messaging.TaskConsumer` instance."""
|
|
|
if self.task_consumer:
|
|
|
self.task_consumer.connection.close()
|
|
|
amqp_connection = DjangoAMQPConnection()
|
|
|
self.task_consumer = TaskConsumer(connection=amqp_connection)
|
|
|
|
|
|
def connection_diagnostics(self):
|
|
|
+ """Diagnose the AMQP connection, and reset if necessary."""
|
|
|
if not self.task_consumer.channel.connection:
|
|
|
self.logger.info(
|
|
|
"AMQP Connection has died, restoring connection.")
|
|
|
self.reset_connection()
|
|
|
|
|
|
def receive_message(self):
|
|
|
+ """Receive the next message from the Task consumer queue.
|
|
|
+
|
|
|
+ Tries to reset the AMQP connection if not available.
|
|
|
+ Returns ``None`` if no message is waiting on the queue.
|
|
|
+
|
|
|
+ :rtype: :class:`carrot.messaging.Message` instance.
|
|
|
+
|
|
|
+ """
|
|
|
self.connection_diagnostics()
|
|
|
message = self.task_consumer.fetch()
|
|
|
if message is not None:
|
|
@@ -131,6 +197,7 @@ class TaskDaemon(object):
|
|
|
return message
|
|
|
|
|
|
def receive_message_cc(self):
|
|
|
+ """UNUSED."""
|
|
|
amqp_connection = DjangoAMQPConnection()
|
|
|
task_consumer = TaskConsumer(connection=amqp_connection)
|
|
|
message = task_consumer.fetch()
|
|
@@ -140,6 +207,13 @@ class TaskDaemon(object):
|
|
|
return message
|
|
|
|
|
|
def fetch_next_task(self):
|
|
|
+ """Fetch the next task from the AMQP broker.
|
|
|
+
|
|
|
+ Raises :class`EmptyQueue` exception if there is no messages
|
|
|
+ waiting on the queue.
|
|
|
+
|
|
|
+ :rtype: :class:`TaskWrapper` instance.
|
|
|
+ """
|
|
|
message = self.receive_message()
|
|
|
if message is None: # No messages waiting.
|
|
|
raise EmptyQueue()
|
|
@@ -151,6 +225,12 @@ class TaskDaemon(object):
|
|
|
return task, message
|
|
|
|
|
|
def execute_next_task(self):
|
|
|
+ """Execute the next task on the queue using the multiprocessing
|
|
|
+ pool.
|
|
|
+
|
|
|
+ Catches all exceptions and logs them with level ``logging.CRITICAL``.
|
|
|
+
|
|
|
+ """
|
|
|
task, message = self.fetch_next_task()
|
|
|
|
|
|
try:
|
|
@@ -166,7 +246,7 @@ class TaskDaemon(object):
|
|
|
def run_periodic_tasks(self):
|
|
|
"""Schedule all waiting periodic tasks for execution.
|
|
|
|
|
|
- Returns list of :class:`celery.models.PeriodicTaskMeta` objects.
|
|
|
+ :rtype: list of :class:`celery.models.PeriodicTaskMeta` objects.
|
|
|
"""
|
|
|
waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
|
|
|
[waiting_task.delay()
|