Prechádzať zdrojové kódy

Merge branch '3.0'

Conflicts:
	Changelog
	celery/apps/worker.py
	celery/canvas.py
	celery/utils/threads.py
	celery/utils/timeutils.py
	celery/worker/__init__.py
	celery/worker/consumer.py
	requirements/README.rst
	requirements/default.txt
	setup.cfg
	setup.py
	tox.ini
Ask Solem 12 rokov pred
rodič
commit
ded23cadcf

+ 8 - 0
Changelog

@@ -20,6 +20,14 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
 - `App.control.Inspect.conf` can be used for inspecting worker configuration
 
+.. _version-3.0.10:
+
+3.0.10
+======
+:release-date: TBA
+
+- Now depends on kombu 2.4.6
+
 .. _version-3.0.9:
 
 3.0.9

+ 1 - 1
celery/app/amqp.py

@@ -14,10 +14,10 @@ from weakref import WeakValueDictionary
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import entry_to_queue
 from kombu.pools import ProducerPool
+from kombu.utils import cached_property, uuid
 from kombu.utils.encoding import safe_repr
 
 from celery import signals
-from celery.utils import cached_property, uuid
 from celery.utils.text import indent as textindent
 
 from . import app_or_default

+ 1 - 1
celery/apps/worker.py

@@ -120,7 +120,7 @@ class Worker(WorkController):
 
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)
-        print('celery@{0.hostname} has started.'.format(self))
+        print('celery@{0.hostname} ready.'.format(self))
 
     def redirect_stdouts_to_logger(self):
         self.app.log.setup(self.loglevel, self.logfile,

+ 2 - 1
celery/backends/cache.py

@@ -8,9 +8,10 @@
 """
 from __future__ import absolute_import
 
+from kombu.utils import cached_property
+
 from celery.datastructures import LRUCache
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import cached_property
 
 from .base import KeyValueStoreBackend
 

+ 1 - 1
celery/backends/redis.py

@@ -8,10 +8,10 @@
 """
 from __future__ import absolute_import
 
+from kombu.utils import cached_property
 from kombu.utils.url import _parse_url
 
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import cached_property
 
 from .base import KeyValueStoreBackend
 

+ 1 - 2
celery/beat.py

@@ -18,7 +18,7 @@ import traceback
 from threading import Event, Thread
 
 from billiard import Process, ensure_multiprocessing
-from kombu.utils import reprcall
+from kombu.utils import cached_property, reprcall
 from kombu.utils.functional import maybe_promise
 
 from . import __version__
@@ -27,7 +27,6 @@ from . import signals
 from . import current_app
 from .app import app_or_default
 from .schedules import maybe_schedule, crontab
-from .utils import cached_property
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
 from .utils.log import get_logger

+ 7 - 6
celery/canvas.py

@@ -11,14 +11,14 @@
 """
 from __future__ import absolute_import
 
+from copy import deepcopy
 from operator import itemgetter
 from itertools import chain as _chain, imap
 
-from kombu.utils import fxrange, kwdict, reprcall
+from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery import current_app
 from celery.local import Proxy
-from celery.utils import cached_property, uuid
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -116,10 +116,11 @@ class Signature(dict):
                 dict(self.kwargs, **kwargs) if kwargs else self.kwargs,
                 dict(self.options, **options) if options else self.options)
 
-    def clone(self, args=(), kwargs={}, **options):
-        args, kwargs, options = self._merge(args, kwargs, options)
-        s = Signature.from_dict({'task': self.task, 'args': args,
-                                 'kwargs': kwargs, 'options': options,
+    def clone(self, args=(), kwargs={}, **opts):
+        # need to deepcopy options so origins links etc. is not modified.
+        args, kwargs, opts = self._merge(args, kwargs, opts)
+        s = Signature.from_dict({'task': self.task, 'args': tuple(args),
+                                 'kwargs': kwargs, 'options': deepcopy(opts),
                                  'subtask_type': self.subtask_type,
                                  'immutable': self.immutable})
         s._type = self._type

+ 4 - 1
celery/concurrency/__init__.py

@@ -8,7 +8,10 @@
 """
 from __future__ import absolute_import
 
-from celery.local import symbol_by_name
+# Import from kombu directly as it's used
+# early in the import stage, where celery.utils loads
+# too much (e.g. for eventlet patching)
+from kombu.utils import symbol_by_name
 
 ALIASES = {
     'processes': 'celery.concurrency.processes:TaskPool',

+ 1 - 0
celery/concurrency/gevent.py

@@ -7,6 +7,7 @@
 
 """
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import os
 

+ 0 - 6
celery/concurrency/threads.py

@@ -8,16 +8,10 @@
 """
 from __future__ import absolute_import
 
-import os
-
 from celery.utils.compat import UserDict
 
 from .base import apply_target, BasePool
 
-#: Makes sure we don't use threading.local for stacks
-#: since apparently they don't work properly.
-os.environ['USE_PURE_LOCALS'] = '1'
-
 
 class NullDict(UserDict):
 

+ 1 - 1
celery/loaders/base.py

@@ -16,11 +16,11 @@ import re
 from datetime import datetime
 from itertools import imap
 
+from kombu.utils import cached_property
 from kombu.utils.encoding import safe_str
 
 from celery.datastructures import DictAttribute
 from celery.exceptions import ImproperlyConfigured
-from celery.utils import cached_property
 from celery.utils.imports import import_from_cwd, symbol_by_name
 from celery.utils.functional import maybe_list
 

+ 3 - 2
celery/result.py

@@ -14,13 +14,14 @@ from collections import deque
 from copy import copy
 from itertools import imap
 
+from kombu.utils import cached_property
+from kombu.utils.compat import OrderedDict
+
 from . import current_app
 from . import states
 from .app import app_or_default
 from .datastructures import DependencyGraph
 from .exceptions import IncompleteStream, TimeoutError
-from .utils import cached_property
-from .utils.compat import OrderedDict
 
 
 def from_serializable(r):

+ 2 - 1
celery/tests/backends/test_redis.py

@@ -6,13 +6,14 @@ from mock import Mock, patch
 from nose import SkipTest
 from pickle import loads, dumps
 
+from kombu.utils import cached_property, uuid
+
 from celery import current_app
 from celery import states
 from celery.datastructures import AttributeDict
 from celery.exceptions import ImproperlyConfigured
 from celery.result import AsyncResult
 from celery.task import subtask
-from celery.utils import cached_property, uuid
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.tests.utils import Case

+ 7 - 6
celery/tests/concurrency/test_gevent.py

@@ -111,9 +111,10 @@ class test_TasKPool(Case):
 class test_Timer(Case):
 
     def test_timer(self):
-        x = Timer()
-        x.ensure_started()
-        x.schedule = Mock()
-        x.start()
-        x.stop()
-        x.schedule.clear.assert_called_with()
+        with mock_module(*gevent_modules):
+            x = Timer()
+            x.ensure_started()
+            x.schedule = Mock()
+            x.start()
+            x.stop()
+            x.schedule.clear.assert_called_with()

+ 2 - 2
celery/tests/utilities/test_datastructures.py

@@ -9,7 +9,7 @@ from celery.datastructures import (
     ConfigurationView,
     DependencyGraph,
 )
-from celery.utils.threads import TIMEOUT_MAX
+from celery.utils.compat import THREAD_TIMEOUT_MAX
 from celery.tests.utils import Case, WhateverIO
 
 
@@ -222,7 +222,7 @@ class test_LRUCache(Case):
             def stop(self):
                 self._is_shutdown.set()
                 self._is_stopped.wait()
-                self.join(TIMEOUT_MAX)
+                self.join(THREAD_TIMEOUT_MAX)
 
         burglar = Burglar(x)
         burglar.start()

+ 3 - 0
celery/tests/worker/test_worker.py

@@ -631,6 +631,9 @@ class test_Consumer(Case):
             def channel(self):
                 return Mock()
 
+            def as_uri(self):
+                return 'dummy://'
+
             def drain_events(self, **kwargs):
                 if not self.calls:
                     self.calls += 1

+ 7 - 3
celery/utils/compat.py

@@ -43,10 +43,14 @@ else:
 
 
 ############## collections.OrderedDict ######################################
+# was moved to kombu
+from kombu.utils.compat import OrderedDict  # noqa
+
+############## threading.TIMEOUT_MAX #######################################
 try:
-    from collections import OrderedDict
-except ImportError:                         # pragma: no cover
-    from ordereddict import OrderedDict     # noqa
+    from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
+except ImportError:
+    THREAD_TIMEOUT_MAX = 1e10  # noqa
 
 ############## format(int, ',d') ##########################
 

+ 2 - 1
celery/utils/functional.py

@@ -16,8 +16,9 @@ from itertools import islice
 
 from kombu.utils import cached_property
 from kombu.utils.functional import promise, maybe_promise
+from kombu.utils.compat import OrderedDict
 
-from .compat import UserDict, UserList, OrderedDict
+from .compat import UserDict, UserList
 
 KEYWORD_MARK = object()
 is_not_None = partial(operator.is_not, None)

+ 1 - 4
celery/utils/imports.py

@@ -15,10 +15,7 @@ import sys
 
 from contextlib import contextmanager
 
-# symbol_by_name was moved to local because it's used
-# early in the import stage, where celery.utils loads
-# too much (e.g. for eventlet patching)
-from celery.local import symbol_by_name
+from kombu.utils import symbol_by_name
 
 from .compat import reload
 

+ 4 - 11
celery/utils/threads.py

@@ -13,17 +13,10 @@ import sys
 import threading
 import traceback
 
-from kombu.syn import _detect_environment
-
 from celery.local import Proxy
+from celery.utils.compat import THREAD_TIMEOUT_MAX
 
-USE_PURE_LOCALS = os.environ.get('USE_PURE_LOCALS')
-
-
-try:
-    TIMEOUT_MAX = threading.TIMEOUT_MAX
-except AttributeError:
-    TIMEOUT_MAX = 1e10  # noqa
+USE_FAST_LOCALS = os.environ.get('USE_FAST_LOCALS')
 
 
 class bgThread(threading.Thread):
@@ -76,7 +69,7 @@ class bgThread(threading.Thread):
         self._is_shutdown.set()
         self._is_stopped.wait()
         if self.is_alive():
-            self.join(TIMEOUT_MAX)
+            self.join(THREAD_TIMEOUT_MAX)
 
 try:
     from greenlet import getcurrent as get_ident
@@ -301,7 +294,7 @@ class _FastLocalStack(threading.local):
         except (AttributeError, IndexError):
             return None
 
-if _detect_environment() == 'default' and not USE_PURE_LOCALS:
+if USE_FAST_LOCALS:
     LocalStack = _FastLocalStack
 else:
     # - See #706

+ 2 - 2
celery/utils/timer2.py

@@ -19,7 +19,7 @@ from functools import wraps
 from itertools import count, imap
 from time import time, sleep, mktime
 
-from celery.utils.threads import TIMEOUT_MAX
+from celery.utils.compat import THREAD_TIMEOUT_MAX
 from kombu.log import get_logger
 
 VERSION = (1, 0, 0)
@@ -274,7 +274,7 @@ class Timer(threading.Thread):
         if self.running:
             self._is_shutdown.set()
             self._is_stopped.wait()
-            self.join(TIMEOUT_MAX)
+            self.join(THREAD_TIMEOUT_MAX)
             self.running = False
 
     def ensure_started(self):

+ 7 - 0
celery/worker/__init__.py

@@ -43,6 +43,7 @@ try:
 except ImportError:  # pragma: no cover
     IGNORE_ERRORS = ()
 
+#: Worker states
 RUN = 0x1
 CLOSE = 0x2
 TERMINATE = 0x3
@@ -55,6 +56,9 @@ If you want to automatically declare unknown queues you can
 enable the CELERY_CREATE_MISSING_QUEUES setting.
 """
 
+#: Default socket timeout at shutdown.
+SHUTDOWN_SOCKET_TIMEOUT = 5.0
+
 
 class Namespace(bootsteps.Namespace):
     """This is the boot-step namespace of the :class:`WorkController`.
@@ -267,6 +271,8 @@ class WorkController(configurated):
 
     def _shutdown(self, warm=True):
         what = 'Stopping' if warm else 'Terminating'
+        socket_timeout = socket.getdefaulttimeout()
+        socket.setdefaulttimeout(SHUTDOWN_SOCKET_TIMEOUT)  # Issue 975
 
         if self._state in (self.CLOSE, self.TERMINATE):
             return
@@ -297,6 +303,7 @@ class WorkController(configurated):
         if self.pidlock:
             self.pidlock.release()
         self._state = self.TERMINATE
+        socket.setdefaulttimeout(socket_timeout)
         self._shutdown_complete.set()
 
     def reload(self, modules=None, reload=False, reloader=None):

+ 3 - 2
celery/worker/consumer.py

@@ -111,6 +111,7 @@ Received and deleted unknown message. Wrong destination?!?
 
 The full contents of the message body was: %s
 """
+
 #: Error message for when an unregistered task is received.
 UNKNOWN_TASK_ERROR = """\
 Received unregistered task of type %s.
@@ -152,7 +153,7 @@ consumer: Cannot connect to %s: %s.
 """
 
 CONNECTION_RETRY = """\
-Trying again %(when)s...\
+Trying again {when}...\
 """
 
 CONNECTION_FAILOVER = """\
@@ -769,7 +770,7 @@ class Consumer(object):
             if getattr(conn, 'alt', None) and interval == 0:
                 next_step = CONNECTION_FAILOVER
             error(CONNECTION_ERROR, conn.as_uri(), exc,
-                  next_step % {'when': humanize_seconds(interval, 'in', ' ')})
+                  next_step.format(when=humanize_seconds(interval, 'in', ' ')))
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.

+ 2 - 1
celery/worker/state.py

@@ -17,9 +17,10 @@ import shelve
 
 from collections import defaultdict
 
+from kombu.utils import cached_property
+
 from celery import __version__
 from celery.datastructures import LimitedSet
-from celery.utils import cached_property
 
 #: Worker software/platform information.
 SOFTWARE_INFO = {'sw_ident': 'py-celery',

+ 6 - 4
docs/userguide/tasks.rst

@@ -398,8 +398,10 @@ General
 
 .. attribute:: Task.rate_limit
 
-    Set the rate limit for this task type, i.e. how many times in
-    a given period of time is the task allowed to run.
+    Set the rate limit for this task type which limits the number of tasks
+    that can be run in a given time frame.  Tasks will still complete when
+    a rate limit is in effect, but it may take some time before it's allowed to
+    start.
 
     If this is :const:`None` no rate limit is in effect.
     If it is an integer, it is interpreted as "tasks per second".
@@ -699,8 +701,8 @@ state metadata.  This can then be used to create e.g. progress bars.
 Creating pickleable exceptions
 ------------------------------
 
-A little known Python fact is that exceptions must behave a certain
-way to support being pickled.
+A rarely known Python fact is that exceptions must conform to some
+simple rules to support being serialized by the pickle module.
 
 Tasks that raise exceptions that are not pickleable will not work
 properly when Pickle is used as the serializer.

+ 4 - 1
funtests/benchmarks/bench_worker.py

@@ -4,7 +4,10 @@ import os
 import sys
 import time
 
-os.environ['NOSETPS'] = 'yes'
+os.environ.update(
+    NOSETPS='yes',
+    USE_FAST_LOCALS='yes',
+)
 
 import anyjson
 JSONIMP = os.environ.get('JSONIMP')

+ 4 - 10
requirements/README.rst

@@ -14,10 +14,6 @@ Index
 
     Default requirements for Python 3.2+.
 
-* :file:`requirements/py26.txt`
-
-    Extra requirements needed to run on Python 2.6.
-
 * :file:`requirements/jython.txt`
 
     Extra requirements needed to run on Jython 2.5
@@ -47,15 +43,13 @@ Index
 
     Requirement file installing the current master branch of Celery and deps.
 
-
 Examples
 ========
 
-Running the tests using Python 2.6
-----------------------------------
+Running the tests
+-----------------
 
 ::
 
-    $ pip -E $VIRTUAL_ENV install -U -r requirements/default.txt
-    $ pip -E $VIRTUAL_ENV install -U -r requirements/test.txt
-
+    $ pip install -U -r requirements/default.txt
+    $ pip install -U -r requirements/test.txt

+ 1 - 1
requirements/default-py3k.txt

@@ -1,4 +1,4 @@
 billiard>=2.7.3.12
 python-dateutil>=2.1
 pytz
-kombu>=2.4.5,<3.0
+kombu>=2.4.6,<3.0

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 billiard>=2.7.3.12
 python-dateutil>=2.1
-kombu>=2.4.5,<3.0
+kombu>=2.4.6,<3.0

+ 0 - 2
requirements/py26.txt

@@ -1,2 +0,0 @@
-importlib
-ordereddict

+ 2 - 5
setup.cfg

@@ -14,9 +14,6 @@ all_files = 1
 upload-dir = docs/.build/html
 
 [bdist_rpm]
-requires = uuid
-           importlib
-           billiard >= 2.7.3.12
+requires = billiard>=2.7.3.12
            python-dateutil >= 2.1
-           kombu >= 2.4.5
-           ordereddict
+           kombu >= 2.4.6

+ 0 - 2
setup.py

@@ -155,8 +155,6 @@ install_requires = reqs('default-py3k.txt' if is_py3k else 'default.txt')
 
 if is_jython:
     install_requires.extend(reqs('jython.txt'))
-if py_version[0:2] == (2, 6):
-    install_requires.extend(reqs('py26.txt'))
 
 # -*- Tests Requires -*-
 

+ 0 - 1
tox.ini

@@ -49,7 +49,6 @@ commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}
 [testenv:py26]
 basepython = python2.6
 deps = -r{toxinidir}/requirements/default.txt
-       -r{toxinidir}/requirements/py26.txt
        -r{toxinidir}/requirements/test.txt
        -r{toxinidir}/requirements/test-ci.txt
 commands = {toxinidir}/extra/release/removepyc.sh {toxinidir}