فهرست منبع

added TaskSetMeta model for database backend

Brad Jasper 15 سال پیش
والد
کامیت
cf06dc5cb3
4فایلهای تغییر یافته به همراه93 افزوده شده و 3 حذف شده
  1. 3 1
      celery/backends/database.py
  2. 45 0
      celery/managers.py
  3. 16 1
      celery/models.py
  4. 29 1
      celery/tests/test_models.py

+ 3 - 1
celery/backends/database.py

@@ -1,5 +1,5 @@
 """celery.backends.database"""
-from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.models import TaskMeta, TaskSetMeta, PeriodicTaskMeta
 from celery.backends.base import BaseBackend
 
 
@@ -72,3 +72,5 @@ class Backend(BaseBackend):
     def cleanup(self):
         """Delete expired metadata."""
         TaskMeta.objects.delete_expired()
+        TaskSetMeta.objects.delete_expired()
+

+ 45 - 0
celery/managers.py

@@ -60,6 +60,51 @@ TABLE_LOCK_FOR_ENGINE = {"mysql": MySQLTableLock}
 table_lock = TABLE_LOCK_FOR_ENGINE.get(settings.DATABASE_ENGINE, TableLock)
 
 
+class TaskSetManager(models.Manager):
+    """Manager for :class:`celery.models.TaskSet` models."""
+
+    def get_all_expired(self):
+        """Get all expired taskset results."""
+        return self.filter(date_done__lt=datetime.now() - TASK_RESULT_EXPIRES)
+
+    def delete_expired(self):
+        """Delete all expired taskset results."""
+        self.get_all_expired().delete()
+
+    def get_result(self, taskset_id):
+        """Get task meta for task by ``taskset_id``."""
+        try:
+            return self.get(taskset_id=taskset_id)
+        except self.model.DoesNotExist:
+            return None
+
+    def store_result(self, taskset_id, result, exception_retry=True):
+        """Store the result of a taskset.
+
+        :param taskset_id: task set id
+
+        :param result: The return value of the taskset
+
+        """
+        try:
+            taskset, created = self.get_or_create(taskset_id=taskset_id, defaults={
+                                                "result": result})
+            if not created:
+                taskset.result = result
+                taskset.save()
+        except Exception, exc:
+            # depending on the database backend we can get various exceptions.
+            # for excample, psycopg2 raises an exception if some operation
+            # breaks transaction, and saving task result won't be possible
+            # until we rollback transaction
+            if exception_retry:
+                transaction.rollback_unless_managed()
+                self.store_result(taskset_id, result, False)
+            else:
+                raise
+
+
+
 class TaskManager(models.Manager):
     """Manager for :class:`celery.models.Task` models."""
 

+ 16 - 1
celery/models.py

@@ -6,7 +6,7 @@ Django Models.
 import django
 from django.db import models
 from celery.registry import tasks
-from celery.managers import TaskManager, PeriodicTaskManager
+from celery.managers import TaskManager, TaskSetManager, PeriodicTaskManager
 from celery.fields import PickledObjectField
 from celery import conf
 from django.utils.translation import ugettext_lazy as _
@@ -40,6 +40,21 @@ class TaskMeta(models.Model):
     def __unicode__(self):
         return u"<Task: %s done:%s>" % (self.task_id, self.status)
 
+class TaskSetMeta(models.Model):
+    """TaskSet result"""
+    taskset_id = models.CharField(_(u"task id"), max_length=255, unique=True)
+    result = PickledObjectField()
+    date_done = models.DateTimeField(_(u"done at"), auto_now=True)
+
+    objects = TaskSetManager()
+
+    class Meta:
+        """Model meta-data."""
+        verbose_name = _(u"taskset meta")
+        verbose_name_plural = _(u"taskset meta")
+
+    def __unicode__(self):
+        return u"<TaskSet: %s>" % (self.taskset_id)
 
 class PeriodicTaskMeta(models.Model):
     """Information about a Periodic Task."""

+ 29 - 1
celery/tests/test_models.py

@@ -1,6 +1,6 @@
 import unittest
 from datetime import datetime, timedelta
-from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.models import TaskMeta, TaskSetMeta, PeriodicTaskMeta
 from celery.task import PeriodicTask
 from celery.registry import tasks
 from celery.utils import gen_unique_id
@@ -18,6 +18,11 @@ class TestModels(unittest.TestCase):
         taskmeta, created = TaskMeta.objects.get_or_create(task_id=id)
         return taskmeta
 
+    def createTaskSetMeta(self):
+        id = gen_unique_id()
+        tasksetmeta, created = TaskSetMeta.objects.get_or_create(taskset_id=id)
+        return tasksetmeta
+
     def createPeriodicTaskMeta(self, name):
         ptaskmeta, created = PeriodicTaskMeta.objects.get_or_create(name=name,
                 defaults={"last_run_at": datetime.now()})
@@ -50,6 +55,29 @@ class TestModels(unittest.TestCase):
 
         TaskMeta.objects.delete_expired()
         self.assertFalse(m1 in TaskMeta.objects.all())
+    
+    def test_tasksetmeta(self):
+        m1 = self.createTaskSetMeta()
+        m2 = self.createTaskSetMeta()
+        m3 = self.createTaskSetMeta()
+        self.assertTrue(unicode(m1).startswith("<TaskSet:"))
+        self.assertTrue(m1.taskset_id)
+        self.assertTrue(isinstance(m1.date_done, datetime))
+
+        self.assertEquals(TaskSetMeta.objects.get_result(m1.taskset_id).taskset_id,
+                m1.taskset_id)
+
+        # Have to avoid save() because it applies the auto_now=True.
+        TaskSetMeta.objects.filter(taskset_id=m1.taskset_id).update(
+                date_done=datetime.now() - timedelta(days=10))
+
+        expired = TaskSetMeta.objects.get_all_expired()
+        self.assertTrue(m1 in expired)
+        self.assertFalse(m2 in expired)
+        self.assertFalse(m3 in expired)
+
+        TaskSetMeta.objects.delete_expired()
+        self.assertFalse(m1 in TaskSetMeta.objects.all())
 
     def test_periodic_taskmeta(self):
         tasks.register(TestPeriodicTask)