Jelajahi Sumber

SQLAlchemy backend now seems to work with SQLite

Ask Solem 15 tahun lalu
induk
melakukan
ed7fa1cda9
3 mengubah file dengan 17 tambahan dan 6 penghapusan
  1. 8 1
      celery/backends/database.py
  2. 8 5
      celery/db/models.py
  3. 1 0
      celery/loaders/default.py

+ 8 - 1
celery/backends/database.py

@@ -5,7 +5,7 @@ from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 
 from celery import conf
-from celery.db.models import Task, TaskSet
+from celery.db.models import ModelBase, Task, TaskSet
 from celery.backends.base import BaseDictBackend
 
 server = '<sql server host>'
@@ -20,6 +20,10 @@ connection_string = 'sqlite:///celery.db'
 engine = create_engine(connection_string)
 Session = sessionmaker(bind=engine)
 
+import os
+if os.environ.get("CELERYINIT"):
+    ModelBase.metadata.create_all(engine)
+
 
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
@@ -32,6 +36,7 @@ class DatabaseBackend(BaseDictBackend):
             if not tasks:
                 task = Task(task_id)
                 session.add(task)
+                session.flush()
             else:
                 task = tasks[0]
             task.result = result
@@ -48,6 +53,7 @@ class DatabaseBackend(BaseDictBackend):
         session = Session()
         try:
             session.add(taskset)
+            session.flush()
             session.commit()
         finally:
             session.close()
@@ -63,6 +69,7 @@ class DatabaseBackend(BaseDictBackend):
             if not task:
                 task = Task(task_id)
                 session.add(task)
+                session.flush()
                 session.commit()
             if task:
                 return task.to_dict()

+ 8 - 5
celery/db/models.py

@@ -13,9 +13,11 @@ ModelBase = declarative_base()
 class Task(ModelBase):
     """Task result/status."""
     __tablename__ = "celery_taskmeta"
+    __table_args__ = {"sqlite_autoincrement": True}
 
-    id = Column("id", Integer, Sequence("task_id_sequence"), primary_key=True)
-    task_id = Column("task_id", String(255), primary_key=True)
+    id = Column("id", Integer, Sequence("task_id_sequence"), primary_key=True,
+            autoincrement=True)
+    task_id = Column("task_id", String(255))
     status = Column("status", String(50), default=states.PENDING)
     result = Column("result", PickleType, nullable=True)
     date_done = Column("date_done", DateTime, default=datetime.now,
@@ -45,13 +47,14 @@ class Task(ModelBase):
 class TaskSet(ModelBase):
     """TaskSet result"""
     __tablename__ = "celery_tasksetmeta"
+    __table_args__ = {"sqlite_autoincrement": True}
 
     id = Column("id", Integer, Sequence("taskset_id_sequence"),
-                primary_key=True)
-    taskset_id = Column("taskset_id", String(255), unique=True)
+                autoincrement=True, primary_key=True)
+    taskset_id = Column("taskset_id", String(255))
     result = Column("result", PickleType, nullable=True)
     date_done = Column("date_done", DateTime, default=datetime.now,
-                       onupdate=datetime.now, nullable=True)
+                       nullable=True)
 
     def __init__(self, task_id):
         self.task_id = task_id

+ 1 - 0
celery/loaders/default.py

@@ -6,6 +6,7 @@ DEFAULT_CONFIG_MODULE = "celeryconfig"
 
 DEFAULT_SETTINGS = {
     "DEBUG": False,
+    "ADMINS": (),
     "DATABASE_ENGINE": "sqlite3",
     "DATABASE_NAME": "celery.sqlite",
     "INSTALLED_APPS": ("celery", ),