Sfoglia il codice sorgente

Added support for persistent revokes.

Use the -S|--statedb argument to celeryd to enable it:

    celeryd --statedb=/var/run/celeryd

will use the file: /var/run/celeryd.db

I still hate that shelve automatically appends the .db suffix :(
Ask Solem 15 anni fa
parent
commit
a064754287

+ 8 - 1
celery/bin/celeryd.py

@@ -135,6 +135,11 @@ OPTION_LIST = (
                     option. The extension '.db' will be appended to the \
                     filename. Default: %s" % (
                     conf.CELERYBEAT_SCHEDULE_FILENAME)),
+    optparse.make_option('-S', '--statedb', default=conf.CELERYD_STATE_DB,
+            action="store", dest="db",
+            help="Path to the state database. The extension '.db' will \
+                 be appended to the filename. Default: %s" % (
+                     conf.CELERYD_STATE_DB)),
     optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
             action="store_true", dest="events",
             help="Send events so celery can be monitored by e.g. celerymon."),
@@ -169,7 +174,7 @@ class Worker(object):
             task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
             task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
-            queues=None, events=False, **kwargs):
+            queues=None, events=False, db=None, **kwargs):
         self.concurrency = concurrency or multiprocessing.cpu_count()
         self.loglevel = loglevel
         self.logfile = logfile
@@ -181,6 +186,7 @@ class Worker(object):
         self.task_time_limit = task_time_limit
         self.task_soft_time_limit = task_soft_time_limit
         self.max_tasks_per_child = max_tasks_per_child
+        self.db = db
         self.queues = queues or []
 
         if isinstance(self.queues, basestring):
@@ -292,6 +298,7 @@ class Worker(object):
                                 embed_clockservice=self.run_clockservice,
                                 schedule_filename=self.schedule,
                                 send_events=self.events,
+                                db=self.db,
                                 max_tasks_per_child=self.max_tasks_per_child,
                                 task_time_limit=self.task_time_limit,
                                 task_soft_time_limit=self.task_soft_time_limit)

+ 2 - 0
celery/conf.py

@@ -57,6 +57,7 @@ _DEFAULTS = {
     "CELERYD_LOG_COLOR": False,
     "CELERYD_LOG_LEVEL": "WARN",
     "CELERYD_LOG_FILE": None, # stderr
+    "CELERYD_STATE_DB": None,
     "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
     "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
     "CELERYBEAT_LOG_LEVEL": "INFO",
@@ -157,6 +158,7 @@ CELERYD_LOG_COLOR = _get("CELERYD_LOG_COLOR",
 CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
                             compat=["CELERYD_DAEMON_LOG_LEVEL"])
 CELERYD_LOG_LEVEL = LOG_LEVELS[CELERYD_LOG_LEVEL.upper()]
+CELERYD_STATE_DB = _get("CELERYD_STATE_DB")
 CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
 CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
 CELERYD_POOL_PUTLOCKS = _get("CELERYD_POOL_PUTLOCKS")

+ 9 - 0
celery/datastructures.py

@@ -222,6 +222,15 @@ class LimitedSet(object):
     def __contains__(self, value):
         return value in self._data
 
+    def update(self, other):
+        """Merge the entries of ``other`` into this set.
+
+        :param other: Another instance of this class, or any object
+            accepted by the ``update`` method of ``self._data``.
+
+        """
+        # NOTE(review): entries are copied straight into ``_data``; whether
+        # any size or expiry limit is enforced for them is not visible
+        # here -- confirm against the rest of the class.
+        if isinstance(other, self.__class__):
+            self._data.update(other._data)
+        else:
+            self._data.update(other)
+
+    def as_dict(self):
+        """Return the underlying ``_data`` mapping itself (not a copy)."""
+        return self._data
+
     def __iter__(self):
         return iter(self._data.keys())
 

+ 8 - 1
celery/worker/__init__.py

@@ -16,6 +16,7 @@ from celery.log import setup_logger, _hijack_multiprocessing_logger
 from celery.beat import EmbeddedClockService
 from celery.utils import noop, instantiate
 
+from celery.worker import state
 from celery.worker.buckets import TaskBucket, FastQueue
 from celery.worker.scheduler import Scheduler
 
@@ -123,7 +124,8 @@ class WorkController(object):
             task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
             task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             max_tasks_per_child=conf.CELERYD_MAX_TASKS_PER_CHILD,
-            pool_putlocks=conf.CELERYD_POOL_PUTLOCKS):
+            pool_putlocks=conf.CELERYD_POOL_PUTLOCKS,
+            db=conf.CELERYD_STATE_DB):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -138,8 +140,13 @@ class WorkController(object):
         self.task_soft_time_limit = task_soft_time_limit
         self.max_tasks_per_child = max_tasks_per_child
         self.pool_putlocks = pool_putlocks
+        self.db = db
         self._finalize = Finalize(self, self.stop, exitpriority=1)
 
+        if self.db:
+            persistence = state.Persistent(self.db)
+            Finalize(persistence, persistence.save, exitpriority=5)
+
         # Queues
         if conf.DISABLE_RATE_LIMITS:
             self.ready_queue = FastQueue()

+ 5 - 0
celery/worker/control/builtins.py

@@ -130,6 +130,11 @@ def stats(panel, **kwargs):
             "pool": panel.listener.pool.info}
 
 
+@Panel.register
+def dump_revoked(panel, **kwargs):
+    """Return the contents of ``state.revoked`` as a list.
+
+    ``panel`` and any extra keyword arguments are accepted but unused.
+
+    """
+    # NOTE(review): the import of ``state`` is not visible in this hunk --
+    # confirm that this module imports it.
+    return list(state.revoked)
+
+
 @Panel.register
 def dump_tasks(panel, **kwargs):
 

+ 42 - 0
celery/worker/state.py

@@ -1,3 +1,5 @@
+import shelve
+
 from celery.utils.compat import defaultdict
 from celery.datastructures import LimitedSet
 
@@ -18,3 +20,43 @@ def task_ready(task_name):
     active[task_name] -= 1
     if active[task_name] == 0:
         active.pop(task_name, None)
+
+
+class Persistent(object):
+    """Load and save the ``revoked`` set using a :mod:`shelve` database.
+
+    Constructing an instance immediately merges the stored ``"revoked"``
+    entry into the module-level ``revoked`` set; :meth:`save` writes the
+    set back to the database.
+
+    :param filename: Path passed to :func:`shelve.open`.
+
+    """
+    # The currently open shelf, or ``None`` when no shelf is open.
+    _open = None
+
+    def __init__(self, filename):
+        self.filename = filename
+        self._load()
+
+    def _load(self):
+        """Merge the stored state into memory, then close the shelf."""
+        self.merge(self.db)
+        self.close()
+
+    def save(self):
+        """Write the in-memory state to the shelf, flush it and close it."""
+        # The first ``sync`` is the method below, which returns the shelf;
+        # the second is the shelf's own ``sync()``.
+        self.sync(self.db).sync()
+        self.close()
+
+    def merge(self, d):
+        """Update ``revoked`` from ``d["revoked"]`` (if set) and return ``d``."""
+        revoked.update(d.get("revoked") or {})
+        return d
+
+    def sync(self, d):
+        """Merge ``revoked`` into ``d["revoked"]`` and return ``d``."""
+        prev = d.get("revoked") or {}
+        prev.update(revoked.as_dict())
+        # Assign the key again: a shelf opened without ``writeback`` does
+        # not persist in-place changes to a value it returned earlier.
+        d["revoked"] = prev
+        return d
+
+    def open(self):
+        """Open and return the shelf stored at ``self.filename``."""
+        return shelve.open(self.filename)
+
+    def close(self):
+        """Close the shelf if one is open and forget it."""
+        if self._open:
+            self._open.close()
+            self._open = None
+
+    @property
+    def db(self):
+        """The open shelf, opened on first access and cached in ``_open``."""
+        if self._open is None:
+            self._open = self.open()
+        return self._open