Browse Source

The current datetime now taken from loader.now

Ask Solem 13 years ago
parent
commit
c86a7df558

+ 4 - 4
celery/app/amqp.py

@@ -164,7 +164,7 @@ class TaskPublisher(messaging.Publisher):
             _exchanges_declared.add(self.exchange.name)
 
     def _declare_queue(self, name, retry=False, retry_policy={}):
-        options = self.app.queues[name]
+        options = self.app.amqp.queues[name]
         queue = messaging.entry_to_queue(name, **options)(self.channel)
         if retry:
             self.connection.ensure(queue, queue.declare, **retry_policy)()
@@ -209,10 +209,10 @@ class TaskPublisher(messaging.Publisher):
         if not isinstance(task_kwargs, dict):
             raise ValueError("task kwargs must be a dictionary")
         if countdown:                           # Convert countdown to ETA.
-            now = now or datetime.utcnow()
+            now = now or self.app.now()
             eta = now + timedelta(seconds=countdown)
         if isinstance(expires, (int, float)):
-            now = now or datetime.utcnow()
+            now = now or self.app.now()
             expires = now + timedelta(seconds=expires)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
@@ -323,7 +323,7 @@ class AMQP(object):
                     "retry": conf.CELERY_TASK_PUBLISH_RETRY,
                     "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
                     "enable_utc": conf.CELERY_ENABLE_UTC,
-                    "app": self}
+                    "app": self.app}
         return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
 
     def get_task_consumer(self, connection, queues=None, **kwargs):

+ 4 - 0
celery/app/base.py

@@ -17,6 +17,7 @@ import platform as _platform
 
 from contextlib import contextmanager
 from copy import deepcopy
+from datetime import datetime
 from functools import wraps
 
 from kombu.clocks import LamportClock
@@ -268,6 +269,9 @@ class BaseApp(object):
         find_deprecated_settings(c)
         return c
 
+    def now(self):
+        return self.loader.now()
+
     def mail_admins(self, subject, body, fail_silently=False):
         """Send an email to the admins in the :setting:`ADMINS` setting."""
         if self.conf.ADMINS:

+ 1 - 1
celery/backends/cassandra.py

@@ -125,7 +125,7 @@ class CassandraBackend(BaseDictBackend):
 
         def _do_store():
             cf = self._get_column_family()
-            date_done = datetime.utcnow()
+            date_done = self.app.now()
             meta = {"status": status,
                     "result": self.encode(result),
                     "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),

+ 3 - 2
celery/backends/database.py

@@ -127,11 +127,12 @@ class DatabaseBackend(BaseDictBackend):
         """Delete expired metadata."""
         session = self.ResultSession()
         expires = self.expires
+        now = self.app.now()
         try:
             session.query(Task).filter(
-                    Task.date_done < (datetime.utcnow() - expires)).delete()
+                    Task.date_done < (now - expires)).delete()
             session.query(TaskSet).filter(
-                    TaskSet.date_done < (datetime.utcnow() - expires)).delete()
+                    TaskSet.date_done < (now - expires)).delete()
             session.commit()
         finally:
             session.close()

+ 1 - 1
celery/backends/mongodb.py

@@ -200,7 +200,7 @@ class MongoBackend(BaseDictBackend):
         taskmeta_collection = db[self.mongodb_taskmeta_collection]
         taskmeta_collection.remove({
                 "date_done": {
-                    "$lt": datetime.utcnow() - self.expires,
+                    "$lt": self.app.now() - self.expires,
                  }
         })
 

+ 3 - 2
celery/beat.py

@@ -29,6 +29,7 @@ from . import __version__
 from . import platforms
 from . import registry
 from . import signals
+from . import current_app
 from .app import app_or_default
 from .log import SilenceRepeated
 from .schedules import maybe_schedule, crontab
@@ -88,13 +89,13 @@ class ScheduleEntry(object):
         self.total_run_count = total_run_count or 0
 
     def _default_now(self):
-        return datetime.utcnow()
+        return current_app.now()
 
     def _next_instance(self, last_run_at=None):
         """Returns a new instance of the same class, but with
         its date and count fields updated."""
         return self.__class__(**dict(self,
-                                last_run_at=last_run_at or datetime.utcnow(),
+                                last_run_at=last_run_at or self._default_now(),
                                 total_run_count=self.total_run_count + 1))
     __next__ = next = _next_instance  # for 2to3
 

+ 4 - 0
celery/loaders/base.py

@@ -17,6 +17,7 @@ import re
 import warnings
 
 from anyjson import deserialize
+from datetime import datetime
 
 from ..datastructures import DictAttribute
 from ..exceptions import ImproperlyConfigured
@@ -61,6 +62,9 @@ class BaseLoader(object):
         from ..app import app_or_default
         self.app = app_or_default(app)
 
+    def now(self):
+        return datetime.utcnow()
+
     def on_task_init(self, task_id, task):
         """This method is called before a task is executed."""
         pass

+ 7 - 5
celery/schedules.py

@@ -17,6 +17,7 @@ import re
 from datetime import datetime, timedelta
 from dateutil.relativedelta import relativedelta
 
+from . import current_app
 from .utils import is_iterable
 from .utils.timeutils import (timedelta_seconds, weekday, maybe_timedelta,
                               remaining, humanize_seconds)
@@ -29,13 +30,15 @@ class ParseException(Exception):
 class schedule(object):
     relative = False
 
-    def __init__(self, run_every=None, relative=False):
+    def __init__(self, run_every=None, relative=False, nowfun=None):
         self.run_every = maybe_timedelta(run_every)
         self.relative = relative
+        self.nowfun = nowfun or current_app.now
 
     def remaining_estimate(self, last_run_at):
         """Returns when the periodic task should run next as a timedelta."""
-        return remaining(last_run_at, self.run_every, relative=self.relative)
+        return remaining(last_run_at, self.run_every, relative=self.relative,
+                         now=self.nowfun())
 
     def is_due(self, last_run_at):
         """Returns tuple of two items `(is_due, next_time_to_run)`,
@@ -257,15 +260,14 @@ class crontab(schedule):
 
         return result
 
-    def __init__(self, minute='*', hour='*', day_of_week='*',
-            nowfun=datetime.utcnow):
+    def __init__(self, minute='*', hour='*', day_of_week='*', nowfun=None):
         self._orig_minute = minute
         self._orig_hour = hour
         self._orig_day_of_week = day_of_week
         self.hour = self._expand_cronspec(hour, 24)
         self.minute = self._expand_cronspec(minute, 60)
         self.day_of_week = self._expand_cronspec(day_of_week, 7)
-        self.nowfun = nowfun
+        self.nowfun = nowfun or current_app.now
 
     def __repr__(self):
         return "<crontab: %s %s %s (m/h/d)>" % (self._orig_minute or "*",