Explorar el Código

ETA must be converted to system timezone. Closes #1004

Ask Solem hace 12 años
padre
commit
c2037732fe
Se han modificado 4 ficheros con 31 adiciones y 28 borrados
  1. 3 0
      Changelog
  2. 6 10
      celery/utils/timeutils.py
  3. 6 4
      celery/worker/consumer.py
  4. 16 14
      celery/worker/job.py

+ 3 - 0
Changelog

@@ -56,6 +56,9 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 - Fixes request stack protection when app is initialized more than
   once (Issue #1003).
 
+- ETA tasks now properly works when system timezone is not the same
+  as the configured timezone (Issue #1004).
+
 - Terminating a task now works if the task has been sent to the
   pool but not yet acknowledged by a pool process (Issue #1007).
 

+ 6 - 10
celery/utils/timeutils.py

@@ -96,13 +96,6 @@ class LocalTimezone(tzinfo):
         return tt.tm_isdst > 0
 
 
-def _get_local_timezone():
-    global _local_timezone
-    if _local_timezone is None:
-        _local_timezone = LocalTimezone()
-    return _local_timezone
-
-
 class _Zone(object):
 
     def tz_or_local(self, tzinfo=None):
@@ -115,10 +108,13 @@ class _Zone(object):
             dt = make_aware(dt, orig or self.utc)
         return localize(dt, self.tz_or_local(local))
 
+    def to_system(self, dt):
+        return localize(dt, self.local)
+
     def to_local_fallback(self, dt, *args, **kwargs):
         if is_naive(dt):
-            return make_aware(dt, _get_local_timezone())
-        return localize(dt, _get_local_timezone())
+            return make_aware(dt, self.local)
+        return localize(dt, self.local)
 
     def get_timezone(self, zone):
         if isinstance(zone, basestring):
@@ -132,7 +128,7 @@ class _Zone(object):
 
     @cached_property
     def local(self):
-        return _get_local_timezone()
+        return LocalTimezone()
 
     @cached_property
     def utc(self):

+ 6 - 4
celery/worker/consumer.py

@@ -92,7 +92,7 @@ from celery.utils import text
 from celery.utils import timer2
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
-from celery.utils.timeutils import humanize_seconds
+from celery.utils.timeutils import humanize_seconds, timezone
 
 from . import state
 from .bootsteps import StartStopComponent
@@ -516,16 +516,18 @@ class Consumer(object):
                     expires=task.expires and task.expires.isoformat())
 
         if task.eta:
+            eta = timezone.to_system(task.eta) if task.utc else task.eta
             try:
-                eta = timer2.to_timestamp(task.eta)
+                eta = timer2.to_timestamp(eta)
             except OverflowError, exc:
                 error("Couldn't convert eta %s to timestamp: %r. Task: %r",
                       task.eta, exc, task.info(safe=True), exc_info=True)
                 task.acknowledge()
             else:
                 self.qos.increment_eventually()
-                self.timer.apply_at(eta, self.apply_eta_task, (task, ),
-                                    priority=6)
+                self.timer.apply_at(
+                    eta, self.apply_eta_task, (task, ), priority=6,
+                )
         else:
             task_reserved(task)
             self._quick_put(task)

+ 16 - 14
celery/worker/job.py

@@ -34,7 +34,7 @@ from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.serialization import get_pickled_exception
 from celery.utils.text import truncate
-from celery.utils.timeutils import maybe_iso8601, timezone
+from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
 
 from . import state
 
@@ -45,9 +45,8 @@ _does_debug = logger.isEnabledFor(logging.DEBUG)
 _does_info = logger.isEnabledFor(logging.INFO)
 
 # Localize
-tz_to_local = timezone.to_local
-tz_or_local = timezone.tz_or_local
 tz_utc = timezone.utc
+tz_or_local = timezone.tz_or_local
 send_revoked = signals.task_revoked.send
 
 task_accepted = state.task_accepted
@@ -64,7 +63,7 @@ class Request(object):
                  'eventer', 'connection_errors',
                  'task', 'eta', 'expires',
                  'request_dict', 'acknowledged', 'success_msg',
-                 'error_msg', 'retry_msg', 'ignore_msg',
+                 'error_msg', 'retry_msg', 'ignore_msg', 'utc',
                  'time_start', 'worker_pid', '_already_revoked',
                  '_terminate_on_ack', '_tzlocal')
 
@@ -108,7 +107,7 @@ class Request(object):
             self.kwargs = kwdict(self.kwargs)
         eta = body.get('eta')
         expires = body.get('expires')
-        utc = body.get('utc', False)
+        utc = self.utc = body.get('utc', False)
         self.on_ack = on_ack
         self.hostname = hostname or socket.gethostname()
         self.eventer = eventer
@@ -121,14 +120,15 @@ class Request(object):
         # timezone means the message is timezone-aware, and the only timezone
         # supported at this point is UTC.
         if eta is not None:
-            tz = tz_utc if utc else self.tzlocal
-            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
+            self.eta = maybe_iso8601(eta)
+            if utc:
+                self.eta = maybe_make_aware(self.eta, self.tzlocal)
         else:
             self.eta = None
         if expires is not None:
-            tz = tz_utc if utc else self.tzlocal
-            self.expires = tz_to_local(maybe_iso8601(expires),
-                                       self.tzlocal, tz)
+            self.expires = maybe_iso8601(expires)
+            if utc:
+                self.expires = maybe_make_aware(self.expires, self.tzlocal)
         else:
             self.expires = None
 
@@ -238,9 +238,11 @@ class Request(object):
 
     def maybe_expire(self):
         """If expired, mark the task as revoked."""
-        if self.expires and datetime.now(self.tzlocal) > self.expires:
-            revoked_tasks.add(self.id)
-            return True
+        if self.expires:
+            now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
+            if now > self.expires:
+                revoked_tasks.add(self.id)
+                return True
 
     def terminate(self, pool, signal=None):
         if self.time_start:
@@ -456,7 +458,7 @@ class Request(object):
     @property
     def tzlocal(self):
         if self._tzlocal is None:
-            self._tzlocal = tz_or_local(self.app.conf.CELERY_TIMEZONE)
+            self._tzlocal = self.app.conf.CELERY_TIMEZONE
         return self._tzlocal
 
     @property