Ask Solem il y a 14 ans
Parent
commit
ab7f041fc7
5 fichiers modifiés avec 36 ajouts et 23 suppressions
  1. 13 9
      celery/beat.py
  2. 1 1
      celery/events/snapshot.py
  3. 9 8
      celery/events/state.py
  4. 11 3
      celery/schedules.py
  5. 2 2
      celery/task/base.py

+ 13 - 9
celery/beat.py

@@ -60,9 +60,11 @@ class ScheduleEntry(object):
 
     """
 
-    def __init__(self, name, last_run_at=None, total_run_count=None,
-            schedule=None, args=(), kwargs={}, options={}, relative=False):
+    def __init__(self, name=None, task=None, last_run_at=None,
+            total_run_count=None, schedule=None, args=(), kwargs={},
+            options={}, relative=False):
         self.name = name
+        self.task = task
         self.schedule = maybe_schedule(schedule, relative)
         self.args = args
         self.kwargs = kwargs
@@ -86,6 +88,7 @@ class ScheduleEntry(object):
         kwargs, options).
 
         """
+        self.task = other.task
         self.schedule = other.schedule
         self.args = other.args
         self.kwargs = other.kwargs
@@ -99,10 +102,11 @@ class ScheduleEntry(object):
         return vars(self).iteritems()
 
     def __repr__(self):
-        return "<Entry: %s(*%s, **%s) {%s}>" % (self.name,
-                                                self.args,
-                                                self.kwargs,
-                                                self.schedule)
+        return "<Entry: %s %s(*%s, **%s) {%s}>" % (self.name,
+                                                   self.task,
+                                                   self.args,
+                                                   self.kwargs,
+                                                   self.schedule)
 
 
 class Scheduler(UserDict):
@@ -141,13 +145,13 @@ class Scheduler(UserDict):
         is_due, next_time_to_run = entry.is_due()
 
         if is_due:
-            self.logger.debug("Scheduler: Sending due task %s" % entry.name)
+            self.logger.debug("Scheduler: Sending due task %s" % entry.task)
             try:
                 result = self.apply_async(entry, connection=connection)
             except SchedulingError, exc:
                 self.logger.error("Scheduler: %s" % exc)
             else:
-                self.logger.debug("%s sent. id->%s" % (entry.name,
+                self.logger.debug("%s sent. id->%s" % (entry.task,
                                                        result.task_id))
         return next_time_to_run
 
@@ -183,7 +187,7 @@ class Scheduler(UserDict):
         entry = self.reserve(entry)
 
         try:
-            result = self.send_task(entry.name, entry.args, entry.kwargs,
+            result = self.send_task(entry.task, entry.args, entry.kwargs,
                                     connection=connection, **entry.options)
         except Exception, exc:
             raise SchedulingError("Couldn't apply scheduled task %s: %s" % (

+ 1 - 1
celery/events/snapshot.py

@@ -25,7 +25,7 @@ class Polaroid(object):
         self.state = state
         self.freq = freq
         self.cleanup_freq = cleanup_freq
-        self.logger = logger
+        self.logger = logger or log.get_default_logger(name="celery.cam")
         self.maxrate = maxrate and TokenBucket(rate(maxrate))
 
     def install(self):

+ 9 - 8
celery/events/state.py

@@ -136,6 +136,15 @@ class State(object):
     buffer = deque()
     frozen = False
 
+    def __init__(self, callback=None,
+            max_workers_in_memory=5000, max_tasks_in_memory=10000):
+        self.workers = LocalCache(max_workers_in_memory)
+        self.tasks = LocalCache(max_tasks_in_memory)
+        self.event_callback = callback
+        self.group_handlers = {"worker": self.worker_event,
+                               "task": self.task_event}
+        self._resource = RLock()
+
     def freeze(self, buffer=True):
         """Stop recording the event stream.
 
@@ -182,14 +191,6 @@ class State(object):
         finally:
             self.thaw(replay=True)
 
-    def __init__(self, callback=None,
-            max_workers_in_memory=5000, max_tasks_in_memory=10000):
-        self.workers = LocalCache(max_workers_in_memory)
-        self.tasks = LocalCache(max_tasks_in_memory)
-        self.event_callback = callback
-        self.group_handlers = {"worker": self.worker_event,
-                               "task": self.task_event}
-        self._resource = RLock()
 
     def clear_tasks(self, ready=True):
         if ready:

+ 11 - 3
celery/schedules.py

@@ -210,15 +210,23 @@ class crontab(schedule):
 
     def __init__(self, minute='*', hour='*', day_of_week='*',
             nowfun=datetime.now):
+        self._orig_minute = minute
+        self._orig_hour = hour
+        self._orig_day_of_week = day_of_week
         self.hour = self._expand_cronspec(hour, 24)
         self.minute = self._expand_cronspec(minute, 60)
         self.day_of_week = self._expand_cronspec(day_of_week, 7)
         self.nowfun = nowfun
 
+    def __repr__(self):
+        return "%s %s %s (m/d/h)" % (self._orig_minute or "*",
+                                     self._orig_hour or "*",
+                                     self._orig_day_of_week or "*")
+
     def __reduce__(self):
-        return (self.__class__, (self.minute,
-                                 self.hour,
-                                 self.day_of_week), None)
+        return (self.__class__, (self._orig_minute,
+                                 self._orig_hour,
+                                 self._orig_day_of_week), None)
 
     def remaining_estimate(self, last_run_at):
         # remaining_estimate controls the frequency of scheduler

+ 2 - 2
celery/task/base.py

@@ -24,7 +24,7 @@ in celery v3.0.
 Please use the CELERYBEAT_SCHEDULE setting instead:
 
     CELERYBEAT_SCHEDULE = {
-        name: dict(name=task_name, schedule=run_every,
+        name: dict(task=task_name, schedule=run_every,
                    args=(), kwargs={}, options={}, relative=False)
     }
 
@@ -624,7 +624,7 @@ class PeriodicTask(Task):
         # For backward compatibility, add the periodic task to the
         # configuration schedule instead.
         conf.CELERYBEAT_SCHEDULE[self.name] = {
-                "name": self.name,
+                "task": self.name,
                 "schedule": self.run_every,
                 "args": (),
                 "kwargs": {},