Browse Source

Fix up documentation

Ask Solem 16 years ago
parent
commit
c53f3ba991

+ 22 - 0
celery/backends/base.py

@@ -1,44 +1,66 @@
+"""celery.backends.base"""
 from celery.timer import TimeoutTimer
 
 
 class BaseBackend(object):
+    """The base backend class. All backends should inherit from this."""
 
     def __init__(self):
         pass
 
     def store_result(self, task_id, result, status):
+        """Store the result and status of a task."""
         raise NotImplementedError(
                 "Backends must implement the store_result method")
 
     def mark_as_done(self, task_id, result):
+        """Mark task as successfully executed."""
         return self.store_result(task_id, result, status="DONE")
     
     def mark_as_failure(self, task_id, exc):
+        """Mark task as executed with failure. Stores the execption."""
         return self.store_result(task_id, exc, status="FAILURE")
 
     def mark_as_retry(self, task_id, exc):
+        """Mark task for retry."""
         return self.store_result(task_id, exc, status="RETRY")
 
     def get_status(self, task_id):
+        """Get the status of a task."""
         raise NotImplementedError(
                 "Backends must implement the get_status method")
 
     def prepare_result(self, result):
+        """Prepare result for storage."""
         if result is None:
             return True
         return result
         
     def get_result(self, task_id):
+        """Get the result of a task."""
         raise NotImplementedError(
                 "Backends must implement the get_result method")
 
     def is_done(self, task_id):
+        """Returns ``True`` if the task was successfully executed."""
         return self.get_status(task_id) == "DONE"
 
     def cleanup(self):
+        """Backend cleanup. Is run by
+        :class:`celery.task.DeleteExpiredTaskMetaTask`."""
         pass
 
     def wait_for(self, task_id, timeout=None):
+        """Wait for task and return its result.
+
+        If the task raises an exception, this exception
+        will be re-raised by ``wait_for``.
+
+        If ``timeout`` is not ``None``, this raises
+        ``celery.timer.TimeoutError`` if the operation takes longer than
+        ``timeout`` seconds.
+
+        """
         timeout_timer = TimeoutTimer(timeout)
         while True:
             status = self.get_status(task_id)

+ 1 - 0
celery/bin/celeryd

@@ -1,4 +1,5 @@
 #!/usr/bin/env python
+"""celeryd"""
 import os
 import sys
 sys.path.append(os.getcwd())

+ 1 - 0
celery/datastructures.py

@@ -1,3 +1,4 @@
+"""celery.datastructures"""
 from UserList import UserList
 
 

+ 2 - 1
celery/discovery.py

@@ -1,8 +1,9 @@
+"""celery.discovery"""
 from django.conf import settings
 
 
 def autodiscover():
-    """Include tasks for all applications in settings.INSTALLED_APPS."""
+    """Include tasks for all applications in :setting:`INSTALLED_APPS`."""
     return filter(None, [find_related_module(app, "tasks")
                             for app in settings.INSTALLED_APPS])
 

+ 1 - 0
celery/log.py

@@ -1,3 +1,4 @@
+"""celery.log"""
 import multiprocessing
 import os
 import sys

+ 8 - 0
celery/messaging.py

@@ -1,9 +1,12 @@
+"""celery.messaging"""
 from carrot.messaging import Publisher, Consumer
 from celery import conf
 import uuid
 
 
 class NoProcessConsumer(Consumer):
+    """A consumer that raises an error if used with wait callbacks (i.e.
+    it doesn't support ``carrot.messaging.Consumer.wait``)."""
     
     def receive(self, message_data, message):
         raise NotImplementedError(
@@ -11,21 +14,25 @@ class NoProcessConsumer(Consumer):
 
 
 class TaskPublisher(Publisher):
+    """The AMQP Task Publisher class."""
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_ROUTING_KEY
 
     def delay_task(self, task_name, *task_args, **task_kwargs):
+        """Delay task for execution by the celery nodes."""
         return self._delay_task(task_name=task_name, args=task_args,
                                 kwargs=task_kwargs)
 
 
     def delay_task_in_set(self, task_name, taskset_id, task_args,
             task_kwargs):
+        """Delay a task which part of a task set."""
         return self._delay_task(task_name=task_name, part_of_set=taskset_id,
                                 args=task_args, kwargs=task_kwargs)
     
     def requeue_task(self, task_name, task_id, task_args, task_kwargs,
             part_of_set=None):
+        """Requeue a failed task."""
         return self._delay_task(task_name=task_name, part_of_set=part_of_set,
                                 task_id=task_id, args=task_args,
                                 kwargs=task_kwargs)
@@ -48,6 +55,7 @@ class TaskPublisher(Publisher):
 
 
 class TaskConsumer(NoProcessConsumer):
+    """The AMQP Task Consumer class."""
     queue = conf.AMQP_CONSUMER_QUEUE
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_ROUTING_KEY

+ 3 - 1
celery/models.py

@@ -1,3 +1,4 @@
+"""celery.models"""
 from django.db import models
 from celery.registry import tasks
 from celery.managers import TaskManager, PeriodicTaskManager
@@ -45,6 +46,7 @@ TASK_STATUSES = (TASK_STATUS_PENDING, TASK_STATUS_RETRY,
                  TASK_STATUS_FAILURE, TASK_STATUS_DONE)
 TASK_STATUSES_CHOICES = zip(TASK_STATUSES, TASK_STATUSES)
                 
+
 class TaskMeta(models.Model):
     task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
     status = models.CharField(_(u"task status"), max_length=50,
@@ -79,7 +81,7 @@ class PeriodicTaskMeta(models.Model):
         return u"<PeriodicTask: %s [last-run:%s, total-run:%d]>" % (
                 self.name, self.last_run_at, self.total_run_count)
 
-    def delay(self, **kwargs):
+    def delay(self, *args, **kwargs):
         self.task.delay()
         self.total_run_count = self.total_run_count + 1
         self.save()

+ 19 - 1
celery/platform.py

@@ -1,3 +1,4 @@
+"""celery.platform"""
 import os
 import sys
 import errno
@@ -20,18 +21,27 @@ if (hasattr(os, "devnull")):
 else:
    REDIRECT_TO = "/dev/null"
 
-class PIDFile(object):
 
+class PIDFile(object):
+    """Manages a pid file."""
     def __init__(self, pidfile):
         self.pidfile = pidfile
 
     def get_pid(self):
+        """Get the process id stored in the pidfile."""
         pidfile_fh = file(self.pidfile, "r")
         pid = int(pidfile_fh.read().strip())
         pidfile_fh.close()
         return pid
 
     def check(self):
+        """Check the status of the pidfile.
+        
+        If the pidfile exists, and the process is not running, it will
+        remove the stale pidfile and continue as normal. If the process
+        *is* running, it will exit the program with an error message.
+
+        """
         if os.path.exists(self.pidfile) and os.path.isfile(self.pidfile):
             pid = self.get_pid()
             try:
@@ -44,9 +54,16 @@ class PIDFile(object):
                 raise SystemExit("celeryd is already running.")
 
     def remove(self):
+        """Remove the pidfile."""
         os.unlink(self.pidfile)
 
     def write(self, pid=None):
+        """Write a pidfile.
+        
+        If ``pid`` is not specified the pid of the current process
+        will be used.
+        
+        """
         if not pid:
             pid = os.getpid()
         pidfile_fh = file(self.pidfile, "w")
@@ -55,6 +72,7 @@ class PIDFile(object):
 
 
 def remove_pidfile(pidfile):
+    """Remove the pidfile."""
     os.unlink(pidfile)
 
 

+ 12 - 0
celery/registry.py

@@ -1,3 +1,4 @@
+"""celery.registry"""
 from celery import discovery
 from UserDict import UserDict
 
@@ -20,9 +21,19 @@ class TaskRegistry(UserDict):
         self.data = {}
 
     def autodiscover(self):
+        """Autodiscover tasks using ``celery.discovery.autodiscover``."""
         discovery.autodiscover()
 
     def register(self, task, task_name=None):
+        """Register a task in the task registry.
+        
+        Task can either be a regular function, or a class inheriting
+        from :class:`celery.task.Task`.
+
+        If the task is a regular function, the ``task_name`` argument is
+        required.
+        
+        """
         is_class = False
         if hasattr(task, "run"):
             is_class = True
@@ -38,6 +49,7 @@ class TaskRegistry(UserDict):
             self.data[task_name] = task
 
     def unregister(self, task_name):
+        """Unregister task by name."""
         if hasattr(task_name, "run"):
             task_name = task_name.name
         if task_name not in self.data:

+ 1 - 0
celery/result.py

@@ -1,3 +1,4 @@
+"""celery.result"""
 from celery.backends import default_backend
 
 

+ 14 - 1
celery/timer.py

@@ -1,3 +1,4 @@
+"""celery.timer"""
 import time
 
 
@@ -14,6 +15,12 @@ class EventTimer(object):
         self.last_triggered = None
 
     def tick(self):
+        """Run a event timer clock tick.
+       
+        When the interval has run, the event will be triggered.
+        If interval is not set, the event will never be triggered.
+
+        """
         if not self.interval: # never trigger if no interval.
             return
         if not self.last_triggered or \
@@ -30,8 +37,14 @@ class TimeoutTimer(object):
         self.time_start = time.time()
 
     def tick(self):
+        """Run a timeout timer clock tick.
+
+        When ``timeout`` seconds has passed, it will raise a
+        :class:`TimeoutTimer` exception.
+        If ``timeout`` is not set, it will never time out.
+
+        """
         if not self.timeout:
             return
         if time.time() > self.time_start + self.timeout:
             raise TimeoutError("The operation timed out.")
-

+ 14 - 4
celery/worker.py

@@ -1,3 +1,4 @@
+"""celery.worker"""
 from carrot.connection import DjangoAMQPConnection
 from celery.messaging import TaskConsumer
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
@@ -25,18 +26,19 @@ class UnknownTask(Exception):
 
 
 def jail(task_id, callable_, args, kwargs):
+    """Wraps the task in a jail which saves the status and result
+    of the task execution to the task meta backend."""
     try:
         result = callable_(*args, **kwargs)
         mark_as_done(task_id, result)
-        print("SUCCESS: %s" % result)
         return result
     except Exception, exc:
         mark_as_failure(task_id, exc)
-        print("FAILURE: %s\n%s" % (exc, traceback.format_exc()))
         return exc
 
 
 class TaskWrapper(object):
+    """Class defining a task to be run."""
     def __init__(self, task_name, task_id, task_func, args, kwargs):
         self.task_name = task_name
         self.task_id = task_id
@@ -46,6 +48,8 @@ class TaskWrapper(object):
 
     @classmethod
     def from_message(cls, message):
+        """Create a TaskWrapper from a message returned by
+        :class:`celery.messaging.TaskConsumer`."""
         message_data = simplejson.loads(message.body)
         task_name = message_data["task"]
         task_id = message_data["id"]
@@ -58,6 +62,11 @@ class TaskWrapper(object):
         return cls(task_name, task_id, task_func, args, kwargs)
 
     def extend_kwargs_with_logging(self, loglevel, logfile):
+        """Extend the tasks keyword arguments with standard task arguments.
+
+        These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
+
+        """
         task_func_kwargs = {"logfile": logfile,
                             "loglevel": loglevel,
                             "task_id": self.task_id,
@@ -66,18 +75,19 @@ class TaskWrapper(object):
         return task_func_kwargs
 
     def execute(self, loglevel, logfile):
+        """Execute the task in a ``jail()`` and store its result and status
+        in the task meta backend."""
         task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
         return jail(self.task_id, [
                         self.task_func, self.args, task_func_kwargs])
 
     def execute_using_pool(self, pool, loglevel, logfile):
+        """Like ``execute``, but using the ``multiprocessing`` pool."""
         task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
         return pool.apply_async(jail, [self.task_id, self.task_func,
                                        self.args, task_func_kwargs])
 
 
-
-
 class TaskDaemon(object):
     """Executes tasks waiting in the task queue.