浏览代码

Refactor table locking.

Ask Solem 16 年之前
父节点
当前提交
49ab2a518b
共有 1 个文件被更改,包括 50 次插入22 次删除
  1. 50 22
      celery/managers.py

+ 50 - 22
celery/managers.py

@@ -12,6 +12,53 @@ import random
 SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
 
 
+class TableLock(object):
+   """Base class for database table locks. Also works as a NOOP lock."""
+  
+   def __init__(self, table, type="read"):
+        self.table = table
+        self.type = type
+        self.cursor = None
+
+   def lock_table(self):
+       """Lock the table."""
+       pass
+
+   def unlock_table(self):
+       """Release previously locked tables."""
+       pass
+
+   @classmethod
+   def acquire(cls, table, type=None):
+       """Acquire table lock."""
+       lock = cls(table, type)
+       lock.lock_table()
+       return lock
+
+   def release(self):
+       """Release the lock."""
+       self.unlock_table()
+       if self.cursor:
+           self.cursor.close()
+           self.cursor = None
+
+
+class MySQLTableLock(TableLock):
+    """Table lock support for MySQL."""
+
+    def lock_table(self):
+        """Lock MySQL table."""
+        self.cursor = connection.cursor()
+        self.cursor.execute("LOCK TABLES %s %s" % (self.table, self.type.upper()))
+
+    def unlock_table(self):
+        """Unlock MySQL table."""
+        self.cursor.execute("UNLOCK TABLES")
+
+TABLE_LOCK_FOR_ENGINE = {"mysql": MySQLTableLock}
+table_lock = TABLE_LOCK_FOR_ENGINE.get(settings.DATABASE_ENGINE, TableLock)
+
+
 class TaskManager(models.Manager):
     """Manager for :class:`celery.models.Task` models."""
 
@@ -57,26 +104,6 @@ class TaskManager(models.Manager):
 class PeriodicTaskManager(models.Manager):
     """Manager for :class:`celery.models.PeriodicTask` models."""
 
-    def lock(self):
-        """Lock the periodic task table for reading."""
-        if settings.DATABASE_ENGINE != "mysql":
-            return
-        cursor = connection.cursor()
-        table = self.model._meta.db_table
-        cursor.execute("LOCK TABLES %s READ" % table)
-        row = cursor.fetchone()
-        return row
-
-    def unlock(self):
-        """Unlock the periodic task table."""
-        if settings.DATABASE_ENGINE != "mysql":
-            return
-        cursor = connection.cursor()
-        table = self.model._meta.db_table
-        cursor.execute("UNLOCK TABLES")
-        row = cursor.fetchone()
-        return row
-
     def init_entries(self):
         """Add entries for all registered periodic tasks.
 
@@ -107,6 +134,7 @@ class PeriodicTaskManager(models.Manager):
         :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
         """
         periodic_tasks = tasks.get_all_periodic()
+        db_table = self.model._meta.db_table
 
         # Find all periodic tasks to be run.
         waiting = []
@@ -117,7 +145,7 @@ class PeriodicTaskManager(models.Manager):
                 if self.is_time(task_meta.last_run_at, run_every):
                     # Get the object again to be sure noone else
                     # has already taken care of it.
-                    self.lock()
+                    lock = table_lock.acquire(db_table, "write")
                     try:
                         secure = self.get(pk=task_meta.pk)
                         if self.is_time(secure.last_run_at, run_every):
@@ -125,5 +153,5 @@ class PeriodicTaskManager(models.Manager):
                             secure.save()
                             waiting.append(secure)
                     finally:
-                        self.unlock()
+                        lock.release()
         return waiting