
Improve docstrings, remove TASK_META_USE_DB setting.

Ask Solem 16 years ago
commit 8f569fcb59

+ 8 - 0
celery/backends/__init__.py

@@ -1,3 +1,4 @@
+"""celery.backends"""
 from functools import partial
 from django.conf import settings
 import sys
@@ -7,6 +8,13 @@ CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)


 def get_backend_cls(backend):
+    """Get backend class by name.
+
+    If the name does not include ``.`` (is not fully qualified),
+    ``celery.backends.`` will be prepended to the name, e.g.
+    ``database`` becomes ``celery.backends.database``.
+
+    """
     if backend.find(".") == -1:
         backend = "celery.backends.%s" % backend
     __import__(backend)

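For illustration, the resolution rule described in the new docstring (a
sketch; ``myproject.backends.custom`` is a hypothetical module, and the
return value comes from code outside this hunk):

    >>> from celery.backends import get_backend_cls
    >>> get_backend_cls("database")   # expands to "celery.backends.database"
    >>> get_backend_cls("myproject.backends.custom")   # used verbatim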
+ 5 - 1
celery/backends/cache.py

@@ -1,3 +1,4 @@
+"""celery.backends.cache"""
 from django.core.cache import cache
 from celery.backends.base import BaseBackend
 try:
@@ -7,7 +8,7 @@ except ImportError:


 class Backend(BaseBackend):
-
+    """Backend using the Django cache framework to store task metadata."""
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
@@ -22,12 +23,15 @@ class Backend(BaseBackend):
         cache.set(self._cache_key(task_id), meta)

     def get_status(self, task_id):
+        """Get the status of a task."""
         return self._get_task_meta_for(task_id)["status"]

     def get_result(self, task_id):
+        """Get the result of a task."""
         return self._get_task_meta_for(task_id)["result"]

     def is_done(self, task_id):
+        """Returns ``True`` if the task has been executed successfully."""
         return self.get_status(task_id) == "DONE"

     def _get_task_meta_for(self, task_id):

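A minimal sketch of how the newly documented accessors fit together (the task
id is a placeholder; the metadata layout is as implied by ``get_status`` and
``get_result`` above):

    >>> from celery.backends.cache import Backend
    >>> backend = Backend()
    >>> backend.get_status("some-task-id")    # meta["status"], e.g. "DONE"
    >>> backend.get_result("some-task-id")    # meta["result"]
    >>> backend.is_done("some-task-id")       # True when status is "DONE"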
+ 3 - 0
celery/backends/database.py

@@ -1,8 +1,10 @@
+"""celery.backends.database"""
 from celery.models import TaskMeta
 from celery.backends.base import BaseBackend


 class Backend(BaseBackend):
+    """The database backend. Uses Django models to store task metadata."""

     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
@@ -34,4 +36,5 @@ class Backend(BaseBackend):
         return meta

     def cleanup(self):
+        """Delete expired metadata."""
         TaskMeta.objects.delete_expired()

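The new ``cleanup`` docstring covers the backend's only maintenance hook; a
sketch of invoking it, e.g. from a periodic job (the scheduling itself is an
assumption, not part of this commit):

    >>> from celery.backends.database import Backend
    >>> Backend().cleanup()   # delegates to TaskMeta.objects.delete_expired()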
+ 4 - 0
celery/backends/tyrant.py

@@ -1,3 +1,4 @@
+"""celery.backends.tyrant"""
 from django.core.exceptions import ImproperlyConfigured

 try:
@@ -43,12 +44,15 @@ class Backend(BaseBackend):
         get_server()[self._cache_key(task_id)] = serialize(meta)

     def get_status(self, task_id):
+        """Get the status of a task."""
         return self._get_task_meta_for(task_id)["status"]

     def get_result(self, task_id):
+        """Get the result of a task."""
         return self._get_task_meta_for(task_id)["result"]

     def is_done(self, task_id):
+        """Returns ``True`` if the task executed successfully."""
         return self.get_status(task_id) == "DONE"

     def _get_task_meta_for(self, task_id):

+ 1 - 6
celery/conf.py

@@ -1,3 +1,4 @@
+"""celery.conf"""
 from django.conf import settings
 import logging

@@ -10,10 +11,6 @@ DEFAULT_AMQP_ROUTING_KEY = "celery"
 # The default AMQP consumer queue.
 DEFAULT_AMQP_CONSUMER_QUEUE = "celery"

-# If True, task meta information (like is_done) is saved to the database
-# instead of using the Django cache framework.
-DEFAULT_TASK_META_USE_DB = False
-
 # The number of processes to work simultaneously at processing the queue.
 DEFAULT_DAEMON_CONCURRENCY = 10

@@ -48,8 +45,6 @@ LOG_LEVELS = {
 }


-TASK_META_USE_DB = getattr(settings, "CELERY_TASK_META_USE_DB",
-                            DEFAULT_TASK_META_USE_DB)
 LOG_FORMAT = getattr(settings, "CELERYD_DAEMON_LOG_FORMAT",
                             DEFAULT_LOG_FMT)
 DAEMON_LOG_FILE = getattr(settings, "CELERYD_LOG_FILE",

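Each remaining option follows the ``getattr(settings, name, default)``
pattern above, so it can be overridden from the Django settings module; an
illustrative ``settings.py`` fragment (the values are examples, not the
defaults):

    # settings.py
    CELERYD_DAEMON_LOG_FORMAT = "[%(asctime)s: %(levelname)s] %(message)s"
    CELERYD_LOG_FILE = "/var/log/celeryd.log"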
+ 1 - 1
celery/datastructures.py

@@ -14,7 +14,7 @@ class PositionQueue(UserList):
         self.length = length
         self.data = map(self.UnfilledPosition, xrange(length))

-    def is_full(self):
+    def full(self):
         """Returns ``True`` if all the positions have been filled."""
         return len(self) >= self.length


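A sketch of the renamed method in use (how unfilled positions behave is
defined outside this hunk):

    >>> from celery.datastructures import PositionQueue
    >>> results = PositionQueue(length=2)
    >>> results[0] = "first"
    >>> results.full()       # False until every position is filled
    >>> results[1] = "second"
    >>> results.full()       # now True
    >>> list(results)        # copy out as a plain list, as join() does below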
+ 54 - 28
celery/task.py

@@ -1,6 +1,5 @@
 from carrot.connection import DjangoAMQPConnection
 from celery.log import setup_logger
-from celery.conf import TASK_META_USE_DB
 from celery.registry import tasks
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.models import TaskMeta
@@ -17,8 +16,6 @@ import pickle
 def delay_task(task_name, *args, **kwargs):
     """Delay a task for execution by the ``celery`` daemon.

-    Examples
-    --------
         >>> delay_task("update_record", name="George Constanza", age=32)

     """
@@ -70,10 +67,6 @@ class Task(object):
     it also has to define the ``run`` method, which is the actual method the
     ``celery`` daemon executes.

-
-    Examples
-    --------
-
     This is a simple task just logging a message,

         >>> from celery.task import tasks, Task
@@ -151,9 +144,6 @@ class TaskSet(object):
     """A task containing several subtasks, making it possible
     to track how many, or when all of the tasks are completed.

-    Example Usage
-    --------------
-
         >>> from djangofeeds.tasks import RefreshFeedTask
         >>> taskset = TaskSet(RefreshFeedTask, args=[
         ...                 {"feed_url": "http://cnn.com/rss"},
@@ -184,8 +174,6 @@ class TaskSet(object):

         Returns a tuple with the taskset id, and a list of subtask ids.

-        Examples
-        --------
             >>> ts = RefreshFeeds([
             ...         ["http://foo.com/rss", {}],
             ...         ["http://bar.com/rss", {}],
@@ -213,45 +201,76 @@ class TaskSet(object):
         return taskset_id, subtask_ids

     def iterate(self):
+        """Iterate over the results returned after calling ``run()``.
+
+        If any of the tasks raises an exception, the exception will
+        be reraised by ``iterate``.
+        """
         taskset_id, subtask_ids = self.run()
         results = dict([(task_id, AsyncResult(task_id))
                             for task_id in subtask_ids])
         while results:
-            for pending_result in results:
+            for task_id, pending_result in results.items():
                 if pending_result.status == "DONE":
+                    del results[task_id]
                     yield pending_result.result
                 elif pending_result.status == "FAILURE":
                     raise pending_result.result

     def join(self, timeout=None):
-        timeout_timer = TimeOutTimer(timeout)
+        """Gather the results for all of the tasks in the taskset,
+        and return a list of them, ordered by the order in which they
+        were called.
+
+        If any of the tasks raises an exception, the exception
+        will be reraised by ``join``.
+
+        If ``timeout`` is not ``None`` and the operation takes
+        longer than ``timeout`` seconds, it will raise
+        the :class:`celery.timer.TimeoutError` exception.
+
+        """
+        timeout_timer = TimeOutTimer(timeout)  # Timeout timer starts here.
         taskset_id, subtask_ids = self.run()
         pending_results = map(AsyncResult, subtask_ids)
         results = PositionQueue(length=len(subtask_ids))

         while True:
-            for i, pending_result in enumerate(pending_results):
+            for position, pending_result in enumerate(pending_results):
                 if pending_result.status == "DONE":
-                    results[i] = pending_result.result
+                    results[position] = pending_result.result
                 elif pending_result.status == "FAILURE":
                     raise pending_result.result
-            if results.is_full():
+            if results.full():
+                # Make a list copy, so the returned type is a plain list
+                # and not a PositionQueue.
                 return list(results)
+
+            # This raises TimeoutError when timed out.
             timeout_timer.tick()

     @classmethod
     def remote_execute(cls, func, args):
+        """Apply ``func`` to the arguments by distributing
+        them to the celery server(s)."""
         pickled = pickle.dumps(func)
         arguments = [[[pickled, arg, {}], {}] for arg in args]
         return cls(ExecuteRemoteTask, arguments)

     @classmethod
     def map(cls, func, args, timeout=None):
+        """Distribute processing of the arguments and collect the results."""
         remote_task = cls.remote_execute(func, args)
         return remote_task.join(timeout=timeout)

     @classmethod
     def map_async(cls, func, args, timeout=None):
+        """Distribute processing of the arguments and collect the results
+        asynchronously.
+
+        Returns a :class:`celery.result.AsyncResult` instance.
+
+        """
         serfunc = pickle.dumps(func)
         return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)

@@ -259,9 +278,6 @@ class TaskSet(object):
 def dmap(func, args, timeout=None):
     """Distribute processing of the arguments and collect the results.

-    Example
-    --------
-
         >>> from celery.task import dmap
         >>> import operator
         >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
@@ -272,6 +288,7 @@ def dmap(func, args, timeout=None):


 class AsynchronousMapTask(Task):
+    """Task used internally by ``dmap_async``."""
     name = "celery.map_async"

     def run(self, serfunc, args, **kwargs):
@@ -282,10 +299,9 @@ tasks.register(AsynchronousMapTask)

 def dmap_async(func, args, timeout=None):
     """Distribute processing of the arguments and collect the results
-    asynchronously. Returns a :class:`AsyncResult` object.
-
-    Example
-    --------
+    asynchronously.
+
+    Returns a :class:`celery.result.AsyncResult` object.

         >>> from celery.task import dmap_async
         >>> import operator
@@ -310,9 +326,6 @@ class PeriodicTask(Task):

     You have to register the periodic task in the task registry.

-    Examples
-    --------
-
         >>> from celery.task import tasks, PeriodicTask
         >>> from datetime import timedelta
         >>> class MyPeriodicTask(PeriodicTask):
@@ -340,17 +353,30 @@ class PeriodicTask(Task):
         super(PeriodicTask, self).__init__()


-
 class ExecuteRemoteTask(Task):
+    """Execute arbitrary function/object.
+
+    The object must be picklable, so you can't use lambdas or functions
+    defined in the REPL (the objects must have an associated module).
+
+    """
     name = "celery.execute_remote"

     def run(self, ser_callable, fargs, fkwargs, **kwargs):
+        """Execute the pickled ``ser_callable``, with ``fargs`` as positional
+        arguments and ``fkwargs`` as keyword arguments."""
         callable_ = pickle.loads(ser_callable)
         return callable_(*fargs, **fkwargs)
 tasks.register(ExecuteRemoteTask)


 def execute_remote(func, *args, **kwargs):
+    """Execute arbitrary function/object remotely.
+
+    The object must be picklable, so you can't use lambdas or functions
+    defined in the REPL (the objects must have an associated module).
+
+    """
     return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)



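Tying the new ``TaskSet`` docstrings together, a usage sketch (it reuses the
``RefreshFeedTask`` example from the docstrings above; note that ``join`` and
``iterate`` each call ``run`` themselves, so use one or the other per
``TaskSet`` instance):

    >>> from celery.task import TaskSet
    >>> from djangofeeds.tasks import RefreshFeedTask
    >>> taskset = TaskSet(RefreshFeedTask, args=[
    ...     {"feed_url": "http://cnn.com/rss"},
    ...     {"feed_url": "http://bbc.com/rss"}])
    >>> results = taskset.join(timeout=60)   # TimeoutError after 60 seconds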
+ 0 - 2
celery/tests/test_conf.py

@@ -22,8 +22,6 @@ SETTING_VARS = (
         "DEFAULT_DAEMON_LOG_FILE"),
     ("CELERYD_DAEMON_LOG_FORMAT", "LOG_FORMAT",
         "DEFAULT_LOG_FMT"),
-    ("CELERY_TASK_META_USE_DB", "TASK_META_USE_DB",
-        "DEFAULT_TASK_META_USE_DB"),
 )



+ 1 - 15
celery/worker.py

@@ -7,6 +7,7 @@ from celery.registry import tasks
 from celery.datastructures import TaskProcessQueue
 from celery.models import PeriodicTaskMeta
 from celery.task import mark_as_done, mark_as_failure
+from celery.timer import EventTimer
 import multiprocessing
 import simplejson
 import traceback
@@ -75,21 +76,6 @@ class TaskWrapper(object):
                                        self.args, task_func_kwargs])


-class EventTimer(object):
-    """Do something at an interval."""
-
-    def __init__(self, event, interval=None):
-        self.event = event
-        self.interval = interval
-        self.last_triggered = None
-
-    def tick(self):
-        if not self.interval: # never trigger if no interval.
-            return
-        if not self.last_triggered or \
-                time.time() > self.last_triggered + self.interval:
-            self.event()
-            self.last_triggered = time.time()


 class TaskDaemon(object):
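``EventTimer`` now lives in ``celery.timer`` (note the new import above),
presumably with the interface shown in the removed code; a minimal sketch:

    from celery.timer import EventTimer

    def heartbeat():                          # placeholder callback
        print("still alive")

    timer = EventTimer(heartbeat, interval=5)
    while True:                               # e.g. inside a worker main loop
        timer.tick()   # calls heartbeat() at most once every 5 seconds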