Browse Source

Skeleton for the ability to select different backends for different actions,
e.g using the database backend for periodic tasks, while using tyrant for
storing results.

Ask Solem 16 years ago
parent
commit
fa3c19c271

+ 36 - 2
celery/backends/__init__.py

@@ -4,7 +4,11 @@ from django.conf import settings
 import sys
 import sys
 
 
 DEFAULT_BACKEND = "database"
 DEFAULT_BACKEND = "database"
+DEFAULT_PERIODIC_STATUS_BACKEND = "database"
 CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)
 CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)
+CELERY_PERIODIC_STATUS_BACKEND = getattr(settings,
+                                    "CELERY_PERIODIC_STATUS_BACKEND",
+                                    DEFAULT_PERIODIC_STATUS_BACKEND)
 
 
 
 
 def get_backend_cls(backend):
 def get_backend_cls(backend):
@@ -31,13 +35,35 @@ get_default_backend_cls = partial(get_backend_cls, CELERY_BACKEND)
 
 
 
 
 """
 """
-.. class:: DefaultBackend
+.. function:: get_default_periodicstatus_backend_cls()
 
 
-    The backend class specified in :setting:`CELERY_BACKEND`.
+    Get the backend class specified in
+    :settings:`CELERY_PERIODIC_STATUS_BACKEND`.
+
+"""
+get_default_periodicstatus_backend_cls = partial(get_backend_cls,
+                                            CELERY_PERIODIC_STATUS_BACKEND)
+
+
+"""
+.. class:: DefaultBackend
+    
+    The default backend class used for storing task results and status,
+    specified in :setting:`CELERY_BACKEND`.
 
 
 """
 """
 DefaultBackend = get_default_backend_cls()
 DefaultBackend = get_default_backend_cls()
 
 
+
+"""
+.. class:: DefaultPeriodicStatusBackend
+
+    The default backend for storing periodic task metadata, specified
+    in :setting:`CELERY_PERIODIC_STATUS_BACKEND`.
+
+"""
+DefaultPeriodicStatusBackend = get_default_periodicstatus_backend_cls()
+
 """
 """
 .. data:: default_backend
 .. data:: default_backend
 
 
@@ -45,3 +71,11 @@ DefaultBackend = get_default_backend_cls()
 
 
 """
 """
 default_backend = DefaultBackend()
 default_backend = DefaultBackend()
+
+"""
+.. data:: default_periodic_status_backend
+
+    An instance of :class:`DefaultPeriodicStatusBackend`.
+
+"""
+default_periodic_status_backend = DefaultPeriodicStatusBackend()

+ 4 - 3
celery/backends/base.py

@@ -80,12 +80,13 @@ class UnpickleableExceptionWrapper(Exception):
 class BaseBackend(object):
 class BaseBackend(object):
     """The base backend class. All backends should inherit from this."""
     """The base backend class. All backends should inherit from this."""
 
 
+    capabilities = []
     UnpickleableExceptionWrapper = UnpickleableExceptionWrapper
     UnpickleableExceptionWrapper = UnpickleableExceptionWrapper
 
 
     def store_result(self, task_id, result, status):
     def store_result(self, task_id, result, status):
         """Store the result and status of a task."""
         """Store the result and status of a task."""
         raise NotImplementedError(
         raise NotImplementedError(
-                "Backends must implement the store_result method")
+                "store_result is not supported by this backend.")
 
 
     def mark_as_done(self, task_id, result):
     def mark_as_done(self, task_id, result):
         """Mark task as successfully executed."""
         """Mark task as successfully executed."""
@@ -130,7 +131,7 @@ class BaseBackend(object):
     def get_status(self, task_id):
     def get_status(self, task_id):
         """Get the status of a task."""
         """Get the status of a task."""
         raise NotImplementedError(
         raise NotImplementedError(
-                "Backends must implement the get_status method")
+                "get_status is not supported by this backend.")
 
 
     def prepare_result(self, result):
     def prepare_result(self, result):
         """Prepare result for storage."""
         """Prepare result for storage."""
@@ -141,7 +142,7 @@ class BaseBackend(object):
     def get_result(self, task_id):
     def get_result(self, task_id):
         """Get the result of a task."""
         """Get the result of a task."""
         raise NotImplementedError(
         raise NotImplementedError(
-                "Backends must implement the get_result method")
+                "get_result is not supported by this backend.")
 
 
     def is_done(self, task_id):
     def is_done(self, task_id):
         """Returns ``True`` if the task was successfully executed."""
         """Returns ``True`` if the task was successfully executed."""

+ 2 - 0
celery/backends/cache.py

@@ -10,6 +10,8 @@ except ImportError:
 class Backend(BaseBackend):
 class Backend(BaseBackend):
     """Backend using the Django cache framework to store task metadata."""
     """Backend using the Django cache framework to store task metadata."""
 
 
+    capabilities = ["ResultStore"]
+
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
         self._cache = {}

+ 8 - 1
celery/backends/database.py

@@ -1,15 +1,22 @@
 """celery.backends.database"""
 """celery.backends.database"""
-from celery.models import TaskMeta
+from celery.models import TaskMeta, PeriodicTaskMeta
 from celery.backends.base import BaseBackend
 from celery.backends.base import BaseBackend
 
 
 
 
 class Backend(BaseBackend):
 class Backend(BaseBackend):
     """The database backends. Using Django models to store task metadata."""
     """The database backends. Using Django models to store task metadata."""
 
 
+    capabilities = ["ResultStore", "PeriodicStatus"]
+
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
         self._cache = {}
 
 
+    def run_periodic_tasks(self):
+        waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
+        for waiting_task in waiting_tasks:
+            waiting_task.delay()
+
     def store_result(self, task_id, result, status):
     def store_result(self, task_id, result, status):
         """Mark task as done (executed)."""
         """Mark task as done (executed)."""
         if status == "DONE":
         if status == "DONE":

+ 2 - 0
celery/backends/tyrant.py

@@ -31,6 +31,8 @@ class Backend(BaseBackend):
     tyrant_host = None
     tyrant_host = None
     tyrant_port = None
     tyrant_port = None
 
 
+    capabilities = ["ResultStore"]
+
     def __init__(self, tyrant_host=None, tyrant_port=None):
     def __init__(self, tyrant_host=None, tyrant_port=None):
         """Initialize Tokyo Tyrant backend instance.
         """Initialize Tokyo Tyrant backend instance.