Browse Source

Added Periodic Tasks

Ask Solem 16 years ago
parent
commit
88a5aede4c
7 changed files with 124 additions and 8 deletions
  1. 23 0
      README.rst
  2. 16 0
      crunchy/managers.py
  3. 8 6
      crunchy/messaging.py
  4. 31 0
      crunchy/models.py
  5. 15 0
      crunchy/registry.py
  6. 23 0
      crunchy/task.py
  7. 8 2
      crunchy/worker.py

+ 23 - 0
README.rst

@@ -113,6 +113,29 @@ Then you can add new tasks in your applications ``tasks.py`` module,
                         for_url, clicks_for_url.clicks)
     tasks.register(increment_click, "increment_click")
 
+
+Periodic Tasks
+---------------
+
+Periodic tasks are tasks that are run every ``n`` seconds. They don't
+support extra arguments. Here's an example of a periodic task:
+
+
+    >>> from crunchy.task import tasks, PeriodicTask
+    >>> class MyPeriodicTask(PeriodicTask):
+    ...     name = "foo.my-periodic-task"
+    ...     run_every = 30 # seconds
+    ...
+    ...     def run(self, **kwargs):
+    ...         logger = self.get_logger(**kwargs)
+    ...         logger.info("Running periodic task!")
+    ...
+    >>> tasks.register(MyPeriodicTask)
+
+
+For periodic tasks to work you need to add crunchy to ``INSTALLED_APPS``,
+and issue a ``syncdb``.
+
 License
 =======
 

+ 16 - 0
crunchy/managers.py

@@ -0,0 +1,16 @@
+from django.db import models
+from crunchy.registry import tasks
+from datetime import datetime, timedelta
+
+
+class PeriodicTaskManager(models.Manager):
+
+    def get_waiting_tasks(self):
+        periodic_tasks = tasks.get_all_periodic()
+        waiting = []
+        for task_name, task in periodic_tasks.items():
+            task_meta, created = self.get_or_create(name=task_name)
+            run_at = task_meta.last_run_at + timedelta(seconds=task.run_every)
+            if datetime.now() > run_at:
+                waiting.append(task_meta)
+        return waiting

+ 8 - 6
crunchy/messaging.py

@@ -2,8 +2,14 @@ from carrot.messaging import Publisher, Consumer
 import uuid
 
 
+class NoProcessConsumer(Consumer):
+    
+    def receive(self, message_data, message):
+        raise NotImplementedError(
+                "Don't use process_next() or wait() with the TaskConsumer!")
+
+
 class TaskPublisher(Publisher):
-    queue = "crunchy"
     exchange = "crunchy"
     routing_key = "crunchy"
 
@@ -16,11 +22,7 @@ class TaskPublisher(Publisher):
         return task_id
 
 
-class TaskConsumer(Consumer):
+class TaskConsumer(NoProcessConsumer):
     queue = "crunchy"
     exchange = "crunchy"
     routing_key = "crunchy"
-
-    def receive(self, message_data, message):
-        raise NotImplementedError(
-                "Don't use process_next() or wait() with the TaskConsumer!")

+ 31 - 0
crunchy/models.py

@@ -0,0 +1,31 @@
+from django.db import models
+from crunchy.registry import tasks
+from crunchy.managers import PeriodicTaskManager
+from django.utils.translation import ugettext_lazy as _
+
+
+class PeriodicTaskMeta(models.Model):
+    name = models.CharField(_(u"name"), max_length=255, unique=True)
+    last_run_at = models.DateTimeField(_(u"last time run"),
+                                       auto_now=True, blank=True)
+    total_run_count = models.PositiveIntegerField(_(u"total run count"),
+                                                  default=0)
+
+    objects = PeriodicTaskManager()
+
+    class Meta:
+        verbose_name = _(u"periodic task")
+        verbose_name_plural = _(u"periodic tasks")
+
+    def __unicode__(self):
+        return u"<PeriodicTask: %s [last-run:%s, total-run:%d]>" % (
+                self.name, self.last_run_at, self.total_run_count)
+
+    def delay(self, **kwargs):
+        self.task.delay()
+        self.total_run_count = self.total_run_count + 1
+        self.save()
+
+    @property
+    def task(self):
+        return tasks[self.name]

+ 15 - 0
crunchy/registry.py

@@ -34,6 +34,7 @@ class TaskRegistry(UserDict):
         if is_class:
             self.data[task_name] = task() # instantiate Task class
         else:
+            task.type = "regular"
             self.data[task_name] = task
 
     def unregister(self, task_name):
@@ -48,6 +49,20 @@ class TaskRegistry(UserDict):
         """Get all task types."""
         return self.data
 
+    def filter_types(self, type):
+        """Return all tasks of a specific type."""
+        return dict([(task_name, task)
+                        for task_name, task in self.data.items()
+                            if task.type == type])
+
+    def get_all_regular(self):
+        """Get all regular task types."""
+        return self.filter_types(type="regular")
+
+    def get_all_periodic(self):
+        """Get all periodic task types."""
+        return self.filter_types(type="periodic")
+
     def get_task(self, task_name):
         """Get task by name."""
         return self.data[task_name]

+ 23 - 0
crunchy/task.py

@@ -23,6 +23,7 @@ def discard_all():
 
 class Task(object):
     name = None
+    type = "regular"
 
     def __init__(self):
         if not self.name:
@@ -49,6 +50,28 @@ class Task(object):
     @classmethod
     def delay(cls, **kwargs):
         return delay_task(cls.name, **kwargs)
+
+
+class PeriodicTask(Task):
+    run_every = 86400
+    type = "periodic"
+
+    def __init__(self):
+        if not self.run_every:
+            raise NotImplementedError(
+                    "Periodic tasks must have a run_every attribute")
+        super(PeriodicTask, self).__init__()
+
+
+class TestPeriodicTask(PeriodicTask):
+    name = "crunchy-test-periodic-task"
+    run_every = 20
+    
+    def run(self, **kwargs):
+        logger = setup_logger(**kwargs)
+        logger.info("Running periodic task foo!")
+tasks.register(TestPeriodicTask)
+
 class TestTask(Task):
     name = "crunchy-test-task"
 

+ 8 - 2
crunchy/worker.py

@@ -5,6 +5,7 @@ from crunchy.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
 from crunchy.log import setup_logger
 from crunchy.registry import tasks
 from crunchy.process import ProcessQueue
+from crunchy.models import PeriodicTaskMeta
 import multiprocessing
 import simplejson
 import traceback
@@ -72,12 +73,17 @@ class TaskDaemon(object):
         message.ack()
         return result, task_name, task_id
 
+    def run_periodic_tasks(self):
+        for task in PeriodicTaskMeta.objects.get_waiting_tasks():
+            task.delay()
+
     def run(self):
         results = ProcessQueue(self.concurrency, logger=self.logger,
                 done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
         last_empty_emit = None
 
         while True:
+            self.run_periodic_tasks()
             try:
                 result, task_name, task_id = self.fetch_next_task()
             except EmptyQueue:
@@ -88,8 +94,8 @@ class TaskDaemon(object):
                 time.sleep(self.queue_wakeup_after)
                 continue
             except UnknownTask, e:
-                self.logger.info("Unknown task %s requeued and ignored." % (
-                                    task_name))
+                self.logger.info("Unknown task requeued and ignored: %s" % (
+                                    e))
                 continue
             #except Exception, e:
             #    self.logger.critical("Raised %s: %s\n%s" % (