Przeglądaj źródła

added ScheduledTask

Patrick 15 lat temu
rodzic
commit
e8835f1052

+ 1 - 0
AUTHORS

@@ -26,3 +26,4 @@ Ordered by date of first contribution:
   Reza Lotun <rlotun@gmail.com>
   Mikhail Korobov <kmike84@gmail.com>
   Jeff Balogh <me@jeffbalogh.org>
+  Patrick Altman <paltman@gmail.com>

+ 3 - 1
celery/task/__init__.py

@@ -8,12 +8,14 @@ from billiard.serialization import pickle
 from celery.execute import apply_async
 from celery.registry import tasks
 from celery.task.base import Task, TaskSet, PeriodicTask, ExecuteRemoteTask
+from celery.task.base import ScheduledTask
 from celery.task.control import discard_all
 from celery.task.builtins import PingTask
 from celery.task.http import HttpDispatchTask
 
 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
-           "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask"]
+           "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask",
+           "ScheduledTask"]
 
 
 def dmap(fun, args, timeout=None):

+ 92 - 0
celery/task/base.py

@@ -16,6 +16,10 @@ from celery.messaging import establish_connection as _establish_connection
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 
 
+def get_current_time():
+    return datetime.now()
+
+
 class TaskType(type):
     """Metaclass for tasks.
 
@@ -776,3 +780,91 @@ class PeriodicTask(Task):
             if predicate(delta) >= 1.0:
                 return datetime(*args[:res])
         return dt
+
+
+class ScheduledTask(PeriodicTask):
+    """A scheduled task is a task that adds more precise scheduling to the
+    features of a :class:`celery.task.base.PeriodicTask`.
+
+    Like a :manpage:`cron` job, you can specify units of time of when you would
+    like the task to execute.  While not a full implementation of cron, it
+    should provide a fair degree of common scheduling needs.  You can specify
+    a minute, an hour, and/or a day of the week.
+
+    .. attribute:: minute
+
+        An integer from 0-59 that represents the minute of an hour of when
+        execution should occur.
+
+    .. attribute:: hour
+
+        An integer from 0-23 that represents the hour of a day of when
+        execution should occur.
+
+    .. attribute:: day_of_week
+
+        An integer from 0-6, where Sunday = 0 and Saturday = 6, that represents
+        the day of week that execution should occur.
+
+    Example
+
+        >>> from celery.task import ScheduledTask
+        >>> class EveryMondayMorningTask(ScheduledTask):
+        ...     hour = 7
+        ...     minute = 30
+        ...     day_of_week = 1
+        ...     def run(self, **kwargs):
+        ...         logger = self.get_logger(**kwargs)
+        ...         logger.info("Execute every Monday at 7:30AM.")
+
+        >>> from celery.task import ScheduledTask
+        >>> class EveryMorningTask(ScheduledTask):
+        ...     hour = 7
+        ...     minute = 30
+        ...     def run(self, **kwargs):
+        ...         logger = self.get_logger(**kwargs)
+        ...         logger.info("Execute every day at 7:30AM.")
+
+        >>> from celery.task import ScheduledTask
+        >>> class EveryQuarterPastTheHourTask(ScheduledTask):
+        ...     minute = 15
+        ...     def run(self, **kwargs):
+        ...         logger = self.get_logger(**kwargs)
+        ...         logger.info("Execute every 0:15 past the hour every day.")
+
+    """
+    run_every = timedelta(seconds=1)
+    hour = None           # (0 - 23)
+    minute = None         # (0 - 59)
+    day_of_week = None    # (0 - 6) (Sunday=0)
+    abstract = True
+
+    def is_due(self, last_run_at):
+        n = get_current_time()
+        last = (n - last_run_at)
+        if last.days > 0 or last.seconds > 60:
+            if self.day_of_week is None:
+                if self.hour is None and self.minute is None:
+                    return (True, 1)
+
+                if self.hour is None and self.minute == n.minute:
+                    return (True, 1)
+
+                if self.hour == n.hour and self.minute is None:
+                    return (True, 1)
+
+                if self.hour == n.hour and self.minute == n.minute:
+                    return (True, 1)
+            elif self.day_of_week == n.isoweekday():
+                if self.hour is None and self.minute is None:
+                    return (True, 1)
+
+                if self.hour is None and self.minute == n.minute:
+                    return (True, 1)
+
+                if self.hour == n.hour and self.minute is None:
+                    return (True, 1)
+
+                if self.hour == n.hour and self.minute == n.minute:
+                    return (True, 1)
+        return (False, 1)

+ 78 - 0
celery/tests/test_task.py

@@ -1,6 +1,7 @@
 import unittest2 as unittest
 from StringIO import StringIO
 from datetime import datetime, timedelta
+from mock import patch
 
 from celery import task
 from celery import messaging
@@ -465,3 +466,80 @@ class TestPeriodicTask(unittest.TestCase):
         due, remaining = p.is_due(datetime.now() - p.run_every)
         self.assertTrue(due)
         self.assertEqual(remaining, p.timedelta_seconds(p.run_every))
+
+
+class EveryMinutePeriodic(task.ScheduledTask):
+    pass
+
+
+class HourlyPeriodic(task.ScheduledTask):
+    minute = 30
+
+
+class DailyPeriodic(task.ScheduledTask):
+    hour = 7
+    minute = 30
+
+
+class WeeklyPeriodic(task.ScheduledTask):
+    hour = 7
+    minute = 30
+    day_of_week = 4
+
+
+class TestScheduledTask(unittest.TestCase):
+    
+    def test_every_minute_execution_is_due(self):
+        last_ran = datetime.now() - timedelta(seconds=61)
+        due, remaining = EveryMinutePeriodic().is_due(last_ran)
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+        
+    def test_every_minute_execution_is_not_due(self):
+        last_ran = datetime.now() - timedelta(seconds=30)
+        due, remaining = EveryMinutePeriodic().is_due(last_ran)
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+    
+    @patch('celery.task.base.get_current_time')
+    def test_every_hour_execution_is_due(self, NowMock):
+        NowMock.return_value = datetime(2010, 5, 10, 10, 30)
+        due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+    
+    @patch('celery.task.base.get_current_time')
+    def test_every_hour_execution_is_not_due(self, NowMock):
+        NowMock.return_value = datetime(2010, 5, 10, 10, 29)
+        due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 9, 30))
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+
+    @patch('celery.task.base.get_current_time')
+    def test_daily_execution_is_due(self, NowMock):
+        NowMock.return_value = datetime(2010, 5, 10, 7, 30)
+        due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+
+    @patch('celery.task.base.get_current_time')
+    def test_daily_execution_is_not_due(self, NowMock):
+        NowMock.return_value = datetime(2010, 5, 10, 10, 30)
+        due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 10, 6, 29))
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+
+    @patch('celery.task.base.get_current_time')
+    def test_weekly_execution_is_due(self, NowMock):
+        NowMock.return_value = datetime(2010, 5, 6, 7, 30)
+        due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 7, 30))
+        self.assertTrue(due)
+        self.assertEquals(remaining, 1)
+
+    @patch('celery.task.base.get_current_time')
+    def test_weekly_execution_is_not_due(self, NowMock):
+        NowMock.return_value = datetime(2010, 5, 7, 10, 30)
+        due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 6, 29))
+        self.assertFalse(due)
+        self.assertEquals(remaining, 1)
+

+ 7 - 0
contrib/requirements/test.txt

@@ -8,3 +8,10 @@ pytyrant
 redis
 pymongo
 git+git://github.com/exogen/nose-achievements.git
+mock>=0.6.0
+-e git+git://github.com/ask/billiard.git#egg=billiard
+-e git+git://github.com/ask/carrot.git#egg=carrot
+django
+importlib
+django-picklefield
+python-dateutil

+ 15 - 0
docs/getting-started/periodic-tasks.rst

@@ -19,6 +19,21 @@ Here's an example of a periodic task:
             logger.info("Running periodic task!")
     >>> tasks.register(MyPeriodicTask)
 
+If you want a little more control over when the task is executed, for example,
+a particular time of day or day of the week, you can subclass a ``ScheduledTask``:
+
+.. code-block:: python
+
+    from celery.task import ScheduledTask
+
+    class EveryMondayMorningTask(ScheduledTask):
+        hour = 7
+        minute = 30
+        day_of_week = 1
+
+        def run(self, **kwargs):
+            logger = self.get_logger(**kwargs)
+            logger.info("This will execute every Monday at 7:30AM.")
 
 If you want to use periodic tasks you need to start the ``celerybeat``
 service. You have to make sure only one instance of this server is running at

+ 0 - 0
tests/manage.py