Przeglądaj źródła

Added CELERY_TASK_META_USE_DB to store task metadata in the database instead of memcache.

Ask Solem 16 lat temu
rodzic
commit
64e64729d3
4 zmienionych plików z 68 dodań i 11 usunięć
  1. 4 0
      celery/conf.py
  2. 24 1
      celery/managers.py
  3. 16 2
      celery/models.py
  4. 24 8
      celery/task.py

+ 4 - 0
celery/conf.py

@@ -1,6 +1,8 @@
 from django.conf import settings
 import logging
 
+DEFAULT_TASK_META_USE_DB = False
+
 # The number of processes to work simultaneously at processing the queue.
 DEFAULT_DAEMON_CONCURRENCY = 10
 
@@ -34,6 +36,8 @@ LOG_LEVELS = {
     "FATAL": logging.FATAL,
 }
 
+TASK_META_USE_DB = getattr(settings, "CELERY_TASK_META_USE_DB",
+                            DEFAULT_TASK_META_USE_DB)
 LOG_FORMAT = getattr(settings, "CELERYD_DAEMON_LOG_FORMAT",
                             DEFAULT_LOG_FMT)
 DAEMON_LOG_FILE = getattr(settings, "CELERYD_LOG_FILE",

+ 24 - 1
celery/managers.py

@@ -2,7 +2,30 @@ from django.db import models
 from celery.registry import tasks
 from datetime import datetime, timedelta
 
-__all__ = ["PeriodicTaskManager"]
+__all__ = ["TaskManager", "PeriodicTaskManager"]
+
+
+class TaskManager(models.Manager):
+    
+    def get_task(self, task_id):
+        task, created = self.get_or_create(task_id=task_id)
+        return task
+
+    def is_done(self, task_id):
+        return self.get_task(task_id).is_done
+
+    def get_all_expired(self):
+        return self.filter(date_done__lt=datetime.now() - timedelta(days=5))
+
+    def delete_expired(self):
+        self.get_all_expired().delete()
+
+    def mark_as_done(self, task_id):
+        task, created = self.get_or_create(task_id=task_id, defaults={
+                                            "is_done": True})
+        if not created:
+            task.is_done = True
+            task.save()
 
 
 class PeriodicTaskManager(models.Manager):

+ 16 - 2
celery/models.py

@@ -1,9 +1,23 @@
 from django.db import models
 from celery.registry import tasks
-from celery.managers import PeriodicTaskManager
+from celery.managers import TaskManager, PeriodicTaskManager
 from django.utils.translation import ugettext_lazy as _
 
-__all__ = ["PeriodicTaskMeta"]
+__all__ = ["TaskMeta", "PeriodicTaskMeta"]
+
+
+class TaskMeta(models.Model):
+    task_id = models.CharField(_(u"task id"), max_length=255, unique=True)
+    is_done = models.BooleanField(_(u"is done"), default=False)
+    date_done = models.DateTimeField(_(u"done at"), auto_add=True)
+    objects = TaskManager()
+
+    class Meta:
+        verbose_name = _(u"task meta")
+        verbose_name_plural = _(u"task meta")
+
+    def __unicode__(self):
+        return u"<Task: %s done:%s>" % (self.task_id, self.is_done)
 
 
 class PeriodicTaskMeta(models.Model):

+ 24 - 8
celery/task.py

@@ -1,7 +1,9 @@
 from carrot.connection import DjangoAMQPConnection
 from celery.log import setup_logger
+from celery.conf import TASK_META_USE_DB
 from celery.registry import tasks
 from celery.messaging import TaskPublisher, TaskConsumer
+from celery.models import TaskMeta
 from django.core.cache import cache
 from datetime import timedelta
 import uuid
@@ -36,13 +38,19 @@ def gen_task_done_cache_key(task_id):
 def mark_as_done(task_id, result):
     if result is None:
         result = True
-    cache_key = gen_task_done_cache_key(task_id)
-    cache.set(cache_key, result)
+    if TASK_META_USE_DB:
+        TaskMeta.objects.mark_as_done(task_id)
+    else:
+        cache_key = gen_task_done_cache_key(task_id)
+        cache.set(cache_key, result)
 
 
 def is_done(task_id):
-    cache_key = gen_task_done_cache_key(task_id)
-    return cache.get(cache_key)
+    if TASK_META_USE_DB:
+        return TaskMeta.objects.is_done(task_id)
+    else:
+        cache_key = gen_task_done_cache_key(task_id)
+        return cache.get(cache_key)
 
 
 class Task(object):
@@ -146,13 +154,21 @@ class PeriodicTask(Task):
 
 
 class TestTask(Task):
-    name = "celery-test-task"
+    name = "celery.test_task"
 
     def run(self, some_arg, **kwargs):
         logger = self.get_logger(**kwargs)
         logger.info("TestTask got some_arg=%s" % some_arg)
+tasks.register(TestTask)
+
+
+class DeleteExpiredTaskMetaTask(PeriodicTask):
+    name = "celery.delete_expired_task_meta"
+    run_every = timedelta(days=1)
 
-    def after(self, task_id):
+    def run(self, **kwargs):
         logger = self.get_logger(**kwargs)
-        logger.info("TestTask with id %s was successfully executed." % task_id)
-tasks.register(TestTask)
+        logger.info("Deleting expired task meta objects...")
+        TaskMeta.objects.delete_expired()
+if TASK_META_USE_DB:
+    tasks.register(DeleteExpiredTaskMetaTask)