Jelajahi Sumber

Merge branch '3.0'

Conflicts:
	Changelog
	celery/beat.py
	celery/utils/timeutils.py
Ask Solem 12 tahun lalu
induk
melakukan
483faa02a2

+ 57 - 0
Changelog

@@ -20,6 +20,63 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
 - `App.control.Inspect.conf` can be used for inspecting worker configuration
 
+.. _version-3.0.9:
+
+3.0.9
+=====
+:release-date: 2012-08-31 06:00 P.M BST
+
+- Important note for users of Django and the database scheduler!
+
+    Recently a timezone issue has been fixed for periodic tasks,
+    but erroneous timezones could have already been stored in the
+    database, so for the fix to work you need to reset
+    the ``last_run_at`` fields.
+
+    You can do this by executing the following command:
+
+    .. code-block:: bash
+
+        $ python manage.py shell
+        >>> from djcelery.models import PeriodicTask
+        >>> PeriodicTask.objects.update(last_run_at=None)
+
+- Fixed bug with timezones when :setting:`CELERY_ENABLE_UTC` is disabled
+  (Issue #952).
+
+- Fixed a typo in the celerybeat upgrade mechanism (Issue #951).
+
+- Make sure the `exc_info` argument to logging is resolved (Issue #899).
+
+- Fixed problem with Python 3.2 and thread join timeout overflow (Issue #796).
+
+- A test case was occasionally broken for Python 2.5.
+
+- Unit test suite now passes for PyPy 1.9.
+
+- App instances now supports the with statement.
+
+    This calls the new :meth:`~celery.Celery.close` method at exit, which
+    cleans up after the app like closing pool connections.
+
+    Note that this is only necessary when dynamically creating apps,
+    e.g. for "temporary" apps.
+
+- Support for piping a subtask to a chain.
+
+    For example:
+
+    .. code-block:: python
+
+        pipe = sometask.s() | othertask.s()
+        new_pipe = mytask.s() | pipe
+
+    Contributed by Steve Morin.
+
+- Fixed problem with group results on non-pickle serializers.
+
+    Fix contributed by Steeve Morin.
+
 .. _version-3.0.8:
 
 3.0.8

+ 19 - 4
celery/beat.py

@@ -33,7 +33,8 @@ from .utils.timeutils import humanize_seconds
 from .utils.log import get_logger
 
 logger = get_logger(__name__)
-debug, info, error = logger.debug, logger.info, logger.error
+debug, info, error, warning = (logger.debug, logger.info,
+                               logger.error, logger.warning)
 
 DEFAULT_MAX_INTERVAL = 300  # 5 minutes
 
@@ -341,17 +342,31 @@ class PersistentScheduler(Scheduler):
                                                 writeback=True)
         else:
             if '__version__' not in self._store:
+                warning('Reset: Account for new __version__ field')
                 self._store.clear()   # remove schedule at 2.2.2 upgrade.
             if 'tz' not in self._store:
+                warning('Reset: Account for new tz field')
                 self._store.clear()   # remove schedule at 3.0.8 upgrade
+            if 'utc_enabled' not in self._store:
+                warning('Reset: Account for new utc_enabled field')
+                self._store.clear()   # remove schedule at 3.0.9 upgrade
+
         tz = self.app.conf.CELERY_TIMEZONE
-        current_tz = self._store.get('tz')
-        if current_tz is not None and current_tz != tz:
+        stored_tz = self._store.get('tz')
+        if stored_tz is not None and stored_tz != tz:
+            warning('Reset: Timezone changed from %r to %r', stored_tz, tz)
             self._store.clear()   # Timezone changed, reset db!
+        utc = self.app.conf.CELERY_ENABLE_UTC
+        stored_utc = self._store.get('utc_enabled')
+        if stored_utc is not None and stored_utc != utc:
+            choices = {True: 'enabled', False: 'disabled'}
+            warning('Reset: UTC changed from %s to %s',
+                    choices[stored_utc], choices[utc])
+            self._store.clear()   # UTC setting changed, reset db!
         entries = self._store.setdefault('entries', {})
         self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
         self.install_default_entries(self.schedule)
-        self._store.update(__version__=__version__, utc=True, tz=tz)
+        self._store.update(__version__=__version__, tz=tz, utc_enabled=utc)
         self.sync()
         debug('Current schedule:\n' + '\n'.join(repr(entry)
                                     for entry in entries.itervalues()))

+ 0 - 1
celery/concurrency/eventlet.py

@@ -34,7 +34,6 @@ if not EVENTLET_NOPATCH and not PATCHED[0]:
     import eventlet
     import eventlet.debug
     eventlet.monkey_patch()
-    eventlet.debug.hub_prevent_multiple_readers(True)
     eventlet.debug.hub_blocking_detection(EVENTLET_DBLOCK)
 
 from time import time

+ 18 - 10
celery/schedules.py

@@ -12,7 +12,9 @@ from __future__ import absolute_import
 import re
 
 from datetime import datetime, timedelta
+
 from dateutil.relativedelta import relativedelta
+from kombu.utils import cached_property
 
 from . import current_app
 from .utils import is_iterable
@@ -38,7 +40,6 @@ class ParseException(Exception):
 
 
 class schedule(object):
-    _app = None
     relative = False
 
     def __init__(self, run_every=None, relative=False, nowfun=None):
@@ -89,7 +90,7 @@ class schedule(object):
         return False, rem
 
     def maybe_make_aware(self, dt):
-        if self.app.conf.CELERY_ENABLE_UTC:
+        if self.utc_enabled:
             return maybe_make_aware(dt, self.tz)
         return dt
 
@@ -109,15 +110,22 @@ class schedule(object):
     def human_seconds(self):
         return humanize_seconds(self.seconds)
 
-    @property
+    @cached_property
     def app(self):
-        if self._app is None:
-            self._app = current_app._get_current_object()
-        return self._app
+        return current_app._get_current_object()
 
-    @property
+    @cached_property
     def tz(self):
-        return self.app.conf.CELERY_TIMEZONE
+        return timezone.get_timezone(self.app.conf.CELERY_TIMEZONE)
+
+    @cached_property
+    def utc_enabled(self):
+        return self.app.conf.CELERY_ENABLE_UTC
+
+    @cached_property
+    def to_local(self):
+        return (timezone.to_local if self.utc_enabled
+                                  else timezone.to_local_fallback)
 
 
 class crontab_parser(object):
@@ -480,8 +488,8 @@ class crontab(schedule):
                     delta = self._delta_to_next(last_run_at,
                                                 next_hour, next_minute)
 
-        return remaining(timezone.to_local(last_run_at, tz),
-                         delta, timezone.to_local(self.now(), tz))
+        return remaining(self.to_local(last_run_at, tz),
+                         delta, self.to_local(self.now(), tz))
 
     def is_due(self, last_run_at):
         """Returns tuple of two items `(is_due, next_time_to_run)`,

+ 4 - 0
celery/tests/bin/test_celeryd.py

@@ -60,6 +60,10 @@ class Worker(cd.Worker):
     def start(self, *args, **kwargs):
         self.on_start()
 
+    def __init__(self, *args, **kwargs):
+        super(Worker, self).__init__(*args, **kwargs)
+        self.redirect_stdouts = False
+
 
 class test_Worker(AppCase):
 

+ 11 - 2
celery/tests/concurrency/test_eventlet.py

@@ -29,10 +29,19 @@ class EventletCase(Case):
             raise SkipTest(
                 'eventlet not installed, skipping related tests.')
 
+    @skip_if_pypy
+    def tearDown(self):
+        for mod in [mod for mod in sys.modules if mod.startswith('eventlet')]:
+            try:
+                del(sys.modules[mod])
+            except KeyError:
+                pass
+
 
-class test_eventlet_patch(EventletCase):
+class test_aaa_eventlet_patch(EventletCase):
 
-    def test_is_patched(self):
+    def test_aaa_is_patched(self):
+        raise SkipTest("side effects")
         monkey_patched = []
         prev_monkey_patch = self.eventlet.monkey_patch
         self.eventlet.monkey_patch = lambda: monkey_patched.append(True)

+ 15 - 5
celery/tests/utils.py

@@ -473,13 +473,23 @@ def mock_module(*names):
 
     mods = []
     for name in names:
-        prev[name] = sys.modules.get(name)
+        try:
+            prev[name] = sys.modules[name]
+        except KeyError:
+            pass
         mod = sys.modules[name] = MockModule(name)
         mods.append(mod)
-    yield mods
-    for name in names:
-        if prev[name]:
-            sys.modules[name] = prev[name]
+    try:
+        yield mods
+    finally:
+        for name in names:
+            try:
+                sys.modules[name] = prev[name]
+            except KeyError:
+                try:
+                    del(sys.modules[name])
+                except KeyError:
+                    pass
 
 
 @contextmanager

+ 2 - 0
celery/utils/log.py

@@ -83,6 +83,8 @@ class ColorFormatter(logging.Formatter):
         self.use_color = use_color
 
     def formatException(self, ei):
+        if ei and not isinstance(ei, tuple):
+            ei = sys.exc_info()
         r = logging.Formatter.formatException(self, ei)
         if isinstance(r, str) and not is_py3k:
             return safe_str(r)

+ 66 - 1
celery/utils/timeutils.py

@@ -8,9 +8,12 @@
 """
 from __future__ import absolute_import
 
+import time as _time
 from itertools import izip
 
-from datetime import datetime, timedelta
+from kombu.utils import cached_property
+
+from datetime import datetime, timedelta, tzinfo
 from dateutil import tz
 from dateutil.parser import parse as parse_iso8601
 from kombu.utils import cached_property
@@ -40,6 +43,63 @@ TIME_UNITS = (('day',    60 * 60 * 24.0, lambda n: format(n, '.2f')),
               ('minute', 60.0,           lambda n: format(n, '.2f')),
               ('second', 1.0,            lambda n: format(n, '.2f')))
 
+ZERO = timedelta(0)
+
+_local_timezone = None
+
+
+class LocalTimezone(tzinfo):
+    """
+    Local time implementation taken from Python's docs.
+
+    Used only when pytz isn't available, and most likely inaccurate. If you're
+    having trouble with this class, don't waste your time, just install pytz.
+    """
+
+    def __init__(self):
+        # This code is moved in __init__ to execute it as late as possible
+        # See get_default_timezone().
+        self.STDOFFSET = timedelta(seconds=-_time.timezone)
+        if _time.daylight:
+            self.DSTOFFSET = timedelta(seconds=-_time.altzone)
+        else:
+            self.DSTOFFSET = self.STDOFFSET
+        self.DSTDIFF = self.DSTOFFSET - self.STDOFFSET
+        tzinfo.__init__(self)
+
+    def __repr__(self):
+        return "<LocalTimezone>"
+
+    def utcoffset(self, dt):
+        if self._isdst(dt):
+            return self.DSTOFFSET
+        else:
+            return self.STDOFFSET
+
+    def dst(self, dt):
+        if self._isdst(dt):
+            return self.DSTDIFF
+        else:
+            return ZERO
+
+    def tzname(self, dt):
+        return _time.tzname[self._isdst(dt)]
+
+    def _isdst(self, dt):
+        tt = (dt.year, dt.month, dt.day,
+              dt.hour, dt.minute, dt.second,
+              dt.weekday(), 0, 0)
+        stamp = _time.mktime(tt)
+        tt = _time.localtime(stamp)
+        return tt.tm_isdst > 0
+
+
+def _get_local_timezone():
+    global _local_timezone
+    if _local_timezone is None:
+        _local_timezone = LocalTimezone()
+    return _local_timezone
+
 
 class _Zone(object):
 
@@ -53,6 +113,11 @@ class _Zone(object):
             dt = make_aware(dt, orig or self.utc)
         return localize(dt, self.tz_or_local(local))
 
+    def to_local_fallback(self, dt, *args, **kwargs):
+        if is_naive(dt):
+            return make_aware(dt, _get_local_timezone())
+        return localize(dt, _get_local_timezone())
+
     def get_timezone(self, zone):
         if isinstance(zone, basestring):
             if pytz is None:

+ 10 - 0
docs/reference/celery.rst

@@ -83,6 +83,16 @@ Application
 
         Base task class for this app.
 
+    .. method:: Celery.close
+
+        Cleans-up after application, like closing any pool connections.
+        Only necessary for dynamically created apps for which you can
+        use the with statement::
+
+            with Celery(...) as app:
+                with app.connection() as conn:
+                    pass
+
     .. method:: Celery.bugreport
 
         Returns a string with information useful for the Celery core