Browse Source

Fix task ETA issues when timezone is defined in configuration (#3867)

* Fix wrong task ETA when using timezone setting

* Optimization: reduce polling interval in test cases

* Test cases for App.now, App.uses_utc_timezone

* Fix PEP8 and pydocstyle issues
George Psarakis 8 years ago
parent
commit
eac96241d2

+ 1 - 7
celery/app/amqp.py

@@ -21,7 +21,7 @@ from celery.local import try_import
 from celery.utils.nodenames import anon_nodename
 from celery.utils.saferepr import saferepr
 from celery.utils.text import indent as textindent
-from celery.utils.time import maybe_make_aware, to_utc
+from celery.utils.time import maybe_make_aware
 
 from . import routes as _routes
 
@@ -407,17 +407,11 @@ class AMQP(object):
         if countdown:  # convert countdown to ETA
             self._verify_seconds(countdown, 'countdown')
             now = now or self.app.now()
-            timezone = timezone or self.app.timezone
             eta = now + timedelta(seconds=countdown)
-            if utc:
-                eta = to_utc(eta).astimezone(timezone)
         if isinstance(expires, numbers.Real):
             self._verify_seconds(expires, 'expires')
             now = now or self.app.now()
-            timezone = timezone or self.app.timezone
             expires = now + timedelta(seconds=expires)
-            if utc:
-                expires = to_utc(expires).astimezone(timezone)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 

+ 13 - 5
celery/app/base.py

@@ -870,7 +870,8 @@ class Celery(object):
 
     def now(self):
         """Return the current time and date as a datetime."""
-        return self.loader.now(utc=self.conf.enable_utc)
+        from datetime import datetime
+        return datetime.utcnow().replace(tzinfo=self.timezone)
 
     def select_queues(self, queues=None):
         """Select subset of queues.
@@ -1231,6 +1232,10 @@ class Celery(object):
     def producer_pool(self):
         return self.amqp.producer_pool
 
+    def uses_utc_timezone(self):
+        """Check if the application uses the UTC timezone."""
+        return self.conf.timezone == 'UTC' or self.conf.timezone is None
+
     @cached_property
     def timezone(self):
         """Current timezone for this app.
@@ -1239,9 +1244,12 @@ class Celery(object):
         :setting:`timezone` setting.
         """
         conf = self.conf
-        tz = conf.timezone
+        tz = conf.timezone or 'UTC'
         if not tz:
-            return (timezone.get_timezone('UTC') if conf.enable_utc
-                    else timezone.local)
-        return timezone.get_timezone(conf.timezone)
+            if conf.enable_utc:
+                return timezone.get_timezone('UTC')
+            else:
+                if not conf.timezone:
+                    return timezone.local
+        return timezone.get_timezone(tz)
 App = Celery  # noqa: E305 XXX compat

+ 2 - 2
celery/worker/strategy.py

@@ -92,7 +92,7 @@ def default(task, app, consumer,
                              to_timestamp=to_timestamp):
         if body is None:
             body, headers, decoded, utc = (
-                message.body, message.headers, False, True,
+                message.body, message.headers, False, app.uses_utc_timezone(),
             )
             if not body_can_be_buffer:
                 body = bytes(body) if isinstance(body, buffer_t) else body
@@ -126,7 +126,7 @@ def default(task, app, consumer,
                 if req.utc:
                     eta = to_timestamp(to_system_tz(req.eta))
                 else:
-                    eta = to_timestamp(req.eta, timezone.local)
+                    eta = to_timestamp(req.eta, app.timezone)
             except (OverflowError, ValueError) as exc:
                 error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                       req.eta, exc, req.info(safe=True), exc_info=True)

+ 44 - 2
t/unit/app/test_app.py

@@ -1,5 +1,6 @@
 from __future__ import absolute_import, unicode_literals
 
+from datetime import datetime, timedelta
 import gc
 import itertools
 import os
@@ -23,7 +24,7 @@ from celery.loaders.base import unconfigured
 from celery.platforms import pyimplementation
 from celery.utils.collections import DictAttribute
 from celery.utils.serialization import pickle
-from celery.utils.time import timezone
+from celery.utils.time import timezone, to_utc, localize
 from celery.utils.objects import Bunch
 
 THIS_IS_A_KEY = 'this is a value'
@@ -73,6 +74,36 @@ class test_App:
     def setup(self):
         self.app.add_defaults(deepcopy(self.CELERY_TEST_CONFIG))
 
+    def test_now(self):
+        timezone_setting_value = 'US/Eastern'
+        tz_utc = timezone.get_timezone('UTC')
+        tz_us_eastern = timezone.get_timezone(timezone_setting_value)
+
+        now = datetime.utcnow().replace(tzinfo=tz_utc)
+        app_now = self.app.now()
+
+        assert app_now.tzinfo == tz_utc
+        assert app_now - now <= timedelta(seconds=1)
+
+        # Check that timezone conversion is applied from configuration
+        self.app.conf.enable_utc = False
+        self.app.conf.timezone = timezone_setting_value
+        # timezone is a cached property
+        del self.app.timezone
+
+        app_now = self.app.now()
+        assert app_now.tzinfo == tz_us_eastern
+
+        diff = to_utc(datetime.utcnow()) - localize(app_now, tz_utc)
+        assert diff <= timedelta(seconds=1)
+
+        # Verify that timezone setting overrides enable_utc=on setting
+        self.app.conf.enable_utc = True
+        del self.app.timezone
+        app_now = self.app.now()
+        assert self.app.timezone == tz_us_eastern
+        assert app_now.tzinfo == tz_us_eastern
+
     @patch('celery.app.base.set_default_app')
     def test_set_default(self, set_default_app):
         self.app.set_default()
@@ -647,7 +678,8 @@ class test_App:
         _args = {'foo': 'bar', 'spam': 'baz'}
 
         self.app.config_from_object(Bunch())
-        assert self.app.conf.broker_transport_options == {}
+        assert self.app.conf.broker_transport_options == \
+            {'polling_interval': 0.1}
 
         self.app.config_from_object(Bunch(broker_transport_options=_args))
         assert self.app.conf.broker_transport_options == _args
@@ -797,6 +829,16 @@ class test_App:
         tz = self.app.timezone
         assert tz == timezone.get_timezone('UTC')
 
+    def test_uses_utc_timezone(self):
+        self.app.conf.timezone = None
+        assert self.app.uses_utc_timezone() is True
+
+        self.app.conf.timezone = 'US/Eastern'
+        assert self.app.uses_utc_timezone() is False
+
+        self.app.conf.timezone = 'UTC'
+        assert self.app.uses_utc_timezone() is True
+
     def test_compat_on_configure(self):
         _on_configure = Mock(name='on_configure')
 

+ 10 - 4
t/unit/backends/test_base.py

@@ -357,19 +357,25 @@ class test_KeyValueStoreBackend:
             ids = {uuid(): i for i in range(10)}
             for id, i in items(ids):
                 self.b.mark_as_done(id, i)
-            it = self.b.get_many(list(ids))
+            it = self.b.get_many(list(ids), interval=0.01)
             for i, (got_id, got_state) in enumerate(it):
                 assert got_state['result'] == ids[got_id]
             assert i == 9
-            assert list(self.b.get_many(list(ids)))
+            assert list(self.b.get_many(list(ids), interval=0.01))
 
             self.b._cache.clear()
             callback = Mock(name='callback')
-            it = self.b.get_many(list(ids), on_message=callback)
+            it = self.b.get_many(
+                list(ids),
+                on_message=callback,
+                interval=0.05
+            )
             for i, (got_id, got_state) in enumerate(it):
                 assert got_state['result'] == ids[got_id]
             assert i == 9
-            assert list(self.b.get_many(list(ids)))
+            assert list(
+                self.b.get_many(list(ids), interval=0.01)
+            )
             callback.assert_has_calls([
                 call(ANY) for id in ids
             ])

+ 3 - 1
t/unit/conftest.py

@@ -48,8 +48,10 @@ CASE_LOG_HANDLER_EFFECT = 'Test {0} modified handlers for the root logger'
 def celery_config():
     return {
         'broker_url': 'memory://',
+        'broker_transport_options': {
+            'polling_interval': 0.1
+        },
         'result_backend': 'cache+memory://',
-
         'task_default_queue': 'testcelery',
         'task_default_exchange': 'testcelery',
         'task_default_routing_key': 'testcelery',

+ 6 - 4
t/unit/contrib/test_migrate.py

@@ -267,8 +267,11 @@ class test_migrate_task:
 class test_migrate_tasks:
 
     def test_migrate(self, app, name='testcelery'):
-        x = Connection('memory://foo')
-        y = Connection('memory://foo')
+        connection_kwargs = dict(
+            transport_options={'polling_interval': 0.01}
+        )
+        x = Connection('memory://foo', **connection_kwargs)
+        y = Connection('memory://foo', **connection_kwargs)
         # use separate state
         x.default_channel.queues = {}
         y.default_channel.queues = {}
@@ -281,7 +284,6 @@ class test_migrate_tasks:
         Producer(x).publish('baz', exchange=name, routing_key=name)
         assert x.default_channel.queues
         assert not y.default_channel.queues
-
         migrate_tasks(x, y, accept=['text/plain'], app=app)
 
         yq = q(y.default_channel)
@@ -309,7 +311,7 @@ class test_migrate_tasks:
             qd.side_effect = effect
             migrate_tasks(x, y, app=app)
 
-        x = Connection('memory://')
+        x = Connection('memory://', **connection_kwargs)
         x.default_channel.queues = {}
         y.default_channel.queues = {}
         callback = Mock()

+ 10 - 0
t/unit/tasks/test_tasks.py

@@ -251,6 +251,16 @@ class test_task_retries(TasksCase):
         self.autoretry_task.apply((1, 0))
         assert self.autoretry_task.iterations == 6
 
+    def test_retry_wrong_eta_when_not_enable_utc(self):
+        """Issue #3753"""
+        self.app.conf.enable_utc = False
+        self.app.conf.timezone = 'US/Eastern'
+        self.autoretry_task.iterations = 0
+        self.autoretry_task.default_retry_delay = 2
+
+        self.autoretry_task.apply((1, 0))
+        assert self.autoretry_task.iterations == 6
+
 
 class test_canvas_utils(TasksCase):