
Preparing CELERYBEAT_SCHEDULE is now the responsibility of the Scheduler, not post_config_merge

Ask Solem committed 14 years ago
commit 81388574c3
4 changed files with 27 additions and 34 deletions
  1. celery/app/amqp.py (+8 -2)
  2. celery/app/base.py (+2 -27)
  3. celery/app/defaults.py (+2 -1)
  4. celery/beat.py (+15 -4)

celery/app/amqp.py (+8 -2)

@@ -265,9 +265,15 @@ class AMQP(object):
     def Queues(self, queues):
         """Create new :class:`Queues` instance, using queue defaults
         from the current configuration."""
+        conf = self.app.conf
+        if not queues:
+            queues = {conf.CELERY_DEFAULT_QUEUE: {
+                        "exchange": conf.CELERY_DEFAULT_EXCHANGE,
+                        "exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
+                        "binding_key": conf.CELERY_DEFAULT_ROUTING_KEY}}
         return Queues.with_defaults(queues,
-                                    self.app.conf.CELERY_DEFAULT_EXCHANGE,
-                                    self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
+                                    conf.CELERY_DEFAULT_EXCHANGE,
+                                    conf.CELERY_DEFAULT_EXCHANGE_TYPE)
 
     def Router(self, queues=None, create_missing=None):
         """Returns the current task router."""

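With no CELERY_QUEUES configured, the queue fallback now happens where it is consumed: Queues() builds a single default queue from the CELERY_DEFAULT_* settings instead of post_config_merge rewriting the configuration up front. Below is a minimal sketch of that fallback, using a plain dict as a stand-in for app.conf (the setting values shown are the stock Celery defaults, listed here only for illustration):

```python
# Stand-in for app.conf; the real code reads the merged ConfigurationView.
conf = {
    "CELERY_QUEUES": None,                     # nothing configured by the user
    "CELERY_DEFAULT_QUEUE": "celery",
    "CELERY_DEFAULT_EXCHANGE": "celery",
    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
    "CELERY_DEFAULT_ROUTING_KEY": "celery",
}

def queues_with_fallback(queues, conf):
    """Mirror the fallback added to AMQP.Queues above."""
    if not queues:
        queues = {conf["CELERY_DEFAULT_QUEUE"]: {
                    "exchange": conf["CELERY_DEFAULT_EXCHANGE"],
                    "exchange_type": conf["CELERY_DEFAULT_EXCHANGE_TYPE"],
                    "binding_key": conf["CELERY_DEFAULT_ROUTING_KEY"]}}
    return queues

print(queues_with_fallback(conf["CELERY_QUEUES"], conf))
# {'celery': {'exchange': 'celery', 'exchange_type': 'direct',
#             'binding_key': 'celery'}}
```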
celery/app/base.py (+2 -27)

@@ -205,29 +205,6 @@ class BaseApp(object):
                 c["BROKER_BACKEND"] = cbackend
         return c
 
-    def post_config_merge(self, c):
-        # XXX This should be done by whoever requires these settings.
-        """Prepare configuration after it has been merged with the
-        defaults."""
-        if not c.get("CELERY_QUEUES"):
-            c["CELERY_QUEUES"] = {
-                c["CELERY_DEFAULT_QUEUE"]: {
-                    "exchange": c["CELERY_DEFAULT_EXCHANGE"],
-                    "exchange_type": c["CELERY_DEFAULT_EXCHANGE_TYPE"],
-                    "binding_key": c["CELERY_DEFAULT_ROUTING_KEY"]}}
-
-        # Install backend cleanup periodic task.
-        c["CELERYBEAT_SCHEDULE"] = maybe_promise(c["CELERYBEAT_SCHEDULE"])
-        if c["CELERY_TASK_RESULT_EXPIRES"]:
-            from celery.schedules import crontab
-            c["CELERYBEAT_SCHEDULE"].setdefault("celery.backend_cleanup",
-                    dict(task="celery.backend_cleanup",
-                         schedule=crontab(minute="00", hour="04",
-                                          day_of_week="*"),
-                         options={"expires": 12 * 3600}))
-
-        return c
-
     def mail_admins(self, subject, body, fail_silently=False):
         """Send an e-mail to the admins in conf.ADMINS."""
         if not self.conf.ADMINS:
@@ -266,10 +243,8 @@ class BaseApp(object):
         return backend_cls(app=self)
 
     def _get_config(self):
-        return self.post_config_merge(
-                        ConfigurationView({}, [
-                            self.pre_config_merge(self.loader.conf),
-                            DEFAULTS]))
+        return ConfigurationView({},
+                [self.pre_config_merge(self.loader.conf), DEFAULTS])
 
     @cached_property
     def amqp(self):

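_get_config() now simply returns the layered view of loader settings over DEFAULTS; building the configuration no longer rewrites CELERY_QUEUES or CELERYBEAT_SCHEDULE as a side effect. For reads, the view behaves like a chained mapping; the sketch below uses collections.ChainMap as a stand-in for ConfigurationView, which is an assumption about its lookup behaviour rather than its actual implementation:

```python
from collections import ChainMap

DEFAULTS = {"CELERY_QUEUES": None, "CELERYBEAT_SCHEDULE": {}}
loader_conf = {"CELERYBEAT_SCHEDULE": {"add-every-30s": {"task": "tasks.add",
                                                         "schedule": 30}}}

# Stand-in for ConfigurationView({}, [loader_conf, DEFAULTS]): lookups fall
# through from user settings to the defaults, with no post-merge rewriting.
conf = ChainMap({}, loader_conf, DEFAULTS)

print(conf["CELERY_QUEUES"])        # None -- resolved lazily by AMQP.Queues
print(conf["CELERYBEAT_SCHEDULE"])  # user schedule, untouched until beat runs
```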
celery/app/defaults.py (+2 -1)

@@ -101,6 +101,7 @@ NAMESPACES = {
         "TRACK_STARTED": Option(False, type="bool"),
         "REDIRECT_STDOUTS": Option(True, type="bool"),
         "REDIRECT_STDOUTS_LEVEL": Option("WARNING"),
+        "QUEUES": Option(None, type="dict"),
     },
     "CELERYD": {
         "AUTOSCALER": Option("celery.worker.autoscale.Autoscaler"),
@@ -124,7 +125,7 @@ NAMESPACES = {
         "TASK_TIME_LIMIT": Option(type="int"),
     },
     "CELERYBEAT": {
-        "SCHEDULE": Option(promise(lambda: {}), type="dict"),
+        "SCHEDULE": Option({}, type="dict"),
         "SCHEDULER": Option("celery.beat.PersistentScheduler"),
         "SCHEDULE_FILENAME": Option("celerybeat-schedule"),
         "MAX_LOOP_INTERVAL": Option(5 * 60, type="int"),

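Two defaults change here: CELERY_QUEUES gains an explicit None default (resolved lazily by AMQP.Queues, as above), and CELERYBEAT_SCHEDULE drops the promise(lambda: {}) wrapper in favour of a plain empty dict, since the Scheduler now runs the schedule through maybe_promise itself. A sketch of that idea, with minimal stand-ins for promise and maybe_promise (assumed to evaluate a promise and pass any other value through unchanged):

```python
class promise:                       # stand-in for celery.utils' lazy promise
    def __init__(self, fun):
        self.fun = fun
    def __call__(self):
        return self.fun()

def maybe_promise(value):
    """Assumed behaviour: evaluate promises, return plain values as-is."""
    return value() if isinstance(value, promise) else value

assert maybe_promise(promise(lambda: {})) == {}   # old lazy default still works
assert maybe_promise({}) == {}                    # new plain-dict default
```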
celery/beat.py (+15 -4)

@@ -17,8 +17,8 @@ from celery import registry
 from celery import signals
 from celery.app import app_or_default
 from celery.log import SilenceRepeated
-from celery.schedules import maybe_schedule
-from celery.utils import cached_property, instantiate
+from celery.schedules import maybe_schedule, crontab
+from celery.utils import cached_property, instantiate, maybe_promise
 from celery.utils.compat import UserDict
 from celery.utils.timeutils import humanize_seconds
 
@@ -141,7 +141,7 @@ class Scheduler(UserDict):
             schedule = {}
         self.app = app_or_default(app)
         conf = self.app.conf
-        self.data = schedule
+        self.data = self.install_default_entries(schedule)
         self.logger = logger or self.app.log.get_default_logger(
                                                 name="celery.beat")
         self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
@@ -149,6 +149,16 @@ class Scheduler(UserDict):
         if not lazy:
             self.setup_schedule()
 
+    def install_default_entries(self, schedule):
+        schedule = maybe_promise(schedule)
+        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
+            schedule.setdefault("celery.backend_cleanup",
+                    {"task": "celery.backend_cleanup",
+                     "schedule": crontab("0", "4", "*"),
+                     "options": {"expires": 12 * 3600}})
+        return schedule
+
+
     def maybe_due(self, entry, publisher=None):
         is_due, next_time_to_run = entry.is_due()
 
@@ -278,7 +288,8 @@ class PersistentScheduler(Scheduler):
     def setup_schedule(self):
         self._store = self.persistence.open(self.schedule_filename)
         self.data = self._store
-        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
+        self.merge_inplace(self.install_default_entries(
+                            self.app.conf.CELERYBEAT_SCHEDULE))
         self.sync()
         self.data = self._store
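
The net effect for beat: install_default_entries() adds the backend-cleanup entry to whatever schedule the Scheduler is handed, whether that is the in-memory dict or the shelve-backed store merged in by PersistentScheduler, and only when CELERY_TASK_RESULT_EXPIRES is enabled. The positional crontab("0", "4", "*") mirrors the removed keyword form: minute 0, hour 4, every day of the week. A small sketch of the setdefault behaviour, with a placeholder string standing in for the real crontab object:

```python
def install_default_entries(schedule, result_expires=True):
    """Sketch of Scheduler.install_default_entries; result_expires stands in
    for conf.CELERY_TASK_RESULT_EXPIRES."""
    if result_expires:
        schedule.setdefault("celery.backend_cleanup", {
            "task": "celery.backend_cleanup",
            "schedule": "crontab(minute=0, hour=4)",  # placeholder, see diff
            "options": {"expires": 12 * 3600}})       # cleanup task expires in 12h
    return schedule

user_schedule = {"celery.backend_cleanup": {"task": "my.custom_cleanup"}}
print(install_default_entries(user_schedule))
# setdefault leaves an existing user-defined "celery.backend_cleanup" alone
```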