Przeglądaj źródła

Merge branch '3.0'

Conflicts:
	README.rst
	celery/__init__.py
	celery/app/utils.py
	celery/worker/__init__.py
	docs/includes/introduction.txt
	requirements/default.txt
	setup.cfg
Ask Solem 12 lat temu
rodzic
commit
3b17a2e521

+ 23 - 4
Changelog

@@ -24,10 +24,32 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 
 3.0.6
 =====
-:release-date: 2012-09-XX XX:XX X.M BST
+:release-date: 2012-09-17 11:00 P.M BST
+
+- Now depends on kombu 2.4.0
 
 - Now depends on billiard 2.7.3.12
 
+- Redis: Celery now tries to restore messages whenever there are no messages
+  in the queue.
+
+- Crontab schedules now properly respects :setting:`CELERY_TIMEZONE` setting.
+
+    It's important to note that crontab schedules uses UTC time by default
+    unless this setting is set.
+
+    Issue #904 and django-celery #150.
+
+- ``billiard.enable_forking`` is now only set by the processes pool.
+
+- The transport is now properly shown by :program:`celery report`
+  (Issue #913).
+
+- The `--app` argument now works if the last part is a module name
+  (Issue #921).
+
+- Fixed problem with unpickleable exceptions (billiard #12).
+
 - Adds ``task_name`` attribute to ``EagerResult`` which is always
   :const:`None` (Issue #907).
 
@@ -54,9 +76,6 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - The argument to :class:`~celery.exceptions.TaskRevokedError` is now one
   of the reasons ``revoked``, ``expired`` or ``terminated``.
 
-- Redis: Celery now tries to restore messages whenever there are no messages
-  in the queue.
-
 - Old Task class does no longer use classmethods for push_request and
   pop_request  (Issue #912).
 

+ 1 - 1
README.rst

@@ -97,7 +97,7 @@ Celery is...
 
         celery = Celery('hello', broker='amqp://guest@localhost//')
 
-        @celery.task()
+        @celery.task
         def hello():
             return 'hello world'
 

+ 11 - 4
celery/app/utils.py

@@ -52,6 +52,11 @@ class Settings(datastructures.ConfigurationView):
         return (os.environ.get('CELERY_BROKER_URL') or
                 self.first('BROKER_URL', 'BROKER_HOST'))
 
+    @property
+    def CELERY_TIMEZONE(self):
+        # this way we also support django's time zone.
+        return self.first('CELERY_TIMEZONE', 'TIME_ZONE')
+
     def without_defaults(self):
         """Returns the current configuration, but without defaults."""
         # the last stash is the default settings, so just skip that
@@ -131,10 +136,12 @@ def bugreport(app):
     import kombu
 
     try:
-        trans = app.connection().transport
-        driver_v = '{0}:{1}'.format(trans.driver_name, trans.driver_version())
+        conn = app.connection()
+        driver_v = '{0}:{1}'.format(conn.transport.driver_name,
+                                    conn.transport.driver_version())
+        transport = conn.transport_cls
     except Exception:
-        driver_v = ''
+        transport = driver_v = ''
 
     return BUGREPORT_INFO.format(
         system=_platform.system(),
@@ -145,7 +152,7 @@ def bugreport(app):
         billiard_v=billiard.__version__,
         py_v=_platform.python_version(),
         driver_v=driver_v,
-        transport=app.conf.BROKER_TRANSPORT or 'amqp',
+        transport=transport,
         results=app.conf.CELERY_RESULT_BACKEND or 'disabled',
         human_settings=app.conf.humanize(),
         loader=qualname(app.loader.__class__),

+ 3 - 1
celery/concurrency/base.py

@@ -56,10 +56,12 @@ class BasePool(object):
     #: only used by multiprocessing pool
     uses_semaphore = False
 
-    def __init__(self, limit=None, putlocks=True, **options):
+    def __init__(self, limit=None, putlocks=True, forking_enable=True,
+            **options):
         self.limit = limit
         self.putlocks = putlocks
         self.options = options
+        self.forking_enable = forking_enable
         self._does_debug = logger.isEnabledFor(logging.DEBUG)
 
     def on_start(self):

+ 4 - 1
celery/concurrency/processes.py

@@ -13,12 +13,14 @@ from __future__ import absolute_import
 
 import os
 
+from billiard import forking_enable
+from billiard.pool import Pool, RUN, CLOSE
+
 from celery import platforms
 from celery import signals
 from celery._state import set_default_app
 from celery.concurrency.base import BasePool
 from celery.task import trace
-from billiard.pool import Pool, RUN, CLOSE
 
 #: List of signals to reset when a child process starts.
 WORKER_SIGRESET = frozenset(['SIGTERM',
@@ -69,6 +71,7 @@ class TaskPool(BasePool):
         Will pre-fork all workers so they're ready to accept tasks.
 
         """
+        forking_enable(self.forking_enable)
         P = self._pool = self.Pool(processes=self.limit,
                                    initializer=process_initializer,
                                    **self.options)

+ 19 - 14
celery/schedules.py

@@ -16,9 +16,10 @@ from dateutil.relativedelta import relativedelta
 
 from . import current_app
 from .utils import is_iterable
-from .utils.timeutils import (timedelta_seconds, weekday, maybe_timedelta,
-                              remaining, humanize_seconds, is_naive, to_utc,
-                              timezone)
+from .utils.timeutils import (
+    timedelta_seconds, weekday, maybe_timedelta, remaining,
+    humanize_seconds, timezone, maybe_make_aware
+)
 from .datastructures import AttributeDict
 
 CRON_PATTERN_INVALID = """\
@@ -48,12 +49,8 @@ class schedule(object):
         return (self.nowfun or current_app.now)()
 
     def remaining_estimate(self, last_run_at):
-        """Returns when the periodic task should run next as a timedelta."""
-        now = self.now()
-        if not is_naive(last_run_at):
-            now = to_utc(now)
         return remaining(last_run_at, self.run_every,
-                         relative=self.relative, now=now)
+                         self.relative, maybe_make_aware(self.now()))
 
     def is_due(self, last_run_at):
         """Returns tuple of two items `(is_due, next_time_to_run)`,
@@ -394,7 +391,10 @@ class crontab(schedule):
         self.day_of_week = self._expand_cronspec(day_of_week, 7)
         self.day_of_month = self._expand_cronspec(day_of_month, 31, 1)
         self.month_of_year = self._expand_cronspec(month_of_year, 12, 1)
-        self.nowfun = nowfun or current_app.now
+        self.nowfun = nowfun
+
+    def now(self):
+        return (self.nowfun or current_app.now)()
 
     def __repr__(self):
         return ('<crontab: %s %s %s %s %s (m/h/d/dM/MY)>' %
@@ -411,10 +411,10 @@ class crontab(schedule):
                                  self._orig_day_of_month,
                                  self._orig_month_of_year), None)
 
-    def remaining_estimate(self, last_run_at):
+    def remaining_estimate(self, last_run_at, tz=None):
         """Returns when the periodic task should run next as a timedelta."""
-        if not is_naive(last_run_at):
-            last_run_at = last_run_at.astimezone(timezone.utc)
+        tz = tz or self.tz
+        last_run_at = maybe_make_aware(last_run_at)
         dow_num = last_run_at.isoweekday() % 7  # Sunday is day 0, not day 7
 
         execute_this_date = (last_run_at.month in self.month_of_year and
@@ -463,7 +463,8 @@ class crontab(schedule):
                     delta = self._delta_to_next(last_run_at,
                                                 next_hour, next_minute)
 
-        return remaining(last_run_at, delta, now=self.nowfun())
+        return remaining(timezone.to_local(last_run_at, tz),
+                         delta, timezone.to_local(self.now(), tz))
 
     def is_due(self, last_run_at):
         """Returns tuple of two items `(is_due, next_time_to_run)`,
@@ -476,7 +477,7 @@ class crontab(schedule):
         rem = timedelta_seconds(rem_delta)
         due = rem == 0
         if due:
-            rem_delta = self.remaining_estimate(last_run_at=self.nowfun())
+            rem_delta = self.remaining_estimate(self.now())
             rem = timedelta_seconds(rem_delta)
         return due, rem
 
@@ -489,6 +490,10 @@ class crontab(schedule):
                     other.minute == self.minute)
         return other is self
 
+    @property
+    def tz(self):
+        return current_app.conf.CELERY_TIMEZONE
+
 
 def maybe_schedule(s, relative=False):
     if isinstance(s, int):

+ 1 - 0
celery/tests/config.py

@@ -25,6 +25,7 @@ CELERY_QUEUES = (
 )
 
 CELERY_ENABLE_UTC = True
+CELERY_TIMEZONE = 'UTC'
 
 CELERYD_LOG_COLOR = False
 

+ 6 - 0
celery/utils/serialization.py

@@ -135,6 +135,12 @@ class UnpickleableExceptionWrapper(Exception):
 
 def get_pickleable_exception(exc):
     """Make sure exception is pickleable."""
+    try:
+        pickle.loads(pickle.dumps(exc))
+    except Exception:
+        pass
+    else:
+        return exc
     nearest = find_nearest_pickleable_exception(exc)
     if nearest:
         return nearest

+ 21 - 5
celery/utils/timeutils.py

@@ -50,8 +50,8 @@ class _Zone(object):
 
     def to_local(self, dt, local=None, orig=None):
         if is_naive(dt):
-            dt = set_tz(dt, orig or self.utc)
-        return dt.astimezone(self.tz_or_local(local))
+            dt = make_aware(dt, orig or self.utc)
+        return localize(dt, self.tz_or_local(local))
 
     def get_timezone(self, zone):
         if isinstance(zone, basestring):
@@ -140,7 +140,6 @@ def remaining(start, ends_in, now=None, relative=False):
 
     """
     now = now or datetime.utcnow()
-
     end_date = start + ends_in
     if relative:
         end_date = delta_resolution(end_date, ends_in)
@@ -202,7 +201,7 @@ def is_naive(dt):
     return dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None
 
 
-def set_tz(dt, tz):
+def make_aware(dt, tz):
     """Sets the timezone for a datetime object."""
     try:
         localize = tz.localize
@@ -213,6 +212,23 @@ def set_tz(dt, tz):
         return localize(dt, is_dst=None)
 
 
+def localize(dt, tz):
+    """Convert aware datetime to another timezone."""
+    dt = dt.astimezone(tz)
+    try:
+        normalize = tz.normalize
+    except AttributeError:
+        return dt
+    else:
+        return normalize(dt)  # pytz
+
+
 def to_utc(dt):
     """Converts naive datetime to UTC"""
-    return set_tz(dt, timezone.utc)
+    return make_aware(dt, timezone.utc)
+
+
+def maybe_make_aware(dt, tz=None):
+    if is_naive(dt):
+        return to_utc(dt)
+    return localize(dt, timezone.utc if tz is None else tz)

+ 1 - 0
celery/worker/__init__.py

@@ -18,6 +18,7 @@ import traceback
 from threading import Event
 
 from billiard import cpu_count
+from billiard.exceptions import WorkerLostError
 from kombu.syn import detect_environment
 from kombu.utils.finalize import Finalize
 

+ 2 - 2
celery/worker/components.py

@@ -13,7 +13,6 @@ import time
 
 from functools import partial
 
-from billiard import forking_enable
 from billiard.exceptions import WorkerLostError
 
 from celery.utils.log import get_logger
@@ -107,8 +106,8 @@ class Pool(bootsteps.StartStopComponent):
 
     def create(self, w, semaphore=None, max_restarts=None):
         threaded = not w.use_eventloop
-        forking_enable(not threaded or (w.no_execv or not w.force_execv))
         procs = w.min_concurrency
+        forking_enable = not threaded or (w.no_execv or not w.force_execv)
         if not threaded:
             semaphore = w.semaphore = BoundedSemaphore(procs)
             w._quick_acquire = w.semaphore.acquire
@@ -125,6 +124,7 @@ class Pool(bootsteps.StartStopComponent):
                             threads=threaded,
                             max_restarts=max_restarts,
                             allow_restart=allow_restart,
+                            forking_enable=forking_enable,
                             semaphore=semaphore)
         if w.hub:
             w.hub.on_init.append(partial(self.on_poll_init, pool))

+ 9 - 0
docs/django/first-steps-with-django.rst

@@ -92,6 +92,15 @@ Our example task is pretty pointless, it just returns the sum of two
 arguments, but it will do for demonstration, and it is referred to in many
 parts of the Celery documentation.
 
+.. admonition:: Relative Imports
+
+    You have to consistent in how you import the task module, e.g. if
+    you have ``project.app`` in ``INSTALLED_APPS`` then you also
+    need to import the tasks ``from project.app`` or else the names
+    of the tasks will be different.
+
+    See :ref:`task-naming-relative-imports`
+
 Starting the worker process
 ===========================
 

+ 1 - 0
extra/release/doc4allmods

@@ -3,6 +3,7 @@
 PACKAGE="$1"
 SKIP_PACKAGES="$PACKAGE tests management urls"
 SKIP_FILES="celery.__compat__.rst
+            celery.__main__.rst
             celery.task.sets.rst
             celery.bin.rst
             celery.bin.celeryd_detach.rst

+ 1 - 1
requirements/default-py3k.txt

@@ -1,4 +1,4 @@
 billiard>=2.7.3.12
 python-dateutil>=2.1
 pytz
-kombu>=2.3
+kombu>=2.4.0,<3.0

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 billiard>=2.7.3.12
 python-dateutil>=2.1
-kombu>=2.3.1,<3.0
+kombu>=2.4.0,<3.0

+ 2 - 2
setup.cfg

@@ -16,7 +16,7 @@ upload-dir = docs/.build/html
 [bdist_rpm]
 requires = uuid
            importlib
-           billiard>=2.7.3.12
+           billiard >= 2.7.3.12
            python-dateutil >= 2.1
-           kombu >= 2.3.1
+           kombu >= 2.4.0
            ordereddict