|
@@ -25,28 +25,74 @@ class UnknownTask(Exception):
|
|
|
ignored."""
|
|
|
|
|
|
|
|
|
-def jail(task_id, callable_, args, kwargs):
|
|
|
- """Wraps the task in a jail which saves the status and result
|
|
|
- of the task execution to the task meta backend.
|
|
|
+def jail(task_id, func, args, kwargs):
|
|
|
+ """Wraps the task in a jail, which catches all exceptions, and
|
|
|
+ saves the status and result of the task execution to the task
|
|
|
+ meta backend.
|
|
|
+
|
|
|
+ If the call was successful, it saves the result to the task result
|
|
|
+ backend, and sets the task status to ``"DONE"``.
|
|
|
|
|
|
If the call results in an exception, it saves the exception as the task
|
|
|
- result, and sets the task status to ``FAILURE``.
|
|
|
+ result, and sets the task status to ``"FAILURE"``.
|
|
|
|
|
|
- If the call was successful, it saves the result to the task result
|
|
|
- backend, and sets the task status to ``DONE``.
|
|
|
+ :param task_id: The id of the task.
|
|
|
+
|
|
|
+ :param func: Callable object to execute.
|
|
|
+
|
|
|
+ :param args: List of positional args to pass on to the function.
|
|
|
+
|
|
|
+ :param kwargs: Keyword arguments mapping to pass on to the function.
|
|
|
+
|
|
|
+ :returns: the function return value on success.
|
|
|
+
|
|
|
+ :returns: the exception instance on failure.
|
|
|
|
|
|
"""
|
|
|
try:
|
|
|
- result = callable_(*args, **kwargs)
|
|
|
- default_backend.mark_as_done(task_id, result)
|
|
|
- return result
|
|
|
+ result = func(*args, **kwargs)
|
|
|
except Exception, exc:
|
|
|
default_backend.mark_as_failure(task_id, exc)
|
|
|
return exc
|
|
|
+ else:
|
|
|
+ default_backend.mark_as_done(task_id, result)
|
|
|
+ return result
|
|
|
|
|
|
|
|
|
class TaskWrapper(object):
|
|
|
- """Class defining a task to be run."""
|
|
|
+ """Class wrapping a task to be run.
|
|
|
+
|
|
|
+ :param task_name: see :attr:`task_name`.
|
|
|
+
|
|
|
+ :param task_id: see :attr:`task_id`.
|
|
|
+
|
|
|
+ :param task_func: see :attr:`task_func`
|
|
|
+
|
|
|
+ :param args: see :attr:`args`
|
|
|
+
|
|
|
+ :param kwargs: see :attr:`kwargs`.
|
|
|
+
|
|
|
+ .. attribute:: task_name
|
|
|
+
|
|
|
+ Kind of task. Must be a name registered in the task registry.
|
|
|
+
|
|
|
+ .. attribute:: task_id
|
|
|
+
|
|
|
+ UUID of the task.
|
|
|
+
|
|
|
+ .. attribute:: task_func
|
|
|
+
|
|
|
+ The tasks callable object.
|
|
|
+
|
|
|
+ .. attribute:: args
|
|
|
+
|
|
|
+ List of positional arguments to apply to the task.
|
|
|
+
|
|
|
+ .. attribute:: kwargs
|
|
|
+
|
|
|
+ Mapping of keyword arguments to apply to the task.
|
|
|
+
|
|
|
+ """
|
|
|
|
|
|
def __init__(self, task_name, task_id, task_func, args, kwargs):
|
|
|
self.task_name = task_name
|
|
@@ -57,13 +103,13 @@ class TaskWrapper(object):
|
|
|
|
|
|
@classmethod
|
|
|
def from_message(cls, message):
|
|
|
- """Create a TaskWrapper from a message returned by
|
|
|
- :class:`celery.messaging.TaskConsumer`.
|
|
|
+ """Create a :class:`TaskWrapper` from a task message sent by
|
|
|
+ :class:`celery.messaging.TaskPublisher`.
|
|
|
|
|
|
- If the message is not a proper task it raises
|
|
|
- :class:`UnknownTask` exception.
|
|
|
+ :raises UnknownTask: if the message does not describe a task,
|
|
|
+ the message is also rejected.
|
|
|
|
|
|
- :rtype: :class:`TaskWrapper` instance.
|
|
|
+ :returns: :class:`TaskWrapper` instance.
|
|
|
|
|
|
"""
|
|
|
message_data = simplejson.loads(message.body)
|
|
@@ -77,7 +123,7 @@ class TaskWrapper(object):
|
|
|
task_func = tasks[task_name]
|
|
|
return cls(task_name, task_id, task_func, args, kwargs)
|
|
|
|
|
|
- def extend_kwargs_with_logging(self, loglevel, logfile):
|
|
|
+ def extend_with_default_kwargs(self, loglevel, logfile):
|
|
|
"""Extend the tasks keyword arguments with standard task arguments.
|
|
|
|
|
|
These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
|
|
@@ -90,17 +136,30 @@ class TaskWrapper(object):
|
|
|
task_func_kwargs.update(self.kwargs)
|
|
|
return task_func_kwargs
|
|
|
|
|
|
- def execute(self, loglevel, logfile):
|
|
|
- """Execute the task in a ``jail()`` and store its result and status
|
|
|
- in the task meta backend."""
|
|
|
- task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
|
|
|
+ def execute(self, loglevel=None, logfile=None):
|
|
|
+ """Execute the task in a :func:`jail` and store return value
|
|
|
+ and status in the task meta backend.
|
|
|
+
|
|
|
+ :keyword loglevel: The loglevel used by the task.
|
|
|
+
|
|
|
+ :keyword logfile: The logfile used by the task.
|
|
|
+
|
|
|
+ """
|
|
|
+ task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
|
|
|
return jail(self.task_id, [
|
|
|
self.task_func, self.args, task_func_kwargs])
|
|
|
|
|
|
- def execute_using_pool(self, pool, loglevel, logfile):
|
|
|
- """Like ``execute``, but using the ``multiprocessing`` pool.
|
|
|
+ def execute_using_pool(self, pool, loglevel=None, logfile=None):
|
|
|
+ """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
|
|
|
+
|
|
|
+ :param pool: A :cls:`multiprocessing.Pool` instance.
|
|
|
+
|
|
|
+ :keyword loglevel: The loglevel used by the task.
|
|
|
+
|
|
|
+ :keyword logfile: The logfile used by the task.
|
|
|
+
|
|
|
+ :returns :class:`multiprocessing.AsyncResult` instance.
|
|
|
|
|
|
- :rtype: :class:`multiprocessing.AsyncResult` instance.
|
|
|
"""
|
|
|
task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
|
|
|
return pool.apply_async(jail, [self.task_id, self.task_func,
|
|
@@ -109,32 +168,41 @@ class TaskWrapper(object):
|
|
|
|
|
|
class TaskDaemon(object):
|
|
|
"""Executes tasks waiting in the task queue.
|
|
|
+
|
|
|
+ :param concurrency: see :attr:`concurrency`.
|
|
|
+
|
|
|
+ :param logfile: see :attr:`logfile`.
|
|
|
+
|
|
|
+ :param loglevel: see :attr:`loglevel`.
|
|
|
+
|
|
|
+ :param queue_wakeup_after: see :attr:`queue_wakeup_after`.
|
|
|
+
|
|
|
|
|
|
.. attribute:: concurrency
|
|
|
|
|
|
The number of simultaneous processes doing work (default:
|
|
|
- ``celery.conf.DAEMON_CONCURRENCY``)
|
|
|
+ :const:`celery.conf.DAEMON_CONCURRENCY`)
|
|
|
|
|
|
.. attribute:: loglevel
|
|
|
|
|
|
- The loglevel used (default: ``logging.INFO``)
|
|
|
+ The loglevel used (default: :const:`logging.INFO`)
|
|
|
|
|
|
.. attribute:: logfile
|
|
|
|
|
|
The logfile used, if no logfile is specified it uses ``stderr``
|
|
|
- (default: ``celery.conf.DAEMON_LOG_FILE``).
|
|
|
+ (default: :const:`celery.conf.DAEMON_LOG_FILE`).
|
|
|
|
|
|
.. attribute:: queue_wakeup_after
|
|
|
|
|
|
The time it takes for the daemon to wake up after the queue is empty,
|
|
|
so it can check for more work
|
|
|
- (default: ``celery.conf.QUEUE_WAKEUP_AFTER``).
|
|
|
+ (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).
|
|
|
|
|
|
.. attribute:: empty_msg_emit_every
|
|
|
|
|
|
- How often the daemon emits the ``Waiting for queue`` message.
|
|
|
+ How often the daemon emits the ``"Waiting for queue..."`` message.
|
|
|
If this is ``None``, the message will never be logged.
|
|
|
- (default: ``celery.conf.EMPTY_MSG_EMIT_EVERY``)
|
|
|
+ (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`)
|
|
|
|
|
|
.. attribute:: logger
|
|
|
|
|
@@ -169,21 +237,26 @@ class TaskDaemon(object):
|
|
|
|
|
|
def reset_connection(self):
|
|
|
"""Reset the AMQP connection, and reinitialize the
|
|
|
- :class:`celery.messaging.TaskConsumer` instance."""
|
|
|
+ :class:`celery.messaging.TaskConsumer` instance.
|
|
|
+
|
|
|
+ Resets the task consumer in :attr:`task_consumer`.
|
|
|
+
|
|
|
+ """
|
|
|
if self.task_consumer:
|
|
|
self.task_consumer.connection.close()
|
|
|
amqp_connection = DjangoAMQPConnection()
|
|
|
self.task_consumer = TaskConsumer(connection=amqp_connection)
|
|
|
|
|
|
def connection_diagnostics(self):
|
|
|
- """Diagnose the AMQP connection, and reset if necessary."""
|
|
|
+ """Diagnose the AMQP connection, and reset connection if
|
|
|
+ necessary."""
|
|
|
if not self.task_consumer.channel.connection:
|
|
|
self.logger.info(
|
|
|
"AMQP Connection has died, restoring connection.")
|
|
|
self.reset_connection()
|
|
|
|
|
|
def receive_message(self):
|
|
|
- """Receive the next message from the Task consumer queue.
|
|
|
+ """Receive the next message from the message broker.
|
|
|
|
|
|
Tries to reset the AMQP connection if not available.
|
|
|
Returns ``None`` if no message is waiting on the queue.
|
|
@@ -197,23 +270,14 @@ class TaskDaemon(object):
|
|
|
message.ack()
|
|
|
return message
|
|
|
|
|
|
- def receive_message_cc(self):
|
|
|
- """UNUSED."""
|
|
|
- amqp_connection = DjangoAMQPConnection()
|
|
|
- task_consumer = TaskConsumer(connection=amqp_connection)
|
|
|
- message = task_consumer.fetch()
|
|
|
- if message is not None:
|
|
|
- message.ack()
|
|
|
- amqp_connection.close()
|
|
|
- return message
|
|
|
-
|
|
|
def fetch_next_task(self):
|
|
|
"""Fetch the next task from the AMQP broker.
|
|
|
|
|
|
- Raises :class:`EmptyQueue` exception if there is no messages
|
|
|
+ Raises :exc:`EmptyQueue` exception if there is no message
|
|
|
waiting on the queue.
|
|
|
|
|
|
- :rtype: :class:`TaskWrapper` instance.
|
|
|
+ :returns: :class:`TaskWrapper` instance.
|
|
|
+
|
|
|
"""
|
|
|
message = self.receive_message()
|
|
|
if message is None: # No messages waiting.
|
|
@@ -226,10 +290,10 @@ class TaskDaemon(object):
|
|
|
return task, message
|
|
|
|
|
|
def execute_next_task(self):
|
|
|
- """Execute the next task on the queue using the multiprocessing
|
|
|
- pool.
|
|
|
+ """Execute the next task on the queue using the multiprocessing pool.
|
|
|
|
|
|
- Catches all exceptions and logs them with level ``logging.CRITICAL``.
|
|
|
+ Catches all exceptions and logs them with level
|
|
|
+ :const:`logging.CRITICAL`.
|
|
|
|
|
|
"""
|
|
|
task, message = self.fetch_next_task()
|
|
@@ -259,7 +323,7 @@ class TaskDaemon(object):
|
|
|
pass
|
|
|
|
|
|
def run(self):
|
|
|
- """The worker server's main loop."""
|
|
|
+ """Starts the workers main loop."""
|
|
|
results = TaskProcessQueue(self.concurrency, logger=self.logger,
|
|
|
done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
|
|
|
log_wait = lambda: self.logger.info("Waiting for queue...")
|