
Remove the old periodic task system

Ask Solem, 15 years ago
parent commit abbad2657b

+ 0 - 28
celery/backends/__init__.py

@@ -27,17 +27,6 @@ def get_backend_cls(backend):
 get_default_backend_cls = partial(get_backend_cls, conf.CELERY_BACKEND)
 
 
-"""
-.. function:: get_default_periodicstatus_backend_cls()
-
-    Get the backend class specified in
-    :setting:`CELERY_PERIODIC_STATUS_BACKEND`.
-
-"""
-get_default_periodicstatus_backend_cls = partial(get_backend_cls,
-                                        conf.CELERY_PERIODIC_STATUS_BACKEND)
-
-
 """
 .. class:: DefaultBackend
 
@@ -48,15 +37,6 @@ get_default_periodicstatus_backend_cls = partial(get_backend_cls,
 DefaultBackend = get_default_backend_cls()
 
 
-"""
-.. class:: DefaultPeriodicStatusBackend
-
-    The default backend for storing periodic task metadata, specified
-    in :setting:`CELERY_PERIODIC_STATUS_BACKEND`.
-
-"""
-DefaultPeriodicStatusBackend = get_default_periodicstatus_backend_cls()
-
 """
 .. data:: default_backend
 
@@ -64,11 +44,3 @@ DefaultPeriodicStatusBackend = get_default_periodicstatus_backend_cls()
 
 """
 default_backend = DefaultBackend()
-
-"""
-.. data:: default_periodic_status_backend
-
-    An instance of :class:`DefaultPeriodicStatusBackend`.
-
-"""
-default_periodic_status_backend = DefaultPeriodicStatusBackend()
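The three module-level names deleted above all followed the same resolve-by-setting pattern as the remaining result backend: bind a setting name to get_backend_cls with functools.partial, call the resulting factory to get the backend class, and instantiate it once at import time. A minimal, self-contained sketch of that chain (the registry dict and stub class are hypothetical stand-ins for the real get_backend_cls lookup):

    from functools import partial

    # Hypothetical stand-in for celery.backends.get_backend_cls, which maps a
    # backend name such as "database" to its Backend class.
    _BACKENDS = {"database": type("DatabaseBackend", (object,), {})}

    def get_backend_cls(backend):
        return _BACKENDS[backend]

    CELERY_PERIODIC_STATUS_BACKEND = "database"   # the setting removed in this commit

    # setting name -> class factory -> class -> module-level instance
    get_default_periodicstatus_backend_cls = partial(
            get_backend_cls, CELERY_PERIODIC_STATUS_BACKEND)
    DefaultPeriodicStatusBackend = get_default_periodicstatus_backend_cls()
    default_periodic_status_backend = DefaultPeriodicStatusBackend()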

+ 2 - 20
celery/backends/database.py

@@ -1,35 +1,17 @@
 """celery.backends.database"""
-from celery.models import TaskMeta, PeriodicTaskMeta
+from celery.models import TaskMeta
 from celery.backends.base import BaseBackend
 
 
 class Backend(BaseBackend):
     """The database backends. Using Django models to store task metadata."""
 
-    capabilities = ["ResultStore", "PeriodicStatus"]
+    capabilities = ["ResultStore"]
 
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
 
-    def init_periodic_tasks(self):
-        """Create entries for all periodic tasks in the database."""
-        PeriodicTaskMeta.objects.init_entries()
-
-    def run_periodic_tasks(self):
-        """Run all waiting periodic tasks.
-
-        :returns: a list of ``(task, task_id)`` tuples containing
-            the task class and id for the resulting tasks applied.
-
-        """
-        waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
-        task_id_tuples = []
-        for waiting_task in waiting_tasks:
-            task_id = waiting_task.delay()
-            task_id_tuples.append((waiting_task, task_id))
-        return task_id_tuples
-
     def store_result(self, task_id, result, status, traceback=None):
         """Store return value and status of an executed task."""
         if status == "DONE":

+ 2 - 90
celery/backends/mongodb.py

@@ -1,23 +1,16 @@
 """MongoDB backend for celery."""
-
-import random
-from datetime import datetime, timedelta
-
+from datetime import datetime
 from django.core.exceptions import ImproperlyConfigured
 from celery.serialization import pickle
 from celery.backends.base import BaseBackend
 from celery.loaders import settings
 from celery.conf import TASK_RESULT_EXPIRES
-from celery.registry import tasks
 
 try:
     import pymongo
 except ImportError:
     pymongo = None
 
-# taken from celery.managers.PeriodicTaskManager
-SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
-
 
 class Bunch:
 
@@ -27,7 +20,7 @@ class Bunch:
 
 class Backend(BaseBackend):
 
-    capabilities = ("ResultStore", "PeriodicStatus")
+    capabilities = ["ResultStore"]
 
     mongodb_host = 'localhost'
     mongodb_port = 27017
@@ -35,7 +28,6 @@ class Backend(BaseBackend):
     mongodb_password = None
     mongodb_database = 'celery'
     mongodb_taskmeta_collection = 'celery_taskmeta'
-    mongodb_periodictaskmeta_collection = 'celery_periodictaskmeta'
 
     def __init__(self, *args, **kwargs):
         """Initialize MongoDB backend instance.
@@ -65,9 +57,6 @@ class Backend(BaseBackend):
                     'database', self.mongodb_database)
             self.mongodb_taskmeta_collection = conf.get(
                 'taskmeta_collection', self.mongodb_taskmeta_collection)
-            self.mongodb_collection_periodictaskmeta = conf.get(
-                'periodictaskmeta_collection',
-                self.mongodb_periodictaskmeta_collection)
 
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
@@ -104,83 +93,6 @@ class Backend(BaseBackend):
             # goes out of scope
             self._connection = None
 
-    def init_periodic_tasks(self):
-        """Create collection for periodic tasks in database."""
-        db = self._get_database()
-        collection = db[self.mongodb_periodictaskmeta_collection]
-        collection.ensure_index("name", pymongo.ASCENDING, unique=True)
-
-        periodic_tasks = tasks.get_all_periodic()
-        for task_name in periodic_tasks.keys():
-            if not collection.find_one({"name": task_name}):
-                collection.save({"name": task_name,
-                                 "last_run_at": datetime.fromtimestamp(0),
-                                 "total_run_count": 0}, safe=True)
-
-    def run_periodic_tasks(self):
-        """Run all waiting periodic tasks.
-
-        :returns: a list of ``(task, task_id)`` tuples containing
-            the task class and id for the resulting tasks applied.
-        """
-        db = self._get_database()
-        collection = db[self.mongodb_periodictaskmeta_collection]
-
-        waiting_tasks = self._get_waiting_tasks()
-        task_id_tuples = []
-        for waiting_task in waiting_tasks:
-            task = tasks[waiting_task['name']]
-            resp = task.delay()
-            collection.update({'_id': waiting_task['_id']},
-                              {"$inc": {"total_run_count": 1}})
-
-            task_meta = Bunch(name=waiting_task['name'],
-                              last_run_at=waiting_task['last_run_at'],
-                              total_run_count=waiting_task['total_run_count'])
-            task_id_tuples.append((task_meta, resp.task_id))
-
-        return task_id_tuples
-
-    def _is_time(self, last_run_at, run_every):
-        """Check if if it is time to run the periodic task.
-
-        :param last_run_at: Last time the periodic task was run.
-        :param run_every: How often to run the periodic task.
-
-        :rtype bool:
-
-        """
-        # code taken from celery.managers.PeriodicTaskManager
-        run_every_drifted = run_every + SERVER_DRIFT
-        run_at = last_run_at + run_every_drifted
-        if datetime.now() > run_at:
-            return True
-        return False
-
-    def _get_waiting_tasks(self):
-        """Get all waiting periodic tasks."""
-        db = self._get_database()
-        collection = db[self.mongodb_periodictaskmeta_collection]
-
-        periodic_tasks = tasks.get_all_periodic()
-
-        # find all periodic tasks to be run
-        waiting = []
-        for task_meta in collection.find():
-            if task_meta['name'] in periodic_tasks:
-                task = periodic_tasks[task_meta['name']]
-                run_every = task.run_every
-                if self._is_time(task_meta['last_run_at'], run_every):
-                    collection.update(
-                        {"name": task_meta['name'],
-                         "last_run_at": task_meta['last_run_at']},
-                        {"$set": {"last_run_at": datetime.now()}})
-
-                    if db.last_status()['updatedExisting']:
-                        waiting.append(task_meta)
-
-        return waiting
-
     def store_result(self, task_id, result, status, traceback=None):
         """Store return value and status of an executed task."""
         from pymongo.binary import Binary
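The removed _is_time check is the heart of the old scheduler: a task is due once last_run_at + run_every + SERVER_DRIFT has passed, where SERVER_DRIFT is a small random offset computed once per process (presumably so that several workers do not all fire at the same instant). A self-contained sketch of that timing check, using the same expressions as the deleted code:

    import random
    from datetime import datetime, timedelta

    # Per-process drift, computed once at import time exactly as in the
    # removed code; vonmisesvariate(1, 4) yields a value in [0, 2*pi),
    # reused here as a number of seconds.
    SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))

    def is_time(last_run_at, run_every):
        """Return True if the periodic task is due, allowing for drift."""
        run_at = last_run_at + run_every + SERVER_DRIFT
        return datetime.now() > run_at

    # A task that runs every 30 seconds and last ran a minute ago is due.
    print(is_time(datetime.now() - timedelta(minutes=1), timedelta(seconds=30)))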

+ 4 - 1
celery/beat.py

@@ -66,7 +66,10 @@ class Scheduler(UserDict):
                 self.tick() 
                 time.sleep(1)
         finally:
-            self.schedule.close()
+            self.stop()
+
+    def stop(self):
+        self.schedule.close()
 
     def tick(self):
         """Run a tick, that is one iteration of the scheduler.

+ 1 - 11
celery/conf.py

@@ -1,3 +1,4 @@
+from celery.registry import tasks
 """celery.conf"""
 from celery.loaders import settings
 from datetime import timedelta
@@ -21,7 +22,6 @@ DEFAULT_AMQP_CONNECTION_RETRY = True
 DEFAULT_AMQP_CONNECTION_MAX_RETRIES = 100
 DEFAULT_TASK_SERIALIZER = "pickle"
 DEFAULT_BACKEND = "database"
-DEFAULT_PERIODIC_STATUS_BACKEND = "database"
 
 
 """
@@ -245,16 +245,6 @@ CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)
 
 """
 
-.. data:: CELERY_PERIODIC_STATUS_BACKEND
-
-The backend used to store the status of periodic tasks.
-
-"""
-CELERY_PERIODIC_STATUS_BACKEND = getattr(settings,
-                                    "CELERY_PERIODIC_STATUS_BACKEND",
-                                    DEFAULT_PERIODIC_STATUS_BACKEND)
-
-
 """
 
 .. data:: CELERY_CACHE_BACKEND
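Every setting in celery/conf.py is read with the same getattr-and-default pattern, which is why dropping CELERY_PERIODIC_STATUS_BACKEND only takes deleting its default constant and its getattr line. A minimal sketch of that pattern, with a stand-in for django.conf.settings:

    # "settings" here is a stand-in for django.conf.settings.
    class _Settings(object):
        CELERY_BACKEND = "mongodb"          # set by the user project

    settings = _Settings()

    DEFAULT_BACKEND = "database"
    CELERY_BACKEND = getattr(settings, "CELERY_BACKEND", DEFAULT_BACKEND)

    # Before this commit the periodic status backend was resolved the same way:
    # CELERY_PERIODIC_STATUS_BACKEND = getattr(settings,
    #     "CELERY_PERIODIC_STATUS_BACKEND", DEFAULT_PERIODIC_STATUS_BACKEND)

    print(CELERY_BACKEND)                   # -> "mongodb"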

+ 2 - 112
celery/managers.py

@@ -1,63 +1,9 @@
 """celery.managers"""
 from django.db import models
-from django.db import connection, transaction
-from celery.registry import tasks
+from django.db import transaction
 from celery.conf import TASK_RESULT_EXPIRES
-from datetime import datetime, timedelta
+from datetime import datetime
 from django.conf import settings
-import random
-
-# server_drift can be negative, but timedelta supports addition on
-# negative seconds.
-SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
-
-
-class TableLock(object):
-    """Base class for database table locks. Also works as a NOOP lock."""
-
-    def __init__(self, table, type="read"):
-        self.table = table
-        self.type = type
-        self.cursor = None
-
-    def lock_table(self):
-        """Lock the table."""
-        pass
-
-    def unlock_table(self):
-        """Release previously locked tables."""
-        pass
-
-    @classmethod
-    def acquire(cls, table, type=None):
-        """Acquire table lock."""
-        lock = cls(table, type)
-        lock.lock_table()
-        return lock
-
-    def release(self):
-        """Release the lock."""
-        self.unlock_table()
-        if self.cursor:
-            self.cursor.close()
-            self.cursor = None
-
-
-class MySQLTableLock(TableLock):
-    """Table lock support for MySQL."""
-
-    def lock_table(self):
-        """Lock MySQL table."""
-        self.cursor = connection.cursor()
-        self.cursor.execute("LOCK TABLES %s %s" % (
-            self.table, self.type.upper()))
-
-    def unlock_table(self):
-        """Unlock MySQL table."""
-        self.cursor.execute("UNLOCK TABLES")
-
-TABLE_LOCK_FOR_ENGINE = {"mysql": MySQLTableLock}
-table_lock = TABLE_LOCK_FOR_ENGINE.get(settings.DATABASE_ENGINE, TableLock)
 
 
 class TaskManager(models.Manager):
@@ -119,59 +65,3 @@ class TaskManager(models.Manager):
                 self.store_result(task_id, result, status, traceback, False)
             else:
                 raise
-
-
-class PeriodicTaskManager(models.Manager):
-    """Manager for :class:`celery.models.PeriodicTask` models."""
-
-    def init_entries(self):
-        """Add entries for all registered periodic tasks.
-
-        Should be run at worker start.
-        """
-        periodic_tasks = tasks.get_all_periodic()
-        for task_name in periodic_tasks.keys():
-            task_meta, created = self.get_or_create(name=task_name)
-
-    def is_time(self, last_run_at, run_every):
-        """Check if if it is time to run the periodic task.
-
-        :param last_run_at: Last time the periodic task was run.
-        :param run_every: How often to run the periodic task.
-
-        :rtype bool:
-
-        """
-        run_every_drifted = run_every + SERVER_DRIFT
-        run_at = last_run_at + run_every_drifted
-        if datetime.now() > run_at:
-            return True
-        return False
-
-    def get_waiting_tasks(self):
-        """Get all waiting periodic tasks.
-
-        :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
-        """
-        periodic_tasks = tasks.get_all_periodic()
-        db_table = self.model._meta.db_table
-
-        # Find all periodic tasks to be run.
-        waiting = []
-        for task_meta in self.all():
-            if task_meta.name in periodic_tasks:
-                task = periodic_tasks[task_meta.name]
-                run_every = task.run_every
-                if self.is_time(task_meta.last_run_at, run_every):
-                    # Get the object again to be sure noone else
-                    # has already taken care of it.
-                    lock = table_lock.acquire(db_table, "write")
-                    try:
-                        secure = self.get(pk=task_meta.pk)
-                        if self.is_time(secure.last_run_at, run_every):
-                            secure.last_run_at = datetime.now()
-                            secure.save()
-                            waiting.append(secure)
-                    finally:
-                        lock.release()
-        return waiting
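The removed get_waiting_tasks guarded its check-then-update with a per-engine table lock (a real LOCK TABLES on MySQL, a no-op elsewhere) so that two workers would not both dispatch the same periodic task. A standalone sketch of that acquire/re-check/release shape; a threading.Lock and a dict stand in for the table lock and the PeriodicTaskMeta table:

    import threading
    from datetime import datetime, timedelta

    _lock = threading.Lock()
    storage = {"celery.ping": datetime.fromtimestamp(0)}   # name -> last_run_at

    def claim_if_due(name, run_every):
        """Atomically claim a periodic task if it is due; True on claim."""
        if datetime.now() <= storage[name] + run_every:
            return False
        with _lock:                   # table_lock.acquire(db_table, "write")
            # Re-check under the lock: another worker may have claimed the
            # task between the first check and acquiring the lock.
            if datetime.now() > storage[name] + run_every:
                storage[name] = datetime.now()
                return True
        return False

    print(claim_if_due("celery.ping", timedelta(seconds=30)))   # -> True
    print(claim_if_due("celery.ping", timedelta(seconds=30)))   # -> False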

+ 3 - 37
celery/models.py

@@ -6,7 +6,7 @@ Django Models.
 import django
 from django.db import models
 from celery.registry import tasks
-from celery.managers import TaskManager, PeriodicTaskManager
+from celery.managers import TaskManager
 from celery.fields import PickledObjectField
 from celery import conf
 from django.utils.translation import ugettext_lazy as _
@@ -40,42 +40,8 @@ class TaskMeta(models.Model):
     def __unicode__(self):
         return u"<Task: %s done:%s>" % (self.task_id, self.status)
 
-
-class PeriodicTaskMeta(models.Model):
-    """Information about a Periodic Task."""
-    name = models.CharField(_(u"name"), max_length=255, unique=True)
-    last_run_at = models.DateTimeField(_(u"last time run"),
-                                       blank=True,
-                                       default=datetime.fromtimestamp(0))
-    total_run_count = models.PositiveIntegerField(_(u"total run count"),
-                                                  default=0)
-
-    objects = PeriodicTaskManager()
-
-    class Meta:
-        """Model meta-data."""
-        verbose_name = _(u"periodic task")
-        verbose_name_plural = _(u"periodic tasks")
-
-    def __unicode__(self):
-        return u"<PeriodicTask: %s [last-run:%s, total-run:%d]>" % (
-                self.name, self.last_run_at, self.total_run_count)
-
-    def delay(self, *args, **kwargs):
-        """Apply the periodic task immediately."""
-        self.task.delay()
-        self.total_run_count = self.total_run_count + 1
-        self.save()
-
-    @property
-    def task(self):
-        """The entry registered in the task registry for this task."""
-        return tasks[self.name]
-
-
 if (django.VERSION[0], django.VERSION[1]) >= (1, 1):
-    # keep models away from syncdb/reset if database backend is not being used.
+    # keep models away from syncdb/reset if database backend is not
+    # being used.
     if conf.CELERY_BACKEND != 'database':
         TaskMeta._meta.managed = False
-    if conf.CELERY_PERIODIC_STATUS_BACKEND != 'database':
-        PeriodicTaskMeta._meta.managed = False
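With PeriodicTaskMeta gone, only TaskMeta keeps the Django >= 1.1 trick of setting _meta.managed = False so that syncdb/reset skip the table when the database backend is not in use. A hedged, self-contained sketch of that toggle, with tiny stand-ins for the Django model machinery:

    # _Meta/TaskMeta are stand-ins for the real Django model; the version
    # tuple and backend value are example data, not read from settings.
    class _Meta(object):
        managed = True

    class TaskMeta(object):
        _meta = _Meta()

    DJANGO_VERSION = (1, 1)
    CELERY_BACKEND = "mongodb"        # anything other than "database"

    if DJANGO_VERSION >= (1, 1) and CELERY_BACKEND != "database":
        # syncdb/reset will now skip this model's table.
        TaskMeta._meta.managed = False

    print(TaskMeta._meta.managed)     # -> False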

+ 2 - 25
celery/worker/controllers.py

@@ -3,11 +3,9 @@
 Worker Controller Threads
 
 """
-from celery.backends import default_periodic_status_backend
 from Queue import Empty as QueueEmpty
 from datetime import datetime
 from celery.log import get_default_logger
-import traceback
 import threading
 import time
 
@@ -99,9 +97,7 @@ class Mediator(BackgroundThread):
 
 
 class PeriodicWorkController(BackgroundThread):
-    """A thread that continuously checks if there are
-    :class:`celery.task.PeriodicTask` tasks waiting for execution,
-    and executes them. It also finds tasks in the hold queue that is
+    """Finds tasks in the hold queue that is
     ready for execution and moves them to the bucket queue.
 
     (Tasks in the hold queue are tasks waiting for retry, or with an
@@ -114,33 +110,14 @@ class PeriodicWorkController(BackgroundThread):
         self.hold_queue = hold_queue
         self.bucket_queue = bucket_queue
 
-    def on_start(self):
-        """Do backend-specific periodic task initialization."""
-        default_periodic_status_backend.init_periodic_tasks()
-
     def on_iteration(self):
-        """Run periodic tasks and process the hold queue."""
+        """Process the hold queue."""
         logger = get_default_logger()
-        logger.debug("PeriodicWorkController: Running periodic tasks...")
-        try:
-            self.run_periodic_tasks()
-        except Exception, exc:
-            logger.error(
-                "PeriodicWorkController got exception: %s\n%s" % (
-                    exc, traceback.format_exc()))
         logger.debug("PeriodicWorkController: Processing hold queue...")
         self.process_hold_queue()
         logger.debug("PeriodicWorkController: Going to sleep...")
         time.sleep(1)
 
-    def run_periodic_tasks(self):
-        logger = get_default_logger()
-        applied = default_periodic_status_backend.run_periodic_tasks()
-        for task, task_id in applied:
-            logger.debug(
-                "PeriodicWorkController: Periodic task %s applied (%s)" % (
-                    task.name, task_id))
-
     def process_hold_queue(self):
         """Finds paused tasks that are ready for execution and move
         them to the :attr:`bucket_queue`."""
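After this commit PeriodicWorkController only shuttles held tasks (retries, or tasks with an ETA) onto the bucket queue once they become ready; periodic-task dispatch no longer lives in the worker thread. A hedged, self-contained sketch of that hold-queue pass; the real class runs it from a BackgroundThread's on_iteration, and the (task, eta) item layout is an assumption:

    from Queue import Queue, Empty as QueueEmpty
    from datetime import datetime, timedelta

    hold_queue = Queue()     # (task, eta) pairs waiting for their ETA
    bucket_queue = Queue()   # tasks ready to be executed

    def process_hold_queue():
        """Move every held task whose ETA has passed onto the bucket queue."""
        pending = []
        while True:
            try:
                task, eta = hold_queue.get_nowait()
            except QueueEmpty:
                break
            if eta <= datetime.now():
                bucket_queue.put(task)
            else:
                pending.append((task, eta))
        for item in pending:          # not ready yet: put back on hold
            hold_queue.put(item)

    hold_queue.put(("celery.ping", datetime.now() - timedelta(seconds=1)))
    process_hold_queue()
    print(bucket_queue.get_nowait())  # -> celery.ping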

+ 1 - 18
docs/configuration.rst

@@ -1,4 +1,5 @@
 ============================
+
  Configuration and defaults
 ============================
 
@@ -73,23 +74,9 @@ Task result backend settings
         try to receive the result once).
 
 
-* CELERY_PERIODIC_STATUS_BACKEND
-    The backend used to store the status of periodic tasks.
-    Can be one of the following:
-
-    * database (default)
-        Use a relational database supported by the Django ORM.
-
-    * mongodb
-        Use MongoDB.
-
-
 Database backend settings
 =========================
 
-This applies to both the result store backend and the periodic status
-backend.
-
 Please see the Django ORM database settings documentation:
 http://docs.djangoproject.com/en/dev/ref/settings/#database-engine
 
@@ -194,10 +181,6 @@ MongoDB backend settings
         The collection name to store task metadata.
         Defaults to "celery_taskmeta".
 
-    * periodictaskmeta_collection
-        The collection name to store periodic task metadata.
-        Defaults to "celery_periodictaskmeta".
-
 
 Example configuration
 ---------------------