Преглед изворни кода

Beat: schedule/ScheduleEntry now supports app argument

Ask Solem пре 11 година
родитељ
комит
25b836afbc
2 измењених фајлова са 34 додато и 26 уклоњено
  1. 9 7
      celery/beat.py
  2. 25 19
      celery/schedules.py

+ 9 - 7
celery/beat.py

@@ -24,7 +24,6 @@ from kombu.utils.functional import maybe_evaluate
 from . import __version__
 from . import platforms
 from . import signals
-from . import current_app
 from .five import items, reraise, values
 from .schedules import maybe_schedule, crontab
 from .utils.imports import instantiate
@@ -82,18 +81,21 @@ class ScheduleEntry(object):
 
     def __init__(self, name=None, task=None, last_run_at=None,
                  total_run_count=None, schedule=None, args=(), kwargs={},
-                 options={}, relative=False):
+                 options={}, relative=False, app=None):
+        self.app = app
         self.name = name
         self.task = task
         self.args = args
         self.kwargs = kwargs
         self.options = options
-        self.schedule = maybe_schedule(schedule, relative)
+        self.schedule = maybe_schedule(schedule, relative, app=self.app)
+        if self.schedule:
+            self.schedule.app = self.app
         self.last_run_at = last_run_at or self._default_now()
         self.total_run_count = total_run_count or 0
 
     def _default_now(self):
-        return self.schedule.now() if self.schedule else current_app.now()
+        return self.schedule.now() if self.schedule else self.app.now()
 
     def _next_instance(self, last_run_at=None):
         """Returns a new instance of the same class, but with
@@ -258,14 +260,14 @@ class Scheduler(object):
         self.sync()
 
     def add(self, **kwargs):
-        entry = self.Entry(**kwargs)
+        entry = self.Entry(app=self.app, **kwargs)
         self.schedule[entry.name] = entry
         return entry
 
     def _maybe_entry(self, name, entry):
         if isinstance(entry, self.Entry):
             return entry
-        return self.Entry(**dict(entry, name=name))
+        return self.Entry(**dict(entry, name=name, app=self.app))
 
     def update_from_dict(self, dict_):
         self.schedule.update(dict(
@@ -282,7 +284,7 @@ class Scheduler(object):
 
         # Update and add new items in the schedule
         for key in B:
-            entry = self.Entry(**dict(b[key], name=key))
+            entry = self.Entry(**dict(b[key], name=key, app=self.app))
             if schedule.get(key):
                 schedule[key].update(entry)
             else:

+ 25 - 19
celery/schedules.py

@@ -55,10 +55,11 @@ class ParseException(Exception):
 class schedule(object):
     relative = False
 
-    def __init__(self, run_every=None, relative=False, nowfun=None):
+    def __init__(self, run_every=None, relative=False, nowfun=None, app=None):
         self.run_every = maybe_timedelta(run_every)
         self.relative = relative
         self.nowfun = nowfun
+        self._app = app
 
     def now(self):
         return (self.nowfun or self.app.now)()
@@ -126,9 +127,13 @@ class schedule(object):
     def human_seconds(self):
         return humanize_seconds(self.seconds)
 
-    @cached_property
+    @property
     def app(self):
-        return current_app._get_current_object()
+        return self._app or current_app._get_current_object()
+
+    @app.setter  # noqa
+    def app(self, app):
+        self._app = app
 
     @cached_property
     def tz(self):
@@ -327,6 +332,21 @@ class crontab(schedule):
 
     """
 
+    def __init__(self, minute='*', hour='*', day_of_week='*',
+                 day_of_month='*', month_of_year='*', nowfun=None, app=None):
+        self._orig_minute = cronfield(minute)
+        self._orig_hour = cronfield(hour)
+        self._orig_day_of_week = cronfield(day_of_week)
+        self._orig_day_of_month = cronfield(day_of_month)
+        self._orig_month_of_year = cronfield(month_of_year)
+        self.hour = self._expand_cronspec(hour, 24)
+        self.minute = self._expand_cronspec(minute, 60)
+        self.day_of_week = self._expand_cronspec(day_of_week, 7)
+        self.day_of_month = self._expand_cronspec(day_of_month, 31, 1)
+        self.month_of_year = self._expand_cronspec(month_of_year, 12, 1)
+        self.nowfun = nowfun
+        self._app = app
+
     @staticmethod
     def _expand_cronspec(cronspec, max_, min_=0):
         """Takes the given cronspec argument in one of the forms::
@@ -436,20 +456,6 @@ class crontab(schedule):
                     second=0,
                     microsecond=0)
 
-    def __init__(self, minute='*', hour='*', day_of_week='*',
-                 day_of_month='*', month_of_year='*', nowfun=None):
-        self._orig_minute = cronfield(minute)
-        self._orig_hour = cronfield(hour)
-        self._orig_day_of_week = cronfield(day_of_week)
-        self._orig_day_of_month = cronfield(day_of_month)
-        self._orig_month_of_year = cronfield(month_of_year)
-        self.hour = self._expand_cronspec(hour, 24)
-        self.minute = self._expand_cronspec(minute, 60)
-        self.day_of_week = self._expand_cronspec(day_of_week, 7)
-        self.day_of_month = self._expand_cronspec(day_of_month, 31, 1)
-        self.month_of_year = self._expand_cronspec(month_of_year, 12, 1)
-        self.nowfun = nowfun
-
     def now(self):
         return (self.nowfun or self.app.now)()
 
@@ -546,9 +552,9 @@ class crontab(schedule):
         return not self.__eq__(other)
 
 
-def maybe_schedule(s, relative=False):
+def maybe_schedule(s, relative=False, app=None):
     if isinstance(s, int):
         s = timedelta(seconds=s)
     if isinstance(s, timedelta):
-        return schedule(s, relative)
+        return schedule(s, relative, app=app)
     return s