
Merge branch 'master' into 5.0-devel

Ask Solem 8 years ago
parent commit 0833afc03e
100 changed files with 1580 additions and 736 deletions
  1. .bumpversion.cfg (+5 -4)
  2. .travis.yml (+0 -3)
  3. CONTRIBUTORS.txt (+5 -0)
  4. Changelog (+189 -0)
  5. README.rst (+1 -1)
  6. celery/__init__.py (+1 -1)
  7. celery/_state.py (+3 -0)
  8. celery/app/amqp.py (+21 -17)
  9. celery/app/base.py (+26 -6)
  10. celery/app/defaults.py (+5 -1)
  11. celery/app/registry.py (+6 -2)
  12. celery/app/task.py (+12 -5)
  13. celery/app/trace.py (+3 -4)
  14. celery/app/utils.py (+10 -0)
  15. celery/apps/worker.py (+16 -10)
  16. celery/backends/base.py (+2 -2)
  17. celery/backends/cache.py (+7 -1)
  18. celery/backends/elasticsearch.py (+1 -0)
  19. celery/backends/redis.py (+11 -10)
  20. celery/beat.py (+1 -1)
  21. celery/bin/amqp.py (+1 -0)
  22. celery/bin/base.py (+5 -2)
  23. celery/bin/beat.py (+1 -0)
  24. celery/bin/call.py (+3 -0)
  25. celery/bin/celeryd_detach.py (+1 -0)
  26. celery/bin/events.py (+1 -0)
  27. celery/bin/multi.py (+1 -0)
  28. celery/canvas.py (+1 -4)
  29. celery/concurrency/asynpool.py (+8 -6)
  30. celery/contrib/migrate.py (+1 -1)
  31. celery/contrib/pytest.py (+49 -6)
  32. celery/contrib/testing/app.py (+2 -1)
  33. celery/contrib/testing/worker.py (+13 -4)
  34. celery/events/dispatcher.py (+2 -1)
  35. celery/events/dumper.py (+1 -0)
  36. celery/events/snapshot.py (+4 -4)
  37. celery/events/state.py (+1 -1)
  38. celery/exceptions.py (+1 -1)
  39. celery/loaders/base.py (+10 -7)
  40. celery/local.py (+3 -1)
  41. celery/platforms.py (+2 -1)
  42. celery/result.py (+4 -4)
  43. celery/security/serialization.py (+0 -2)
  44. celery/signals.py (+112 -52)
  45. celery/states.py (+1 -0)
  46. celery/utils/collections.py (+4 -4)
  47. celery/utils/dispatch/LICENSE.python (+255 -0)
  48. celery/utils/dispatch/license.txt (+8 -7)
  49. celery/utils/dispatch/saferef.py (+0 -272)
  50. celery/utils/dispatch/signal.py (+161 -67)
  51. celery/utils/dispatch/weakref_backports.py (+70 -0)
  52. celery/utils/functional.py (+41 -4)
  53. celery/utils/log.py (+2 -0)
  54. celery/utils/nodenames.py (+2 -0)
  55. celery/utils/saferepr.py (+107 -29)
  56. celery/utils/term.py (+2 -1)
  57. celery/utils/text.py (+2 -0)
  58. celery/utils/threads.py (+1 -0)
  59. celery/utils/time.py (+2 -0)
  60. celery/worker/consumer/consumer.py (+14 -4)
  61. celery/worker/consumer/events.py (+11 -4)
  62. celery/worker/pidbox.py (+1 -1)
  63. celery/worker/request.py (+2 -3)
  64. celery/worker/state.py (+6 -6)
  65. docs/getting-started/brokers/rabbitmq.rst (+1 -1)
  66. docs/getting-started/brokers/redis.rst (+1 -1)
  67. docs/getting-started/first-steps-with-celery.rst (+4 -4)
  68. docs/includes/installation.txt (+2 -2)
  69. docs/includes/introduction.txt (+1 -1)
  70. docs/internals/reference/celery.utils.dispatch.saferef.rst (+0 -11)
  71. docs/internals/reference/celery.utils.dispatch.weakref_backports.rst (+11 -0)
  72. docs/internals/reference/index.rst (+1 -1)
  73. docs/sec/CELERYSA-0003.txt (+59 -0)
  74. docs/userguide/calling.rst (+2 -2)
  75. docs/userguide/configuration.rst (+23 -10)
  76. docs/userguide/daemonizing.rst (+15 -8)
  77. docs/userguide/routing.rst (+6 -6)
  78. docs/userguide/security.rst (+1 -1)
  79. docs/userguide/tasks.rst (+1 -1)
  80. docs/userguide/testing.rst (+45 -0)
  81. docs/userguide/workers.rst (+1 -1)
  82. docs/whatsnew-4.0.rst (+7 -7)
  83. requirements/default.txt (+1 -1)
  84. requirements/extras/sqs.txt (+1 -0)
  85. setup.cfg (+1 -1)
  86. t/unit/app/test_amqp.py (+12 -5)
  87. t/unit/app/test_app.py (+14 -1)
  88. t/unit/app/test_registry.py (+7 -0)
  89. t/unit/backends/test_redis.py (+8 -0)
  90. t/unit/bin/test_base.py (+1 -0)
  91. t/unit/bin/test_events.py (+1 -1)
  92. t/unit/conftest.py (+6 -4)
  93. t/unit/events/test_snapshot.py (+2 -0)
  94. t/unit/security/test_key.py (+3 -2)
  95. t/unit/tasks/test_canvas.py (+1 -1)
  96. t/unit/tasks/test_context.py (+2 -0)
  97. t/unit/tasks/test_result.py (+8 -0)
  98. t/unit/utils/test_dispatcher.py (+3 -4)
  99. t/unit/utils/test_functional.py (+84 -5)
  100. t/unit/utils/test_saferef.py (+0 -86)

+ 5 - 4
.bumpversion.cfg

@@ -1,14 +1,15 @@
 [bumpversion]
-current_version = 4.0.0rc2
+current_version = 4.0.2
 commit = True
 tag = True
 parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?P<releaselevel>[a-z]+)?
-serialize =
-    {major}.{minor}.{patch}{releaselevel}
-    {major}.{minor}.{patch}
+serialize = 
+	{major}.{minor}.{patch}{releaselevel}
+	{major}.{minor}.{patch}
 
 [bumpversion:file:celery/__init__.py]
 
 [bumpversion:file:docs/includes/introduction.txt]
 
 [bumpversion:file:README.rst]
+

+ 0 - 3
.travis.yml

@@ -21,9 +21,6 @@ env:
     - TOXENV=pypy-unit PYPY_VERSION="5.3"
     - TOXENV=pypy-integration-rabbitmq PYPY_VERSION="5.3"
     - TOXENV=pypy-integration-redis PYPY_VERSION="5.3"
-    - TOXENV=pypy3-unit
-    - TOXENV=pypy3-integration-rabbitmq
-    - TOXENV=pypy3-integration-redis
     - TOXENV=flake8
     - TOXENV=flakeplus
     - TOXENV=apicheck

+ 5 - 0
CONTRIBUTORS.txt

@@ -220,3 +220,8 @@ Andrew Stewart, 2016/07/04
 Xin Li, 2016/08/03
 Alli Witheford, 2016/09/29
 Marat Sharafutdinov, 2016/11/04
+Viktor Holmqvist, 2016/12/02
+Rick Wargo, 2016/12/02
+zhengxiaowai, 2016/12/07
+Michael Howitz, 2016/12/08
+Andreas Pelme, 2016/12/13

+ 189 - 0
Changelog

@@ -8,6 +8,195 @@ This document contains change notes for bugfix releases in
 the 4.0.x series (latentcall), please see :ref:`whatsnew-4.0` for
 an overview of what's new in Celery 4.0.
 
+.. _version-4.0.2:
+
+4.0.2
+=====
+:release-date: 2016-12-15 03:40 PM PST
+:release-by: Ask Solem
+
+- **Requirements**
+
+    - Now depends on :ref:`Kombu 4.0.2 <kombu:version-4.0.2>`.
+
+- **Tasks**: Fixed problem with JSON serialization of `group`
+  (``keys must be string`` error, Issue #3688).
+
+- **Worker**: Fixed JSON serialization issue when using ``inspect active``
+  and friends (Issue #3667).
+
+- **App**: Fixed saferef errors when using signals (Issue #3670).
+
+- **Prefork**: Fixed bug with pack requiring bytes argument
+  on Python 2.7.5 and earlier (Issue #3674).
+
+- **Tasks**: Saferepr did not handle unicode in bytestrings on Python 2
+  (Issue #3676).
+
+- **Testing**: Added new ``celery_worker_parameters`` fixture.
+
+    Contributed by **Michael Howitz**.
+
+- **Tasks**: Added new ``app`` argument to ``GroupResult.restore``
+  (Issue #3669).
+
+    This makes the restore method behave the same way as the ``GroupResult``
+    constructor.
+
+    Contributed by **Andreas Pelme**.
+
+- **Tasks**: Fixed type checking crash when task takes ``*args`` on Python 3
+  (Issue #3678).
+
+- Documentation and example improvements by:
+
+    - **BLAGA Razvan-Paul**
+    - **Michael Howitz**
+    - :github_user:`paradox41`
+
+.. _version-4.0.1:
+
+4.0.1
+=====
+:release-date: 2016-12-08 05:22 PM PST
+:release-by: Ask Solem
+
+* [Security: `CELERYSA-0003`_] Insecure default configuration
+
+    The default :setting:`accept_content` setting was set to allow
+    deserialization of pickled messages in Celery 4.0.0.
+
+    The insecure default has been fixed in 4.0.1, and you can also
+    configure the 4.0.0 version to explicitly only allow json serialized
+    messages:
+
+    .. code-block:: python
+
+        app.conf.accept_content = ['json']
+
+.. _`CELERYSA-0003`:
+    https://github.com/celery/celery/tree/master/docs/sec/CELERYSA-0003.txt
+
+- **Tasks**: Added new method to register class-based tasks (Issue #3615).
+
+    To register a class based task you should now call ``app.register_task``:
+
+    .. code-block:: python
+
+        from celery import Celery, Task
+
+        app = Celery()
+
+        class CustomTask(Task):
+
+            def run(self):
+                return 'hello'
+
+        app.register_task(CustomTask())
+
+- **Tasks**: Argument checking now supports keyword-only arguments on Python 3
+  (Issue #3658).
+
+    Contributed by :github_user:`sww`.
+
+- **Tasks**: The ``task-sent`` event was not being sent even if
+  configured to do so (Issue #3646).
+
+- **Worker**: Fixed AMQP heartbeat support for eventlet/gevent pools
+  (Issue #3649).
+
+- **App**: ``app.conf.humanize()`` would not work if configuration
+  not finalized (Issue #3652).
+
+- **Utils**: ``saferepr`` attempted to show iterables as lists
+  and mappings as dicts.
+
+- **Utils**: ``saferepr`` did not handle unicode-errors
+  when attempting to format ``bytes`` on Python 3 (Issue #3610).
+
+- **Utils**: ``saferepr`` should now properly represent byte strings
+  with non-ascii characters (Issue #3600).
+
+- **Results**: Fixed bug in elasticsearch where _index method missed
+  the body argument (Issue #3606).
+
+    Fix contributed by **何翔宇** (Sean Ho).
+
+- **Canvas**: Fixed :exc:`ValueError` in chord with single task header
+  (Issue #3608).
+
+    Fix contributed by **Viktor Holmqvist**.
+
+- **Task**: Ensure class-based task has name prior to registration
+  (Issue #3616).
+
+    Fix contributed by **Rick Wargo**.
+
+- **Beat**: Fixed problem with strings in shelve (Issue #3644).
+
+    Fix contributed by **Alli**.
+
+- **Worker**: Fixed :exc:`KeyError` in ``inspect stats`` when ``-O`` argument
+  set to something other than ``fast`` or ``fair`` (Issue #3621).
+
+- **Task**: Retried tasks were no longer sent to the original queue
+  (Issue #3622).
+
+- **Worker**: Python 3: Fixed None/int type comparison in
+  :file:`apps/worker.py` (Issue #3631).
+
+- **Results**: Redis has a new :setting:`redis_socket_connect_timeout`
+  setting.
+
+- **Results**: Redis result backend passed the ``socket_connect_timeout``
+  argument to UNIX socket based connections by mistake, causing a crash.
+
+- **Worker**: Fixed missing logo in worker splash screen when running on
+  Python 3.x (Issue #3627).
+
+    Fix contributed by **Brian Luan**.
+
+- **Deps**: Fixed ``celery[redis]`` bundle installation (Issue #3643).
+
+    Fix contributed by **Rémi Marenco**.
+
+- **Deps**: Bundle ``celery[sqs]`` now also requires :pypi:`pycurl`
+  (Issue #3619).
+
+- **Worker**: Hard time limits were no longer being respected (Issue #3618).
+
+- **Worker**: Soft time limit log showed ``Trues`` instead of the number
+  of seconds.
+
+- **App**: ``registry_cls`` argument no longer had any effect (Issue #3613).
+
+- **Worker**: Event producer now uses ``connection_for_write`` (Issue #3525).
+
+- **Results**: Redis/memcache backends now use :setting:`result_expires`
+  to expire chord counter (Issue #3573).
+
+    Contributed by **Tayfun Sen**.
+
+- **Django**: Fixed command for upgrading settings with Django (Issue #3563).
+
+    Fix contributed by **François Voron**.
+
+- **Testing**: Added a ``celery_parameters`` test fixture that allows using
+  customized ``Celery`` init parameters (Issue #3626).
+
+    Contributed by **Steffen Allner**.
+
+- Documentation improvements contributed by
+
+    - :github_user:`csfeathers`
+    - **Moussa Taifi**
+    - **Yuhannaa**
+    - **Laurent Peuch**
+    - **Christian**
+    - **Bruno Alla**
+    - **Steven Johns**
+    - :github_user:`tnir`
+    - **GDR!**
 
 .. _version-4.0.0:
 

+ 1 - 1
README.rst

@@ -2,7 +2,7 @@
 
 |build-status| |license| |wheel| |pyversion| |pyimp|
 
-:Version: 4.0.0 (latentcall)
+:Version: 4.0.2 (latentcall)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: https://github.com/celery/celery/

+ 1 - 1
celery/__init__.py

@@ -12,7 +12,7 @@ from collections import namedtuple
 
 SERIES = 'latentcall'
 
-__version__ = '4.0.0'
+__version__ = '4.0.2'
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'
 __homepage__ = 'http://celeryproject.org'

+ 3 - 0
celery/_state.py

@@ -65,6 +65,8 @@ class _TLS(threading.local):
     #: sets this, so it will always contain the last instantiated app,
     #: and is the default app returned by :func:`app_or_default`.
     current_app = None
+
+
 _tls = _TLS()
 
 _task_stack = LocalStack()
@@ -187,6 +189,7 @@ def disable_trace():
     global app_or_default
     app_or_default = _app_or_default
 
+
 if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
     enable_trace()
 else:

+ 21 - 17
celery/app/amqp.py

@@ -58,15 +58,16 @@ class Queues(dict):
 
     def __init__(self, queues=None, default_exchange=None,
                  create_missing=True, ha_policy=None, autoexchange=None,
-                 max_priority=None):
+                 max_priority=None, default_routing_key=None):
         dict.__init__(self)
         self.aliases = WeakValueDictionary()
         self.default_exchange = default_exchange
+        self.default_routing_key = default_routing_key
         self.create_missing = create_missing
         self.ha_policy = ha_policy
         self.autoexchange = Exchange if autoexchange is None else autoexchange
         self.max_priority = max_priority
-        if isinstance(queues, (tuple, list)):
+        if queues is not None and not isinstance(queues, Mapping):
             queues = {q.name: q for q in queues}
         for name, q in (queues or {}).items():
             self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
@@ -108,6 +109,20 @@ class Queues(dict):
         """
         if not isinstance(queue, Queue):
             return self.add_compat(queue, **kwargs)
+        return self._add(queue)
+
+    def add_compat(self, name, **options):
+        # docs used to use binding_key as routing key
+        options.setdefault('routing_key', options.get('binding_key'))
+        if options['routing_key'] is None:
+            options['routing_key'] = name
+        return self._add(Queue.from_dict(name, **options))
+
+    def _add(self, queue):
+        if not queue.routing_key:
+            if queue.exchange is None or queue.exchange.name == '':
+                queue.exchange = self.default_exchange
+            queue.routing_key = self.default_routing_key
         if self.ha_policy:
             if queue.queue_arguments is None:
                 queue.queue_arguments = {}
@@ -119,18 +134,6 @@ class Queues(dict):
         self[queue.name] = queue
         return queue
 
-    def add_compat(self, name, **options):
-        # docs used to use binding_key as routing key
-        options.setdefault('routing_key', options.get('binding_key'))
-        if options['routing_key'] is None:
-            options['routing_key'] = name
-        if self.ha_policy is not None:
-            self._set_ha_policy(options.setdefault('queue_arguments', {}))
-        if self.max_priority is not None:
-            self._set_max_priority(options.setdefault('queue_arguments', {}))
-        q = self[name] = Queue.from_dict(name, **options)
-        return q
-
     def _set_ha_policy(self, args):
         policy = self.ha_policy
         if isinstance(policy, (list, tuple)):
@@ -253,6 +256,7 @@ class AMQP:
         # Create new :class:`Queues` instance, using queue defaults
         # from the current configuration.
         conf = self.app.conf
+        default_routing_key = conf.task_default_routing_key
         if create_missing is None:
             create_missing = conf.task_create_missing_queues
         if ha_policy is None:
@@ -262,12 +266,12 @@ class AMQP:
         if not queues and conf.task_default_queue:
             queues = (Queue(conf.task_default_queue,
                             exchange=self.default_exchange,
-                            routing_key=conf.task_default_routing_key),)
+                            routing_key=default_routing_key),)
         autoexchange = (self.autoexchange if autoexchange is None
                         else autoexchange)
         return self.queues_cls(
             queues, self.default_exchange, create_missing,
-            ha_policy, autoexchange, max_priority,
+            ha_policy, autoexchange, max_priority, default_routing_key,
         )
 
     def Router(self, queues=None, create_missing=None):
@@ -538,7 +542,7 @@ class AMQP:
                     'routing_key': routing_key,
                 })
                 evd.publish('task-sent', sent_event,
-                            self, retry=retry, retry_policy=retry_policy)
+                            producer, retry=retry, retry_policy=retry_policy)
             return ret
         return send_task_message
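
A minimal sketch, assuming the logic in ``Queues._add()`` above, of the new routing-key fallback (queue and exchange names are hypothetical): a queue declared without a routing key now inherits ``default_routing_key``, and an empty or missing exchange falls back to ``default_exchange``:

.. code-block:: python

    from kombu import Exchange, Queue
    from celery.app.amqp import Queues

    queues = Queues(
        [Queue('media')],  # declared without routing_key or exchange
        default_exchange=Exchange('tasks'),
        default_routing_key='task.default',
    )

    # _add() fills in the defaults when the queue is registered.
    assert queues['media'].exchange.name == 'tasks'
    assert queues['media'].routing_key == 'task.default'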
 

+ 26 - 6
celery/app/base.py

@@ -256,7 +256,7 @@ class Celery:
         self._pending = deque()
         self._tasks = tasks
         if not isinstance(self._tasks, TaskRegistry):
-            self._tasks = TaskRegistry(self._tasks or {})
+            self._tasks = self.registry_cls(self._tasks or {})
 
         # If the class defines a custom __reduce_args__ we need to use
         # the old way of pickling apps: pickling a list of
@@ -288,10 +288,13 @@ class Celery:
         # Signals
         if self.on_configure is None:
             # used to be a method pre 4.0
-            self.on_configure = Signal()
-        self.on_after_configure = Signal()
-        self.on_after_finalize = Signal()
-        self.on_after_fork = Signal()
+            self.on_configure = Signal(name='app.on_configure')
+        self.on_after_configure = Signal(
+            name='app.on_after_configure',
+            providing_args={'source'},
+        )
+        self.on_after_finalize = Signal(name='app.on_after_finalize')
+        self.on_after_fork = Signal(name='app.on_after_fork')
 
         self.on_init()
         _register_app(self)
@@ -470,6 +473,23 @@ class Celery:
             task = self._tasks[name]
         return task
 
+    def register_task(self, task):
+        """Utility for registering a task-based class.
+
+        Note:
+            This is here for compatibility with old Celery 1.0
+            style task classes, you should not need to use this for
+            new projects.
+        """
+        if not task.name:
+            task_cls = type(task)
+            task.name = self.gen_task_name(
+                task_cls.__name__, task_cls.__module__)
+        self.tasks[task.name] = task
+        task._app = self
+        task.bind(self)
+        return task
+
     def gen_task_name(self, name, module):
         return gen_task_name(self, name, module)
 
@@ -704,7 +724,7 @@ class Celery:
         )
 
         if connection:
-            producer = amqp.Producer(connection)
+            producer = amqp.Producer(connection, auto_declare=False)
         with self.producer_or_acquire(producer) as P:
             with P.connection._reraise_as_library_errors():
                 self.backend.on_task_call(P, task_id)

+ 5 - 1
celery/app/defaults.py

@@ -20,7 +20,7 @@ elif is_pypy:
     else:
         DEFAULT_POOL = 'prefork'
 
-DEFAULT_ACCEPT_CONTENT = ['json', 'pickle', 'msgpack', 'yaml']
+DEFAULT_ACCEPT_CONTENT = ['json']
 DEFAULT_PROCESS_LOG_FMT = """
     [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
 """.strip()
@@ -69,6 +69,7 @@ class Option:
         return '<Option: type->{0} default->{1!r}>'.format(self.type,
                                                            self.default)
 
+
 NAMESPACES = Namespace(
     accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS),
     enable_utc=Option(True, type='bool'),
@@ -154,6 +155,7 @@ NAMESPACES = Namespace(
         password=Option(type='string'),
         port=Option(type='int'),
         socket_timeout=Option(120.0, type='float'),
+        socket_connect_timeout=Option(None, type='float'),
     ),
     result=Namespace(
         __old__=old_ns('celery_result'),
@@ -306,6 +308,8 @@ def flatten(d, root='', keyfilter=_flatten_keys):
             else:
                 for ret in keyfilter(ns, key, opt):
                     yield ret
+
+
 DEFAULTS = {
     key: opt.default for key, opt in flatten(NAMESPACES)
 }

+ 6 - 2
celery/app/registry.py

@@ -3,7 +3,7 @@
 import inspect
 from importlib import import_module
 from celery._state import get_current_app
-from celery.exceptions import NotRegistered
+from celery.exceptions import NotRegistered, InvalidTaskError
 
 __all__ = ['TaskRegistry']
 
@@ -20,8 +20,12 @@ class TaskRegistry(dict):
         """Register a task in the task registry.
 
         The task will be automatically instantiated if not already an
-        instance.
+        instance. Name must be configured prior to registration.
         """
+        if task.name is None:
+            raise InvalidTaskError(
+                'Task class {0!r} must specify .name attribute'.format(
+                    type(task).__name__))
         self[task.name] = inspect.isclass(task) and task() or task
 
     def unregister(self, name):

+ 12 - 5
celery/app/task.py

@@ -117,7 +117,6 @@ class Context:
             'expires': self.expires,
             'soft_time_limit': limit_soft,
             'time_limit': limit_hard,
-            'reply_to': self.reply_to,
             'headers': self.headers,
             'retries': self.retries,
             'reply_to': self.reply_to,
@@ -555,9 +554,17 @@ class Task:
         args = request.args if args is None else args
         kwargs = request.kwargs if kwargs is None else kwargs
         options = request.as_execution_options()
-        options.update(
-            {'queue': queue} if queue else (request.delivery_info or {}),
-        )
+        if queue:
+            options['queue'] = queue
+        else:
+            delivery_info = request.delivery_info or {}
+            exchange = delivery_info.get('exchange')
+            routing_key = delivery_info.get('routing_key')
+            if exchange == '' and routing_key:
+                # sent to anon-exchange
+                options['queue'] = routing_key
+            else:
+                options.update(delivery_info)
         return self.signature(
             args, kwargs, options, type=self, **extra_options
         )
@@ -989,4 +996,4 @@ class Task:
     @property
     def __name__(self):
         return self.__class__.__name__
-BaseTask = Task  # compat alias
+BaseTask = Task  # noqa: E305 XXX compat alias

+ 3 - 4
celery/app/trace.py

@@ -40,7 +40,6 @@ from celery.utils.saferepr import saferepr
 from celery.utils.serialization import (
     get_pickleable_exception, get_pickled_exception, get_pickleable_etype,
 )
-from celery.utils.text import truncate
 
 __all__ = [
     'TraceInfo', 'build_tracer', 'trace_task',
@@ -242,8 +241,8 @@ class TraceInfo:
 
 def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                  Info=TraceInfo, eager=False, propagate=False, app=None,
-                 monotonic=monotonic, truncate=truncate,
-                 trace_ok_t=trace_ok_t, IGNORE_STATES=IGNORE_STATES):
+                 monotonic=monotonic, trace_ok_t=trace_ok_t,
+                 IGNORE_STATES=IGNORE_STATES):
     """Return a function that traces task execution.
 
     Catches all exceptions and updates result backend with the
@@ -507,7 +506,7 @@ def _trace_task_ret(name, uuid, request, body, content_type,
     R, I, T, Rstr = trace_task(app.tasks[name],
                                uuid, args, kwargs, request, app=app)
     return (1, R, T) if I else (0, Rstr, T)
-trace_task_ret = _trace_task_ret
+trace_task_ret = _trace_task_ret  # noqa: E305
 
 
 def _fast_trace_task(task, uuid, request, body, content_type,

+ 10 - 0
celery/app/utils.py

@@ -160,9 +160,19 @@ class Settings(ConfigurationView):
         """
         return self['_'.join(part for part in parts if part)]
 
+    def finalize(self):
+        # See PendingConfiguration in celery/app/base.py
+        # first access will read actual configuration.
+        try:
+            self['__bogus__']
+        except KeyError:
+            pass
+        return self
+
     def table(self, with_defaults=False, censored=True):
         filt = filter_hidden_settings if censored else lambda v: v
         dict_members = dir(dict)
+        self.finalize()
         return filt({
             k: v for k, v in (
                 self if with_defaults else self.without_defaults()).items()
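
For illustration, ``finalize()`` forces the lazy ``PendingConfiguration`` to materialize by touching a key, so ``table()``, and therefore the ``app.conf.humanize()`` fix noted in the Changelog, works even before the configuration is otherwise finalized. A quick sketch, assuming a fresh app:

.. code-block:: python

    from celery import Celery

    app = Celery()
    # Previously failed when the configuration was not yet finalized;
    # table() now calls finalize() before iterating the settings.
    print(app.conf.humanize(with_defaults=False))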

+ 16 - 10
celery/apps/worker.py

@@ -39,16 +39,6 @@ logger = get_logger(__name__)
 is_jython = sys.platform.startswith('java')
 is_pypy = hasattr(sys, 'pypy_version_info')
 
-
-def active_thread_count():
-    from threading import enumerate
-    return sum(1 for t in enumerate()
-               if not t.name.startswith('Dummy-'))
-
-
-def safe_say(msg):
-    print('\n{0}'.format(msg), file=sys.__stderr__)
-
 ARTLINES = [
     ' --------------',
     '---- **** -----',
@@ -86,6 +76,16 @@ EXTRA_INFO_FMT = """
 """
 
 
+def active_thread_count():
+    from threading import enumerate
+    return sum(1 for t in enumerate()
+               if not t.name.startswith('Dummy-'))
+
+
+def safe_say(msg):
+    print('\n{0}'.format(msg), file=sys.__stderr__)
+
+
 class Worker(WorkController):
     """Worker as a program."""
 
@@ -182,6 +182,8 @@ class Worker(WorkController):
         )
 
     def extra_info(self):
+        if self.loglevel is None:
+            return
         if self.loglevel <= logging.INFO:
             include_builtins = self.loglevel <= logging.DEBUG
             tasklist = self.tasklist(include_builtins=include_builtins)
@@ -279,6 +281,8 @@ def _shutdown_handler(worker, sig='TERM', how='Warm',
                 raise exc(exitcode)
     _handle_request.__name__ = str('worker_{0}'.format(how))
     platforms.signals[sig] = _handle_request
+
+
 install_worker_term_handler = partial(
     _shutdown_handler, sig='SIGTERM', how='Warm', exc=WorkerShutdown,
 )
@@ -295,6 +299,8 @@ else:  # pragma: no cover
 def on_SIGINT(worker):
     safe_say('worker: Hitting Ctrl+C again will terminate all running tasks!')
     install_worker_term_hard_handler(worker, sig='SIGINT')
+
+
 if not is_jython:  # pragma: no cover
     install_worker_int_handler = partial(
         _shutdown_handler, sig='SIGINT', callback=on_SIGINT,

+ 2 - 2
celery/backends/base.py

@@ -504,7 +504,7 @@ class SyncBackendMixin:
 
 class BaseBackend(Backend, SyncBackendMixin):
     """Base (synchronous) result backend."""
-BaseDictBackend = BaseBackend  # XXX compat
+BaseDictBackend = BaseBackend  # noqa: E305 XXX compat
 
 
 class BaseKeyValueStoreBackend(Backend):
@@ -744,7 +744,7 @@ class BaseKeyValueStoreBackend(Backend):
                 deps.delete()
                 self.client.delete(key)
         else:
-            self.expire(key, 86400)
+            self.expire(key, self.expires)
 
 
 class KeyValueStoreBackend(BaseKeyValueStoreBackend, SyncBackendMixin):

+ 7 - 1
celery/backends/cache.py

@@ -70,6 +70,9 @@ class DummyClient:
     def incr(self, key, delta=1):
         return self.cache.incr(key, delta)
 
+    def touch(self, key, expire):
+        pass
+
 
 backends = {
     'memcache': get_best_memcache,
@@ -120,13 +123,16 @@ class CacheBackend(KeyValueStoreBackend):
         return self.client.delete(key)
 
     def _apply_chord_incr(self, header, partial_args, group_id, body, **opts):
-        self.client.set(self.get_key_for_chord(group_id), 0, time=86400)
+        self.client.set(self.get_key_for_chord(group_id), 0, time=self.expires)
         return super()._apply_chord_incr(
             header, partial_args, group_id, body, **opts)
 
     def incr(self, key):
         return self.client.incr(key)
 
+    def expire(self, key, value):
+        return self.client.touch(key, value)
+
     @cached_property
     def client(self):
         return self.Client(self.servers, **self.options)

+ 1 - 0
celery/backends/elasticsearch.py

@@ -90,6 +90,7 @@ class ElasticsearchBackend(KeyValueStoreBackend):
         return self.server.index(
             index=self.index,
             doc_type=self.doc_type,
+            body=body,
             **kwargs
         )
 

+ 11 - 10
celery/backends/redis.py

@@ -115,13 +115,18 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
             self.max_connections)
         self._ConnectionPool = connection_pool
 
+        socket_timeout = _get('redis_socket_timeout')
+        socket_connect_timeout = _get('redis_socket_connect_timeout')
+
         self.connparams = {
             'host': _get('redis_host') or 'localhost',
             'port': _get('redis_port') or 6379,
             'db': _get('redis_db') or 0,
             'password': _get('redis_password'),
-            'socket_timeout': _get('redis_socket_timeout'),
             'max_connections': self.max_connections,
+            'socket_timeout': socket_timeout and float(socket_timeout),
+            'socket_connect_timeout':
+                socket_connect_timeout and float(socket_connect_timeout),
         }
         if url:
             self.connparams = self._params_from_url(url, self.connparams)
@@ -153,6 +158,7 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
             # host+port are invalid options when using this connection type.
             connparams.pop('host', None)
             connparams.pop('port', None)
+            connparams.pop('socket_connect_timeout')
         else:
             connparams['db'] = path
 
@@ -250,8 +256,8 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
                 .rpush(jkey, self.encode([1, tid, state, result]))          \
                 .llen(jkey)                                                 \
                 .get(tkey)                                                  \
-                .expire(jkey, 86400)                                        \
-                .expire(tkey, 86400)                                        \
+                .expire(jkey, self.expires)                                 \
+                .expire(tkey, self.expires)                                 \
                 .execute()
 
         totaldiff = int(totaldiff or 0)
@@ -286,14 +292,9 @@ class RedisBackend(base.BaseKeyValueStoreBackend, async.AsyncBackendMixin):
                 ChordError('Join error: {0!r}'.format(exc)),
             )
 
-    def _create_client(self, socket_timeout=None, socket_connect_timeout=None,
-                       **params):
+    def _create_client(self, **params):
         return self.redis.StrictRedis(
-            connection_pool=self.ConnectionPool(
-                socket_timeout=socket_timeout and float(socket_timeout),
-                socket_connect_timeout=socket_connect_timeout and float(
-                    socket_connect_timeout),
-                **params),
+            connection_pool=self.ConnectionPool(**params),
         )
 
     @property
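
A configuration sketch (URL hypothetical) tying this to the new ``redis_socket_connect_timeout`` setting: both timeouts are read from the configuration and coerced to ``float`` before reaching the connection pool, and the connect timeout is dropped for UNIX socket connections:

.. code-block:: python

    from celery import Celery

    app = Celery()
    app.conf.update(
        result_backend='redis://localhost:6379/0',
        redis_socket_timeout=30.0,
        redis_socket_connect_timeout=5.0,  # new setting in this release
    )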

+ 1 - 1
celery/beat.py

@@ -382,7 +382,7 @@ class Scheduler:
 
     @cached_property
     def producer(self):
-        return self.Producer(self._ensure_connected())
+        return self.Producer(self._ensure_connected(), auto_declare=False)
 
     @property
     def info(self):

+ 1 - 0
celery/bin/amqp.py

@@ -366,5 +366,6 @@ class amqp(Command):
 def main():
     amqp().execute_from_commandline()
 
+
 if __name__ == '__main__':  # pragma: no cover
     main()

+ 5 - 2
celery/bin/base.py

@@ -200,6 +200,9 @@ class Command:
 
     prog_name = 'celery'
 
+    #: Name of argparse option used for parsing positional args.
+    args_name = 'args'
+
     def __init__(self, app=None, get_app=None, no_color=False,
                  stdout=None, stderr=None, quiet=False, on_error=None,
                  on_usage_error=None):
@@ -398,7 +401,7 @@ class Command:
         # so we handle --version manually here.
         self.parser = self.create_parser(prog_name, command)
         options = vars(self.parser.parse_args(arguments))
-        return options, options.pop('args', None) or []
+        return options, options.pop(self.args_name, None) or []
 
     def create_parser(self, prog_name, command=None):
         # for compatibility with optparse usage.
@@ -419,7 +422,7 @@ class Command:
         if self.supports_args:
             # for backward compatibility with optparse, we automatically
             # add arbitrary positional args.
-            parser.add_argument('args', nargs='*')
+            parser.add_argument(self.args_name, nargs='*')
         return self.prepare_parser(parser)
 
     def _format_epilog(self, epilog):

+ 1 - 0
celery/bin/beat.py

@@ -126,5 +126,6 @@ class beat(Command):
 def main(app=None):
     beat(app=app).execute_from_commandline()
 
+
 if __name__ == '__main__':      # pragma: no cover
     main()

+ 3 - 0
celery/bin/call.py

@@ -17,6 +17,9 @@ class call(Command):
 
     args = '<task_name>'
 
+    # since we have an argument --args, we need to name this differently.
+    args_name = 'posargs'
+
     def add_arguments(self, parser):
         group = parser.add_argument_group('Calling Options')
         group.add_argument('--args', '-a',
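
A rough sketch (command class hypothetical) of why ``args_name`` exists: a command exposing its own ``--args`` option renames the bucket that collects positional arguments, so the two no longer collide in the parsed namespace:

.. code-block:: python

    from celery.bin.base import Command

    class mycall(Command):
        supports_args = True
        args_name = 'posargs'  # positional args stored here, not under 'args'

        def add_arguments(self, parser):
            # would otherwise clash with the automatic positional bucket
            parser.add_argument('--args', '-a', default='[]')

        def run(self, *posargs, **options):
            print('positional:', posargs, '--args:', options.get('args'))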

+ 1 - 0
celery/bin/celeryd_detach.py

@@ -130,5 +130,6 @@ class detached_celeryd:
 def main(app=None):
     detached_celeryd(app).execute_from_commandline()
 
+
 if __name__ == '__main__':  # pragma: no cover
     main()

+ 1 - 0
celery/bin/events.py

@@ -172,5 +172,6 @@ def main():
     ev = events()
     ev.execute_from_commandline()
 
+
 if __name__ == '__main__':              # pragma: no cover
     main()

+ 1 - 0
celery/bin/multi.py

@@ -450,5 +450,6 @@ class MultiTool(TermLogger):
     def DOWN(self):
         return str(self.colored.magenta('DOWN'))
 
+
 if __name__ == '__main__':              # pragma: no cover
     main()

+ 1 - 4
celery/canvas.py

@@ -1103,9 +1103,6 @@ class group(Signature):
                                   chord=chord, root_id=root_id,
                                   parent_id=parent_id)
 
-    def __iter__(self):
-        return iter(self.tasks)
-
     def __repr__(self):
         if self.tasks:
             return remove_repeating_from_task(
@@ -1221,7 +1218,7 @@ class chord(Signature):
         if len(self.tasks) == 1:
             # chord([A], B) can be optimized as A | B
             # - Issue #3323
-            return (self.tasks[0].set(task_id=task_id) | body).apply_async(
+            return (self.tasks[0] | body).set(task_id=task_id).apply_async(
                 args, kwargs, **options)
         # chord([A, B, ...], C)
         return self.run(tasks, body, args, task_id=task_id, **options)

+ 8 - 6
celery/concurrency/asynpool.py

@@ -498,11 +498,11 @@ class AsynPool(_pool.Pool):
             _discard_tref(R._job)
         self.on_timeout_cancel = on_timeout_cancel
 
-    def _on_soft_timeout(self, job, soft, hard, hub, now=time.time):
+    def _on_soft_timeout(self, job, soft, hard, hub):
         # only used by async pool.
         if hard:
-            self._tref_for_id[job] = hub.call_at(
-                now() + (hard - soft), self._on_hard_timeout, job,
+            self._tref_for_id[job] = hub.call_later(
+                hard - soft, self._on_hard_timeout, job,
             )
         try:
             result = self._cache[job]
@@ -799,7 +799,7 @@ class AsynPool(_pool.Pool):
             # inqueues are writable.
             body = dumps(tup, protocol=protocol)
             body_size = len(body)
-            header = pack('>I', body_size)
+            header = pack(b'>I', body_size)
             # index 1,0 is the job ID.
             job = get_job(tup[1][0])
             job._payload = buf_t(header), buf_t(body), body_size
@@ -1077,7 +1077,9 @@ class AsynPool(_pool.Pool):
             'avg': per(total / len(self.write_stats) if total else 0, total),
             'all': ', '.join(per(v, total) for v in vals),
             'raw': ', '.join(map(str, vals)),
-            'strategy': SCHED_STRATEGY_TO_NAME[self.sched_strategy],
+            'strategy': SCHED_STRATEGY_TO_NAME.get(
+                self.sched_strategy, self.sched_strategy,
+            ),
             'inqueues': {
                 'total': len(self._all_inqueues),
                 'active': len(self._active_writes),
@@ -1231,7 +1233,7 @@ class AsynPool(_pool.Pool):
                         protocol=HIGHEST_PROTOCOL):
         body = dumps((type_, args), protocol=protocol)
         size = len(body)
-        header = pack('>I', size)
+        header = pack(b'>I', size)
         return header, body, size
 
     @classmethod
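
Context for the ``pack(b'>I', ...)`` change: on Python 2.7.5 and earlier, ``struct.pack`` rejects unicode format strings (Issue #3674 in the Changelog), so the format is passed as bytes. The framing it produces:

.. code-block:: python

    from struct import pack

    body = b'payload'
    header = pack(b'>I', len(body))  # 4-byte big-endian length prefix
    assert header == b'\x00\x00\x00\x07'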

+ 1 - 1
celery/contrib/migrate.py

@@ -98,7 +98,7 @@ def migrate_tasks(source, dest, migrate=migrate_task, app=None,
     """Migrate tasks from one broker to another."""
     app = app_or_default(app)
     queues = prepare_queues(queues)
-    producer = app.amqp.Producer(dest)
+    producer = app.amqp.Producer(dest, auto_declare=False)
     migrate = partial(migrate, producer, queues=queues)
 
     def on_declare_queue(queue):

+ 49 - 6
celery/contrib/pytest.py

@@ -12,13 +12,18 @@ NO_WORKER = os.environ.get('NO_WORKER')
 
 
 @contextmanager
-def _create_app(request, enable_logging=False, use_trap=False, **config):
+def _create_app(request,
+                enable_logging=False,
+                use_trap=False,
+                parameters={},
+                **config):
     # type: (Any, **Any) -> Celery
     """Utility context used to setup Celery app for pytest fixtures."""
     test_app = TestApp(
         set_as_current=False,
         enable_logging=enable_logging,
         config=config,
+        **parameters
     )
     # request.module is not defined for session
     _module = getattr(request, 'module', None)
@@ -49,6 +54,7 @@ def use_celery_app_trap():
 @pytest.fixture(scope='session')
 def celery_session_app(request,
                        celery_config,
+                       celery_parameters,
                        celery_enable_logging,
                        use_celery_app_trap):
     # type: (Any) -> Celery
@@ -58,6 +64,7 @@ def celery_session_app(request,
     with _create_app(request,
                      enable_logging=celery_enable_logging,
                      use_trap=use_celery_app_trap,
+                     parameters=celery_parameters,
                      **config) as app:
         if not use_celery_app_trap:
             app.set_default()
@@ -66,15 +73,19 @@ def celery_session_app(request,
 
 
 @pytest.fixture(scope='session')
-def celery_session_worker(request, celery_session_app,
-                          celery_includes, celery_worker_pool):
+def celery_session_worker(request,
+                          celery_session_app,
+                          celery_includes,
+                          celery_worker_pool,
+                          celery_worker_parameters):
     # type: (Any, Celery, Sequence[str], str) -> WorkController
     """Session Fixture: Start worker that lives throughout test suite."""
     if not NO_WORKER:
         for module in celery_includes:
             celery_session_app.loader.import_task_module(module)
         with worker.start_worker(celery_session_app,
-                                 pool=celery_worker_pool) as w:
+                                 pool=celery_worker_pool,
+                                 **celery_worker_parameters) as w:
             yield w
 
 
@@ -118,9 +129,34 @@ def celery_config():
     return {}
 
 
+@pytest.fixture(scope='session')
+def celery_parameters():
+    # type: () -> Mapping[str, Any]
+    """Redefine this fixture to change the init parameters of test Celery app.
+
+    The dict returned by your fixture will then be used
+    as parameters when instantiating :class:`~celery.Celery`.
+    """
+    return {}
+
+
+@pytest.fixture(scope='session')
+def celery_worker_parameters():
+    # type: () -> Mapping[str, Any]
+    """Redefine this fixture to change the init parameters of Celery workers.
+
+    This can be used, e.g., to define queues the worker will consume tasks from.
+
+    The dict returned by your fixture will then be used
+    as parameters when instantiating :class:`~celery.worker.WorkController`.
+    """
+    return {}
+
+
 @pytest.fixture()
 def celery_app(request,
                celery_config,
+               celery_parameters,
                celery_enable_logging,
                use_celery_app_trap):
     """Fixture creating a Celery application instance."""
@@ -129,18 +165,25 @@ def celery_app(request,
     with _create_app(request,
                      enable_logging=celery_enable_logging,
                      use_trap=use_celery_app_trap,
+                     parameters=celery_parameters,
                      **config) as app:
         yield app
 
 
 @pytest.fixture()
-def celery_worker(request, celery_app, celery_includes, celery_worker_pool):
+def celery_worker(request,
+                  celery_app,
+                  celery_includes,
+                  celery_worker_pool,
+                  celery_worker_parameters):
     # type: (Any, Celery, Sequence[str], str) -> WorkController
     """Fixture: Start worker in a thread, stop it when the test returns."""
     if not NO_WORKER:
         for module in celery_includes:
             celery_app.loader.import_task_module(module)
-        with worker.start_worker(celery_app, pool=celery_worker_pool) as w:
+        with worker.start_worker(celery_app,
+                                 pool=celery_worker_pool,
+                                 **celery_worker_parameters) as w:
             yield w
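
A ``conftest.py`` sketch (values hypothetical) showing both new fixtures in use: ``celery_parameters`` feeds extra keyword arguments to the test ``Celery`` app, while ``celery_worker_parameters`` feeds the embedded test worker:

.. code-block:: python

    # conftest.py
    import pytest

    @pytest.fixture(scope='session')
    def celery_parameters():
        # extra kwargs passed when instantiating the test Celery app
        return {'strict_typing': False}

    @pytest.fixture(scope='session')
    def celery_worker_parameters():
        # e.g. have the embedded worker consume from an extra queue
        return {'queues': ('celery', 'priority')}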
 
 

+ 2 - 1
celery/contrib/testing/app.py

@@ -14,7 +14,8 @@ DEFAULT_TEST_CONFIG = {
     'enable_utc': True,
     'timezone': 'UTC',
     'broker_url': 'memory://',
-    'result_backend': 'cache+memory://'
+    'result_backend': 'cache+memory://',
+    'broker_heartbeat': 0,
 }
 
 

+ 13 - 4
celery/contrib/testing/worker.py

@@ -7,12 +7,21 @@ from celery.result import allow_join_result, _set_task_join_will_block
 from celery.utils.dispatch import Signal
 from celery.utils.nodenames import anon_nodename
 
-test_worker_starting = Signal(providing_args=[])
-test_worker_started = Signal(providing_args=['worker', 'consumer'])
-test_worker_stopped = Signal(providing_args=['worker'])
-
 WORKER_LOGLEVEL = os.environ.get('WORKER_LOGLEVEL', 'error')
 
+test_worker_starting = Signal(
+    name='test_worker_starting',
+    providing_args={},
+)
+test_worker_started = Signal(
+    name='test_worker_started',
+    providing_args={'worker', 'consumer'},
+)
+test_worker_stopped = Signal(
+    name='test_worker_stopped',
+    providing_args={'worker'},
+)
+
 
 class TestWorkController(worker.WorkController):
     """Worker that can synchronize on being fully started."""

+ 2 - 1
celery/events/dispatcher.py

@@ -99,7 +99,8 @@ class EventDispatcher:
     def enable(self):
         self.producer = Producer(self.channel or self.connection,
                                  exchange=self.exchange,
-                                 serializer=self.serializer)
+                                 serializer=self.serializer,
+                                 auto_declare=False)
         self.enabled = True
         for callback in self.on_enabled:
             callback()

+ 1 - 0
celery/events/dumper.py

@@ -102,5 +102,6 @@ def evdump(app=None, out=sys.stdout):
         except conn.connection_errors + conn.channel_errors:
             dumper.say('-> Connection lost, attempting reconnect')
 
+
 if __name__ == '__main__':  # pragma: no cover
     evdump()

+ 4 - 4
celery/events/snapshot.py

@@ -25,8 +25,8 @@ class Polaroid:
     """Record event snapshots."""
 
     timer = None
-    shutter_signal = Signal(providing_args=('state',))
-    cleanup_signal = Signal()
+    shutter_signal = Signal(name='shutter_signal', providing_args={'state'})
+    cleanup_signal = Signal(name='cleanup_signal')
     clear_after = False
 
     _tref = None
@@ -56,13 +56,13 @@ class Polaroid:
 
     def cleanup(self):
         logger.debug('Cleanup: Running...')
-        self.cleanup_signal.send(None)
+        self.cleanup_signal.send(sender=self.state)
         self.on_cleanup()
 
     def shutter(self):
         if self.maxrate is None or self.maxrate.can_consume():
             logger.debug('Shutter: %s', self.state)
-            self.shutter_signal.send(self.state)
+            self.shutter_signal.send(sender=self.state)
             self.on_shutter(self.state)
 
     def capture(self):

+ 1 - 1
celery/events/state.py

@@ -98,7 +98,7 @@ class CallableDefaultdict(defaultdict):
 
     def __call__(self, *args, **kwargs):
         return self.fun(*args, **kwargs)
-Callable.register(CallableDefaultdict)
+Callable.register(CallableDefaultdict)  # noqa: E305
 
 
 @memoize(maxsize=1000, keyfun=lambda a, _: a[0])

+ 1 - 1
celery/exceptions.py

@@ -236,7 +236,7 @@ class CDeprecationWarning(DeprecationWarning):
 
 class WorkerTerminate(SystemExit):
     """Signals that the worker should terminate immediately."""
-SystemTerminate = WorkerTerminate  # XXX compat
+SystemTerminate = WorkerTerminate  # noqa: E305 XXX compat
 
 
 class WorkerShutdown(SystemExit):

+ 10 - 7
celery/loaders/base.py

@@ -9,6 +9,7 @@ import sys
 from datetime import datetime
 
 from kombu.utils import json
+from kombu.utils.objects import cached_property
 
 from celery import signals
 from celery.utils.collections import DictAttribute, force_mapping
@@ -104,13 +105,7 @@ class BaseLoader:
 
     def import_default_modules(self):
         signals.import_modules.send(sender=self.app)
-        return [
-            self.import_task_module(m) for m in (
-                tuple(self.builtin_modules) +
-                tuple(maybe_list(self.app.conf.imports)) +
-                tuple(maybe_list(self.app.conf.include))
-            )
-        ]
+        return [self.import_task_module(m) for m in self.default_modules]
 
     def init_worker(self):
         if not self.worker_initialized:
@@ -225,6 +220,14 @@ class BaseLoader:
             mod.__name__ for mod in autodiscover_tasks(packages or (),
                                                        related_name) if mod)
 
+    @cached_property
+    def default_modules(self):
+        return (
+            tuple(self.builtin_modules) +
+            tuple(maybe_list(self.app.conf.imports)) +
+            tuple(maybe_list(self.app.conf.include))
+        )
+
     @property
     def conf(self):
         """Loader configuration."""

+ 3 - 1
celery/local.py

@@ -292,7 +292,7 @@ class PromiseProxy(Proxy):
     promise will only evaluate it once.
     """
 
-    __slots__ = ('__pending__',)
+    __slots__ = ('__pending__', '__weakref__')
 
     def _get_current_object(self):
         try:
@@ -372,6 +372,7 @@ def maybe_evaluate(obj):
 
 # import fails in python 2.5. fallback to reduce in stdlib
 
+
 MODULE_DEPRECATED = """
 The module %s is deprecated and will be removed in a future version.
 """
@@ -402,6 +403,7 @@ def _compat_periodic_task_decorator(*args, **kwargs):
     from celery.task import periodic_task
     return periodic_task(*args, **kwargs)
 
+
 COMPAT_MODULES = {
     'celery': {
         'execute': {

+ 2 - 1
celery/platforms.py

@@ -226,7 +226,7 @@ class Pidfile:
                     "Inconsistency: Pidfile content doesn't match at re-read")
         finally:
             rfh.close()
-PIDFile = Pidfile  # compat alias
+PIDFile = Pidfile  # noqa: E305 XXX compat alias
 
 
 def create_pidlock(pidfile):
@@ -675,6 +675,7 @@ class Signals:
         for name, handler in dict(_d_ or {}, **sigmap).items():
             self[name] = handler
 
+
 signals = Signals()
 get_signal = signals.signum                   # compat
 install_signal_handler = signals.__setitem__  # compat

+ 4 - 4
celery/result.py

@@ -824,11 +824,11 @@ class GroupResult(ResultSet):
         return self.results
 
     @classmethod
-    def restore(cls, id, backend=None):
+    def restore(cls, id, backend=None, app=None):
         """Restore previously saved group result."""
-        return (
-            backend or (cls.app.backend if cls.app else current_app.backend)
-        ).restore_group(id)
+        app = app or cls.app
+        backend = backend or (app.backend if app else current_app.backend)
+        return backend.restore_group(id)
 
 
 @AbstractResult.register
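
An illustrative use of the new ``app`` argument (identifiers hypothetical), mirroring the Changelog entry: ``restore`` now consults the passed app's backend instead of only the class-bound or current app:

.. code-block:: python

    from celery import Celery
    from celery.result import GroupResult

    app = Celery(backend='redis://localhost:6379/1')
    # Looks up a previously saved group on this app's result backend;
    # returns None when the id is unknown.
    restored = GroupResult.restore('my-group-id', app=app)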

+ 0 - 2
celery/security/serialization.py

@@ -2,9 +2,7 @@
 """Secure serializer."""
 from kombu.serialization import registry, dumps, loads
 from kombu.utils.encoding import bytes_to_str, str_to_bytes, ensure_bytes
-
 from celery.utils.serialization import b64encode, b64decode
-
 from .certificate import Certificate, FSCertStore
 from .key import PrivateKey
 from .utils import reraise_errors

+ 112 - 52
celery/signals.py

@@ -25,56 +25,116 @@ __all__ = [
     'eventlet_pool_postshutdown', 'eventlet_pool_apply',
 ]
 
-before_task_publish = Signal(providing_args=[
-    'body', 'exchange', 'routing_key', 'headers', 'properties',
-    'declare', 'retry_policy',
-])
-after_task_publish = Signal(providing_args=[
-    'body', 'exchange', 'routing_key',
-])
-task_prerun = Signal(providing_args=['task_id', 'task', 'args', 'kwargs'])
-task_postrun = Signal(providing_args=[
-    'task_id', 'task', 'args', 'kwargs', 'retval',
-])
-task_success = Signal(providing_args=['result'])
-task_retry = Signal(providing_args=[
-    'request', 'reason', 'einfo',
-])
-task_failure = Signal(providing_args=[
-    'task_id', 'exception', 'args', 'kwargs', 'traceback', 'einfo',
-])
-task_revoked = Signal(providing_args=[
-    'request', 'terminated', 'signum', 'expired',
-])
-task_rejected = Signal(providing_args=[
-    'message', 'exc',
-])
-task_unknown = Signal(providing_args=[
-    'message', 'exc', 'name', 'id',
-])
+# - Task
+before_task_publish = Signal(
+    name='before_task_publish',
+    providing_args={
+        'body', 'exchange', 'routing_key', 'headers',
+        'properties', 'declare', 'retry_policy',
+    },
+)
+after_task_publish = Signal(
+    name='after_task_publish',
+    providing_args={'body', 'exchange', 'routing_key'},
+)
+task_prerun = Signal(
+    name='task_prerun',
+    providing_args={'task_id', 'task', 'args', 'kwargs'},
+)
+task_postrun = Signal(
+    name='task_postrun',
+    providing_args={'task_id', 'task', 'args', 'kwargs', 'retval'},
+)
+task_success = Signal(
+    name='task_success',
+    providing_args={'result'},
+)
+task_retry = Signal(
+    name='task_retry',
+    providing_args={'request', 'reason', 'einfo'},
+)
+task_failure = Signal(
+    name='task_failure',
+    providing_args={
+        'task_id', 'exception', 'args', 'kwargs', 'traceback', 'einfo',
+    },
+)
+task_revoked = Signal(
+    name='task_revoked',
+    providing_args={
+        'request', 'terminated', 'signum', 'expired',
+    },
+)
+task_rejected = Signal(
+    name='task_rejected',
+    providing_args={'message', 'exc'},
+)
+task_unknown = Signal(
+    name='task_unknown',
+    providing_args={'message', 'exc', 'name', 'id'},
+)
+#: Deprecated, use after_task_publish instead.
+task_sent = Signal(
+    name='task_sent',
+    providing_args={
+        'task_id', 'task', 'args', 'kwargs', 'eta', 'taskset',
+    },
+)
 
-celeryd_init = Signal(providing_args=['instance', 'conf', 'options'])
-celeryd_after_setup = Signal(providing_args=['instance', 'conf'])
-import_modules = Signal(providing_args=[])
-worker_init = Signal(providing_args=[])
-worker_process_init = Signal(providing_args=[])
-worker_process_shutdown = Signal(providing_args=[])
-worker_ready = Signal(providing_args=[])
-worker_shutdown = Signal(providing_args=[])
-setup_logging = Signal(providing_args=[
-    'loglevel', 'logfile', 'format', 'colorize',
-])
-after_setup_logger = Signal(providing_args=[
-    'logger', 'loglevel', 'logfile', 'format', 'colorize',
-])
-after_setup_task_logger = Signal(providing_args=[
-    'logger', 'loglevel', 'logfile', 'format', 'colorize',
-])
-beat_init = Signal(providing_args=[])
-beat_embedded_init = Signal(providing_args=[])
-heartbeat_sent = Signal(providing_args=[])
-eventlet_pool_started = Signal(providing_args=[])
-eventlet_pool_preshutdown = Signal(providing_args=[])
-eventlet_pool_postshutdown = Signal(providing_args=[])
-eventlet_pool_apply = Signal(providing_args=['target', 'args', 'kwargs'])
-user_preload_options = Signal(providing_args=['app', 'options'])
+# - Program: `celery worker`
+celeryd_init = Signal(
+    name='celeryd_init',
+    providing_args={'instance', 'conf', 'options'},
+)
+celeryd_after_setup = Signal(
+    name='celeryd_after_setup',
+    providing_args={'instance', 'conf'},
+)
+
+# - Worker
+import_modules = Signal(name='import_modules')
+worker_init = Signal(name='worker_init')
+worker_process_init = Signal(name='worker_process_init')
+worker_process_shutdown = Signal(name='worker_process_shutdown')
+worker_ready = Signal(name='worker_ready')
+worker_shutdown = Signal(name='worker_shutdown')
+heartbeat_sent = Signal(name='heartbeat_sent')
+
+# - Logging
+setup_logging = Signal(
+    name='setup_logging',
+    providing_args={
+        'loglevel', 'logfile', 'format', 'colorize',
+    },
+)
+after_setup_logger = Signal(
+    name='after_setup_logger',
+    providing_args={
+        'logger', 'loglevel', 'logfile', 'format', 'colorize',
+    },
+)
+after_setup_task_logger = Signal(
+    name='after_setup_task_logger',
+    providing_args={
+        'logger', 'loglevel', 'logfile', 'format', 'colorize',
+    },
+)
+
+# - Beat
+beat_init = Signal(name='beat_init')
+beat_embedded_init = Signal(name='beat_embedded_init')
+
+# - Eventlet
+eventlet_pool_started = Signal(name='eventlet_pool_started')
+eventlet_pool_preshutdown = Signal(name='eventlet_pool_preshutdown')
+eventlet_pool_postshutdown = Signal(name='eventlet_pool_postshutdown')
+eventlet_pool_apply = Signal(
+    name='eventlet_pool_apply',
+    providing_args={'target', 'args', 'kwargs'},
+)
+
+# - Programs
+user_preload_options = Signal(
+    name='user_preload_options',
+    providing_args={'app', 'options'},
+)

+ 1 - 0
celery/states.py

@@ -124,6 +124,7 @@ class state(str):
     def __le__(self, other: Any) -> bool:
         return precedence(self) >= precedence(other)
 
+
 #: Task state is unknown (assumed pending since you know the id).
 PENDING = 'PENDING'
 #: Task was received by a worker (only used in events).

+ 4 - 4
celery/utils/collections.py

@@ -3,7 +3,7 @@
 import time
 
 from collections import (
-    Mapping, MutableMapping, MutableSet, Sequence as _Sequence,
+    Mapping, MutableMapping, MutableSet,
     OrderedDict as _OrderedDict, deque,
 )
 from heapq import heapify, heappush, heappop
@@ -149,7 +149,7 @@ class DictAttribute:
     def values(self) -> Iterator[Any]:
         for key in self.keys():
             yield getattr(self.obj, key)
-MutableMapping.register(DictAttribute)
+MutableMapping.register(DictAttribute)  # noqa: E305
 
 
 class ChainMap(MutableMapping):
@@ -571,7 +571,7 @@ class LimitedSet:
     def _heap_overload(self) -> float:
         """Compute how much is heap bigger than data [percents]."""
         return len(self._heap) * 100 / max(len(self._data), 1) - 100
-MutableSet.register(LimitedSet)
+MutableSet.register(LimitedSet)  # noqa: E305
 
 
 class Evictable:
@@ -660,7 +660,7 @@ class Messagebuffer(Evictable):
     @property
     def _evictcount(self) -> int:
         return len(self)
-_Sequence.register(Messagebuffer)
+Sequence.register(Messagebuffer)  # noqa: E305
 
 
 class BufferMap(OrderedDict, Evictable):

+ 255 - 0
celery/utils/dispatch/LICENSE.python

@@ -0,0 +1,255 @@
+A. HISTORY OF THE SOFTWARE
+==========================
+
+Python was created in the early 1990s by Guido van Rossum at Stichting
+Mathematisch Centrum (CWI, see http://www.cwi.nl) in the Netherlands
+as a successor of a language called ABC.  Guido remains Python's
+principal author, although it includes many contributions from others.
+
+In 1995, Guido continued his work on Python at the Corporation for
+National Research Initiatives (CNRI, see http://www.cnri.reston.va.us)
+in Reston, Virginia where he released several versions of the
+software.
+
+In May 2000, Guido and the Python core development team moved to
+BeOpen.com to form the BeOpen PythonLabs team.  In October of the same
+year, the PythonLabs team moved to Digital Creations (now Zope
+Corporation, see http://www.zope.com).  In 2001, the Python Software
+Foundation (PSF, see http://www.python.org/psf/) was formed, a
+non-profit organization created specifically to own Python-related
+Intellectual Property.  Zope Corporation is a sponsoring member of
+the PSF.
+
+All Python releases are Open Source (see http://www.opensource.org for
+the Open Source Definition).  Historically, most, but not all, Python
+releases have also been GPL-compatible; the table below summarizes
+the various releases.
+
+    Release         Derived     Year        Owner       GPL-
+                    from                                compatible? (1)
+
+    0.9.0 thru 1.2              1991-1995   CWI         yes
+    1.3 thru 1.5.2  1.2         1995-1999   CNRI        yes
+    1.6             1.5.2       2000        CNRI        no
+    2.0             1.6         2000        BeOpen.com  no
+    1.6.1           1.6         2001        CNRI        yes (2)
+    2.1             2.0+1.6.1   2001        PSF         no
+    2.0.1           2.0+1.6.1   2001        PSF         yes
+    2.1.1           2.1+2.0.1   2001        PSF         yes
+    2.1.2           2.1.1       2002        PSF         yes
+    2.1.3           2.1.2       2002        PSF         yes
+    2.2 and above   2.1.1       2001-now    PSF         yes
+
+Footnotes:
+
+(1) GPL-compatible doesn't mean that we're distributing Python under
+    the GPL.  All Python licenses, unlike the GPL, let you distribute
+    a modified version without making your changes open source.  The
+    GPL-compatible licenses make it possible to combine Python with
+    other software that is released under the GPL; the others don't.
+
+(2) According to Richard Stallman, 1.6.1 is not GPL-compatible,
+    because its license has a choice of law clause.  According to
+    CNRI, however, Stallman's lawyer has told CNRI's lawyer that 1.6.1
+    is "not incompatible" with the GPL.
+
+Thanks to the many outside volunteers who have worked under Guido's
+direction to make these releases possible.
+
+
+B. TERMS AND CONDITIONS FOR ACCESSING OR OTHERWISE USING PYTHON
+===============================================================
+
+PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
+--------------------------------------------
+
+1. This LICENSE AGREEMENT is between the Python Software Foundation
+("PSF"), and the Individual or Organization ("Licensee") accessing and
+otherwise using this software ("Python") in source or binary form and
+its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, PSF hereby
+grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
+analyze, test, perform and/or display publicly, prepare derivative works,
+distribute, and otherwise use Python alone or in any derivative version,
+provided, however, that PSF's License Agreement and PSF's notice of copyright,
+i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+2011, 2012, 2013, 2014, 2015, 2016 Python Software Foundation; All Rights
+Reserved" are retained in Python alone or in any derivative version prepared by
+Licensee.
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python.
+
+4. PSF is making Python available to Licensee on an "AS IS"
+basis.  PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. Nothing in this License Agreement shall be deemed to create any
+relationship of agency, partnership, or joint venture between PSF and
+Licensee.  This License Agreement does not grant permission to use PSF
+trademarks or trade name in a trademark sense to endorse or promote
+products or services of Licensee, or any third party.
+
+8. By copying, installing or otherwise using Python, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+
+BEOPEN.COM LICENSE AGREEMENT FOR PYTHON 2.0
+-------------------------------------------
+
+BEOPEN PYTHON OPEN SOURCE LICENSE AGREEMENT VERSION 1
+
+1. This LICENSE AGREEMENT is between BeOpen.com ("BeOpen"), having an
+office at 160 Saratoga Avenue, Santa Clara, CA 95051, and the
+Individual or Organization ("Licensee") accessing and otherwise using
+this software in source or binary form and its associated
+documentation ("the Software").
+
+2. Subject to the terms and conditions of this BeOpen Python License
+Agreement, BeOpen hereby grants Licensee a non-exclusive,
+royalty-free, world-wide license to reproduce, analyze, test, perform
+and/or display publicly, prepare derivative works, distribute, and
+otherwise use the Software alone or in any derivative version,
+provided, however, that the BeOpen Python License is retained in the
+Software, alone or in any derivative version prepared by Licensee.
+
+3. BeOpen is making the Software available to Licensee on an "AS IS"
+basis.  BEOPEN MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, BEOPEN MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF THE SOFTWARE WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+4. BEOPEN SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF THE
+SOFTWARE FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS
+AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THE SOFTWARE, OR ANY
+DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+5. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+6. This License Agreement shall be governed by and interpreted in all
+respects by the law of the State of California, excluding conflict of
+law provisions.  Nothing in this License Agreement shall be deemed to
+create any relationship of agency, partnership, or joint venture
+between BeOpen and Licensee.  This License Agreement does not grant
+permission to use BeOpen trademarks or trade names in a trademark
+sense to endorse or promote products or services of Licensee, or any
+third party.  As an exception, the "BeOpen Python" logos available at
+http://www.pythonlabs.com/logos.html may be used according to the
+permissions granted on that web page.
+
+7. By copying, installing or otherwise using the software, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+
+CNRI LICENSE AGREEMENT FOR PYTHON 1.6.1
+---------------------------------------
+
+1. This LICENSE AGREEMENT is between the Corporation for National
+Research Initiatives, having an office at 1895 Preston White Drive,
+Reston, VA 20191 ("CNRI"), and the Individual or Organization
+("Licensee") accessing and otherwise using Python 1.6.1 software in
+source or binary form and its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, CNRI
+hereby grants Licensee a nonexclusive, royalty-free, world-wide
+license to reproduce, analyze, test, perform and/or display publicly,
+prepare derivative works, distribute, and otherwise use Python 1.6.1
+alone or in any derivative version, provided, however, that CNRI's
+License Agreement and CNRI's notice of copyright, i.e., "Copyright (c)
+1995-2001 Corporation for National Research Initiatives; All Rights
+Reserved" are retained in Python 1.6.1 alone or in any derivative
+version prepared by Licensee.  Alternately, in lieu of CNRI's License
+Agreement, Licensee may substitute the following text (omitting the
+quotes): "Python 1.6.1 is made available subject to the terms and
+conditions in CNRI's License Agreement.  This Agreement together with
+Python 1.6.1 may be located on the Internet using the following
+unique, persistent identifier (known as a handle): 1895.22/1013.  This
+Agreement may also be obtained from a proxy server on the Internet
+using the following URL: http://hdl.handle.net/1895.22/1013".
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python 1.6.1 or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python 1.6.1.
+
+4. CNRI is making Python 1.6.1 available to Licensee on an "AS IS"
+basis.  CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON 1.6.1 WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+1.6.1 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. This License Agreement shall be governed by the federal
+intellectual property law of the United States, including without
+limitation the federal copyright law, and, to the extent such
+U.S. federal law does not apply, by the law of the Commonwealth of
+Virginia, excluding Virginia's conflict of law provisions.
+Notwithstanding the foregoing, with regard to derivative works based
+on Python 1.6.1 that incorporate non-separable material that was
+previously distributed under the GNU General Public License (GPL), the
+law of the Commonwealth of Virginia shall govern this License
+Agreement only as to issues arising under or with respect to
+Paragraphs 4, 5, and 7 of this License Agreement.  Nothing in this
+License Agreement shall be deemed to create any relationship of
+agency, partnership, or joint venture between CNRI and Licensee.  This
+License Agreement does not grant permission to use CNRI trademarks or
+trade name in a trademark sense to endorse or promote products or
+services of Licensee, or any third party.
+
+8. By clicking on the "ACCEPT" button where indicated, or by copying,
+installing or otherwise using Python 1.6.1, Licensee agrees to be
+bound by the terms and conditions of this License Agreement.
+
+        ACCEPT
+
+
+CWI LICENSE AGREEMENT FOR PYTHON 0.9.0 THROUGH 1.2
+--------------------------------------------------
+
+Copyright (c) 1991 - 1995, Stichting Mathematisch Centrum Amsterdam,
+The Netherlands.  All rights reserved.
+
+Permission to use, copy, modify, and distribute this software and its
+documentation for any purpose and without fee is hereby granted,
+provided that the above copyright notice appear in all copies and that
+both that copyright notice and this permission notice appear in
+supporting documentation, and that the name of Stichting Mathematisch
+Centrum or CWI not be used in advertising or publicity pertaining to
+distribution of the software without specific, written prior
+permission.
+
+STICHTING MATHEMATISCH CENTRUM DISCLAIMS ALL WARRANTIES WITH REGARD TO
+THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
+FITNESS, IN NO EVENT SHALL STICHTING MATHEMATISCH CENTRUM BE LIABLE
+FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

+ 8 - 7
celery/utils/dispatch/license.txt

@@ -4,23 +4,23 @@ PyDispatcher License:
 
     Copyright (c) 2001-2003, Patrick K. O'Brien and Contributors
     All rights reserved.
-
+    
     Redistribution and use in source and binary forms, with or without
     modification, are permitted provided that the following conditions
     are met:
-
+    
         Redistributions of source code must retain the above copyright
         notice, this list of conditions and the following disclaimer.
-
+    
         Redistributions in binary form must reproduce the above
         copyright notice, this list of conditions and the following
         disclaimer in the documentation and/or other materials
         provided with the distribution.
-
+    
         The name of Patrick K. O'Brien, or the name of any Contributor,
-        may not be used to endorse or promote products derived from this
+        may not be used to endorse or promote products derived from this 
         software without specific prior written permission.
-
+    
     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
@@ -32,4 +32,5 @@ PyDispatcher License:
     HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
     STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
     ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-    OF THE POSSIBILITY OF SUCH DAMAGE.
+    OF THE POSSIBILITY OF SUCH DAMAGE. 
+

+ 0 - 272
celery/utils/dispatch/saferef.py

@@ -1,272 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Safe weakrefs, originally from :pypi:`pyDispatcher`.
-
-Provides a way to safely weakref any function, including bound methods (which
-aren't handled by the core weakref module).
-"""
-import traceback
-import weakref
-
-__all__ = ['safe_ref']
-
-
-def safe_ref(target, on_delete=None):  # pragma: no cover
-    """Return a *safe* weak reference to a callable target.
-
-    Arguments:
-        target (Any): The object to be weakly referenced, if it's a
-            bound method reference, will create a :class:`BoundMethodWeakref`,
-            otherwise creates a simple :class:`weakref.ref`.
-
-        on_delete (Callable): If provided, will have a hard reference stored
-            to the callable to be called after the safe reference
-            goes out of scope with the reference object, (either a
-            :class:`weakref.ref` or a :class:`BoundMethodWeakref`) as argument.
-    """
-    if getattr(target, '__self__', None) is not None:
-        # Turn a bound method into a BoundMethodWeakref instance.
-        # Keep track of these instances for lookup by disconnect().
-        assert hasattr(target, '__func__'), \
-            """safe_ref target {0!r} has __self__, but no __func__: \
-            don't know how to create reference""".format(target)
-        return get_bound_method_weakref(target=target,
-                                        on_delete=on_delete)
-    if callable(on_delete):
-        return weakref.ref(target, on_delete)
-    else:
-        return weakref.ref(target)
-
-
-class BoundMethodWeakref:  # pragma: no cover
-    """'Safe' and reusable weak references to instance methods.
-
-    BoundMethodWeakref objects provide a mechanism for
-    referencing a bound method without requiring that the
-    method object itself (which is normally a transient
-    object) is kept alive.  Instead, the BoundMethodWeakref
-    object keeps weak references to both the object and the
-    function which together define the instance method.
-
-    Attributes:
-
-        key (str): the identity key for the reference, calculated
-            by the class's :meth:`calculate_key` method applied to the
-            target instance method.
-
-        deletion_methods (Sequence[Callable]): Callables taking
-            single argument, a reference to this object which
-            will be called when *either* the target object or
-            target function is garbage collected (i.e., when
-            this object becomes invalid).  These are specified
-            as the on_delete parameters of :func:`safe_ref` calls.
-
-        weak_self (weakref.ref): weak reference to the target object.
-
-        weak_fun (weakref.ref): weak reference to the target function
-
-        _all_instances (weakref.WeakValueDictionary):
-            class attribute pointing to all live
-            BoundMethodWeakref objects indexed by the class's
-            `calculate_key(target)` method applied to the target
-            objects.  This weak value dictionary is used to
-            short-circuit creation so that multiple references
-            to the same (object, function) pair produce the
-            same BoundMethodWeakref instance.
-    """
-
-    _all_instances = weakref.WeakValueDictionary()
-
-    def __new__(cls, target, on_delete=None, *arguments, **named):
-        """Create new instance or return current instance.
-
-        Note:
-            Basically this method of construction allows us to
-            short-circuit creation of references to already-
-            referenced instance methods.  The key corresponding
-            to the target is calculated, and if there's already
-            an existing reference, that is returned, with its
-            deletionMethods attribute updated.  Otherwise the
-            new instance is created and registered in the table
-            of already-referenced methods.
-        """
-        key = cls.calculate_key(target)
-        current = cls._all_instances.get(key)
-        if current is not None:
-            current.deletion_methods.append(on_delete)
-            return current
-        else:
-            base = super().__new__(cls)
-            cls._all_instances[key] = base
-            base.__init__(target, on_delete, *arguments, **named)
-            return base
-
-    def __init__(self, target, on_delete=None):
-        """Return a weak-reference-like instance for a bound method.
-
-        Arguments:
-            target (Any): The instance-method target for the weak
-                reference, must have `__self__` and `__func__` attributes
-                and be reconstructable via::
-
-                    target.__func__.__get__(target.__self__)
-
-                which is true of built-in instance methods.
-
-            on_delete (Callable): Optional callback which will be called
-                when this weak reference ceases to be valid
-                (i.e., either the object or the function is garbage
-                collected).  Should take a single argument,
-                which will be passed a pointer to this object.
-        """
-        def remove(weak, self=self):
-            """Set is_dead to true when method or instance is destroyed."""
-            methods = self.deletion_methods[:]
-            del(self.deletion_methods[:])
-            try:
-                del(self.__class__._all_instances[self.key])
-            except KeyError:
-                pass
-            for function in methods:
-                try:
-                    if callable(function):
-                        function(self)
-                except Exception as exc:
-                    try:
-                        traceback.print_exc()
-                    except AttributeError:
-                        print('Exception during saferef {0} cleanup function '
-                              '{1}: {2}'.format(self, function, exc))
-
-        self.deletion_methods = [on_delete]
-        self.key = self.calculate_key(target)
-        self.weak_self = weakref.ref(target.__self__, remove)
-        self.weak_fun = weakref.ref(target.__func__, remove)
-        self.self_name = str(target.__self__)
-        self.fun_name = str(target.__func__.__name__)
-
-    def calculate_key(cls, target):
-        """Calculate the reference key for this reference.
-
-        Returns:
-            Tuple[int, int]: Currently this is a two-tuple of
-                the `id()`'s of the target object and the target
-                function respectively.
-        """
-        return id(target.__self__), id(target.__func__)
-    calculate_key = classmethod(calculate_key)
-
-    def __str__(self):
-        return '{0}( {1}.{2} )'.format(
-            type(self).__name__,
-            self.self_name,
-            self.fun_name,
-        )
-
-    def __repr__(self):
-        return str(self)
-
-    def __bool__(self):
-        """Whether we're still a valid reference."""
-        return self() is not None
-    __nonzero__ = __bool__  # py2
-
-    def __call__(self):
-        """Return a strong reference to the bound method.
-
-        If the target cannot be retrieved, then will
-        return None, otherwise return a bound instance
-        method for our object and function.
-
-        Note:
-            You may call this method any number of times,
-            as it does not invalidate the reference.
-        """
-        target = self.weak_self()
-        if target is not None:
-            function = self.weak_fun()
-            if function is not None:
-                return function.__get__(target)
-
-
-class BoundNonDescriptorMethodWeakref(BoundMethodWeakref):  # pragma: no cover
-    """A specialized :class:`BoundMethodWeakref`.
-
-    For platforms where instance methods are not descriptors.
-
-    Warning:
-        It assumes that the function name and the target attribute name are
-        the same, instead of assuming that the function is a descriptor.
-        This approach is equally fast, but not 100% reliable because
-        functions can be stored on an attribute named differenty than the
-        function's name, such as in::
-
-            >>> class A:
-            ...     pass
-
-            >>> def foo(self):
-            ...     return 'foo'
-            >>> A.bar = foo
-
-        This shouldn't be a common use case.  So, on platforms where methods
-        aren't descriptors (e.g., Jython) this implementation has the
-        advantage of working in the most cases.
-    """
-
-    def __init__(self, target, on_delete=None):
-        """Return a weak-reference-like instance for a bound method.
-
-        Arguments:
-            target (Any): the instance-method target for the weak
-                reference, must have `__self__` and `__func__` attributes
-                and be reconstructable via::
-
-                    target.__func__.__get__(target.__self__)
-
-                which is true of built-in instance methods.
-
-            on_delete (Callable): Optional callback which will be called
-                when this weak reference ceases to be valid
-                (i.e., either the object or the function is garbage
-                collected).  Should take a single argument,
-                which will be passed a pointer to this object.
-        """
-        assert getattr(target.__self__, target.__name__) == target
-        super().__init__(target, on_delete)
-
-    def __call__(self):
-        """Return a strong reference to the bound method.
-
-        If the target cannot be retrieved, then will
-        return None, otherwise return a bound instance
-        method for our object and function.
-
-        Note:
-            You may call this method any number of times,
-            as it does not invalidate the reference.
-        """
-        target = self.weak_self()
-        if target is not None:
-            function = self.weak_fun()
-            if function is not None:
-                # Using curry() would be another option, but it erases the
-                # "signature" of the function.  That is, after a function is
-                # curried, the inspect module can't be used to determine how
-                # many arguments the function expects, nor what keyword
-                # arguments it supports, and pydispatcher needs this
-                # information.
-                return getattr(target, function.__name__)
-
-
-def get_bound_method_weakref(target, on_delete):  # pragma: no cover
-    """Instantiate the appropiate :class:`BoundMethodWeakRef`.
-
-    Depending on the details of the underlying class method
-    implementation.
-    """
-    if hasattr(target, '__get__'):
-        # target method is a descriptor, so the default implementation works:
-        return BoundMethodWeakref(target=target, on_delete=on_delete)
-    else:
-        # no luck, use the alternative implementation:
-        return BoundNonDescriptorMethodWeakref(
-            target=target, on_delete=on_delete)

+ 161 - 67
celery/utils/dispatch/signal.py

@@ -1,16 +1,23 @@
 # -*- coding: utf-8 -*-
 """Implementation of the Observer pattern."""
+import sys
+import threading
 import weakref
+import warnings
+from celery.exceptions import CDeprecationWarning
 from celery.local import PromiseProxy, Proxy
+from celery.utils.functional import fun_accepts_kwargs
 from celery.utils.log import get_logger
-from . import saferef
+try:
+    from weakref import WeakMethod
+except ImportError:
+    from .weakref_backports import WeakMethod  # noqa
 
 __all__ = ['Signal']
 
+PY3 = sys.version_info[0] >= 3
 logger = get_logger(__name__)
 
-WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
-
 
 def _make_id(target):  # pragma: no cover
     if isinstance(target, Proxy):
@@ -23,22 +30,41 @@ def _make_id(target):  # pragma: no cover
     return id(target)
 
 
-class Signal:  # pragma: no cover
-    """Observer pattern implementation.
+NONE_ID = _make_id(None)
+
+NO_RECEIVERS = object()
+
 
-    Arguments:
+class Signal:
+    """Create new signal.
+
+    Keyword Arguments:
         providing_args (List): A list of the arguments this signal can pass
             along in a :meth:`send` call.
+        use_caching (bool): Enable receiver cache.
+        name (str): Name of signal, used for debugging purposes.
     """
 
     #: Holds a dictionary of
     #: ``{receiverkey (id): weakref(receiver)}`` mappings.
     receivers = None
 
-    def __init__(self, providing_args=None):
+    def __init__(self, providing_args=None, use_caching=False, name=None):
         self.receivers = []
         self.providing_args = set(
             providing_args if providing_args is not None else [])
+        self.lock = threading.Lock()
+        self.use_caching = use_caching
+        self.name = name
+        # For convenience we create empty caches even if they are not used.
+        # A note about caching: if use_caching is defined, then for each
+        # distinct sender we cache the receivers that sender has in
+        # 'sender_receivers_cache'.  The cache is cleaned when .connect() or
+        # .disconnect() is called and populated on .send().
+        self.sender_receivers_cache = (
+            weakref.WeakKeyDictionary() if use_caching else {}
+        )
+        self._dead_receivers = False
 
     def _connect_proxy(self, fun, sender, weak, dispatch_uid):
         return self.connect(
@@ -54,8 +80,7 @@ class Signal:  # pragma: no cover
                 receive signals.  Receivers must be hashable objects.
 
                 if weak is :const:`True`, then receiver must be
-                weak-referenceable (more precisely :func:`saferef.safe_ref()`
-                must be able to create a reference to the receiver).
+                weak-referenceable.
 
                 Receivers must be able to accept keyword arguments.
 
@@ -64,7 +89,7 @@ class Signal:  # pragma: no cover
                 `dispatch_uid`.
 
             sender (Any): The sender to which the receiver should respond.
-                Must either be of type :class:`Signal`, or :const:`None` to
+                Must either be a Python object, or :const:`None` to
                 receive events from any sender.
 
             weak (bool): Whether to use weak references to the receiver.
@@ -79,39 +104,60 @@ class Signal:  # pragma: no cover
         def _handle_options(sender=None, weak=True, dispatch_uid=None):
 
             def _connect_signal(fun):
-                receiver = fun
-
-                if isinstance(sender, PromiseProxy):
-                    sender.__then__(
-                        self._connect_proxy, fun, sender, weak, dispatch_uid,
-                    )
-                    return fun
-
-                if dispatch_uid:
-                    lookup_key = (dispatch_uid, _make_id(sender))
-                else:
-                    lookup_key = (_make_id(receiver), _make_id(sender))
-
-                if weak:
-                    receiver = saferef.safe_ref(
-                        receiver, on_delete=self._remove_receiver,
-                    )
-
-                for r_key, _ in self.receivers:
-                    if r_key == lookup_key:
-                        break
-                else:
-                    self.receivers.append((lookup_key, receiver))
-
+                self._connect_signal(fun, sender, weak, dispatch_uid)
                 return fun
-
             return _connect_signal
 
         if args and callable(args[0]):
             return _handle_options(*args[1:], **kwargs)(args[0])
         return _handle_options(*args, **kwargs)
 
-    def disconnect(self, receiver=None, sender=None, weak=True,
+    def _connect_signal(self, receiver, sender, weak, dispatch_uid):
+        assert callable(receiver), 'Signal receivers must be callable'
+        if not fun_accepts_kwargs(receiver):
+            raise ValueError(
+                'Signal receiver must accept keyword arguments.')
+
+        if isinstance(sender, PromiseProxy):
+            sender.__then__(
+                self._connect_proxy, receiver, sender, weak, dispatch_uid,
+            )
+            return receiver
+
+        if dispatch_uid:
+            lookup_key = (dispatch_uid, _make_id(sender))
+        else:
+            lookup_key = (_make_id(receiver), _make_id(sender))
+
+        if weak:
+            ref = weakref.ref
+            receiver_object = receiver
+            # Check for bound methods
+            try:
+                receiver.__self__
+                receiver.__func__
+            except AttributeError:
+                pass
+            else:
+                ref = WeakMethod
+                receiver_object = receiver.__self__
+            if PY3:
+                receiver = ref(receiver)
+                weakref.finalize(receiver_object, self._remove_receiver)
+            else:
+                receiver = ref(receiver, self._remove_receiver)
+
+        with self.lock:
+            self._clear_dead_receivers()
+            for r_key, _ in self.receivers:
+                if r_key == lookup_key:
+                    break
+            else:
+                self.receivers.append((lookup_key, receiver))
+            self.sender_receivers_cache.clear()
+        return receiver
+
+    def disconnect(self, receiver=None, sender=None, weak=None,
                    dispatch_uid=None):
         """Disconnect receiver from sender for signal.
 
@@ -129,16 +175,29 @@ class Signal:  # pragma: no cover
             dispatch_uid (Hashable): The unique identifier of the receiver
                 to disconnect.
         """
+        if weak is not None:
+            warnings.warn(
+                'Passing `weak` to disconnect has no effect.',
+                CDeprecationWarning, stacklevel=2)
         if dispatch_uid:
             lookup_key = (dispatch_uid, _make_id(sender))
         else:
             lookup_key = (_make_id(receiver), _make_id(sender))
 
-        for index in range(len(self.receivers)):
-            (r_key, _) = self.receivers[index]
-            if r_key == lookup_key:
-                del self.receivers[index]
-                break
+        disconnected = False
+        with self.lock:
+            self._clear_dead_receivers()
+            for index in range(len(self.receivers)):
+                (r_key, _) = self.receivers[index]
+                if r_key == lookup_key:
+                    disconnected = True
+                    del self.receivers[index]
+                    break
+            self.sender_receivers_cache.clear()
+        return disconnected
+
+    def has_listeners(self, sender=None):
+        return bool(self._live_receivers(sender))
 
     def send(self, sender, **named):
         """Send signal from sender to all connected receivers.
@@ -156,53 +215,88 @@ class Signal:  # pragma: no cover
             List: of tuple pairs: `[(receiver, response), … ]`.
         """
         responses = []
-        if not self.receivers:
+        if not self.receivers or \
+                self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
             return responses
 
-        for receiver in self._live_receivers(_make_id(sender)):
+        for receiver in self._live_receivers(sender):
             try:
                 response = receiver(signal=self, sender=sender, **named)
             except Exception as exc:  # pylint: disable=broad-except
+                if not hasattr(exc, '__traceback__'):
+                    exc.__traceback__ = sys.exc_info()[2]
                 logger.exception(
                     'Signal handler %r raised: %r', receiver, exc)
+                responses.append((receiver, exc))
             else:
                 responses.append((receiver, response))
         return responses
-
-    def _live_receivers(self, senderkey):
+    send_robust = send  # Compat with Django interface.
+
+    def _clear_dead_receivers(self):
+        # Warning: caller is assumed to hold self.lock
+        if self._dead_receivers:
+            self._dead_receivers = False
+            new_receivers = []
+            for r in self.receivers:
+                if isinstance(r[1], weakref.ReferenceType) and r[1]() is None:
+                    continue
+                new_receivers.append(r)
+            self.receivers = new_receivers
+
+    def _live_receivers(self, sender):
         """Filter sequence of receivers to get resolved, live receivers.
 
         This checks for weak references and resolves them, then returning only
         live receivers.
         """
-        none_senderkey = _make_id(None)
-        receivers = []
-
-        for (_, r_senderkey), receiver in self.receivers:
-            if r_senderkey == none_senderkey or r_senderkey == senderkey:
-                if isinstance(receiver, WEAKREF_TYPES):
-                    # Dereference the weak reference.
-                    receiver = receiver()
-                    if receiver is not None:
+        receivers = None
+        if self.use_caching and not self._dead_receivers:
+            receivers = self.sender_receivers_cache.get(sender)
+            # We could end up here with NO_RECEIVERS even if we do check this
+            # case in .send() prior to calling _live_receivers() due to a
+            # concurrent .send() call.
+            if receivers is NO_RECEIVERS:
+                return []
+        if receivers is None:
+            with self.lock:
+                self._clear_dead_receivers()
+                senderkey = _make_id(sender)
+                receivers = []
+                for (receiverkey, r_senderkey), receiver in self.receivers:
+                    if r_senderkey == NONE_ID or r_senderkey == senderkey:
                         receivers.append(receiver)
-                else:
-                    receivers.append(receiver)
-        return receivers
+                if self.use_caching:
+                    if not receivers:
+                        self.sender_receivers_cache[sender] = NO_RECEIVERS
+                    else:
+                        # Note: we must cache the weakref versions.
+                        self.sender_receivers_cache[sender] = receivers
+        non_weak_receivers = []
+        for receiver in receivers:
+            if isinstance(receiver, weakref.ReferenceType):
+                # Dereference the weak reference.
+                receiver = receiver()
+                if receiver is not None:
+                    non_weak_receivers.append(receiver)
+            else:
+                non_weak_receivers.append(receiver)
+        return non_weak_receivers
 
-    def _remove_receiver(self, receiver):
+    def _remove_receiver(self, receiver=None):
         """Remove dead receivers from connections."""
-        to_remove = []
-        for key, connected_receiver in self.receivers:
-            if connected_receiver == receiver:
-                to_remove.append(key)
-        for key in to_remove:
-            for idx, (r_key, _) in enumerate(self.receivers):
-                if r_key == key:
-                    del self.receivers[idx]
+        # Mark that the self.receivers list has dead weakrefs. If so,
+        # we will clean those up in connect, disconnect and _live_receivers
+        # while holding self.lock.  Note that doing the cleanup here isn't a
+        # good idea, _remove_receiver() will be called as a side effect of
+        # garbage collection, and so the call can happen while we are already
+        # holding self.lock.
+        self._dead_receivers = True
 
     def __repr__(self):
         """``repr(signal)``."""
-        return '<Signal: {0}>'.format(type(self).__name__)
+        return '<{0}: {1} providing_args={2!r}>'.format(
+            type(self).__name__, self.name, self.providing_args)
 
     def __str__(self):
         """``str(signal)``."""

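A sketch of the resulting interface (names are illustrative): receivers
must be callables that accept keyword arguments, ``connect()`` doubles
as a decorator, and ``send()`` now records a raised exception as the
receiver's response instead of only logging it:

    from celery.utils.dispatch import Signal

    on_event = Signal(name='on_event', providing_args={'value'})

    @on_event.connect
    def handle(sender=None, value=None, **kwargs):
        return value * 2

    on_event.send(sender=None, value=21)  # -> [(handle, 42)]
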
+ 70 - 0
celery/utils/dispatch/weakref_backports.py

@@ -0,0 +1,70 @@
+"""Weakref compatibility.
+
+weakref_backports is a partial backport of the weakref module for Python
+versions below 3.4.
+
+Copyright (C) 2013 Python Software Foundation, see LICENSE.python for details.
+
+The following changes were made to the original sources during backporting:
+
+* Added ``self`` to ``super`` calls.
+* Removed ``from None`` when raising exceptions.
+"""
+from __future__ import absolute_import, unicode_literals
+from weakref import ref
+
+
+class WeakMethod(ref):
+    """Weak reference to bound method.
+
+    A custom :class:`weakref.ref` subclass which simulates a weak reference
+    to a bound method, working around the lifetime problem of bound methods.
+    """
+
+    __slots__ = '_func_ref', '_meth_type', '_alive', '__weakref__'
+
+    def __new__(cls, meth, callback=None):
+        try:
+            obj = meth.__self__
+            func = meth.__func__
+        except AttributeError:
+            raise TypeError(
+                "Argument should be a bound method, not {0}".format(
+                    type(meth)))
+
+        def _cb(arg):
+            # The self-weakref trick is needed to avoid creating a
+            # reference cycle.
+            self = self_wr()
+            if self._alive:
+                self._alive = False
+                if callback is not None:
+                    callback(self)
+        self = ref.__new__(cls, obj, _cb)
+        self._func_ref = ref(func, _cb)
+        self._meth_type = type(meth)
+        self._alive = True
+        self_wr = ref(self)
+        return self
+
+    def __call__(self):
+        obj = super(WeakMethod, self).__call__()
+        func = self._func_ref()
+        if obj is not None and func is not None:
+            return self._meth_type(func, obj)
+
+    def __eq__(self, other):
+        if not isinstance(other, WeakMethod):
+            return False
+        if not self._alive or not other._alive:
+            return self is other
+        return ref.__eq__(self, other) and self._func_ref == other._func_ref
+
+    def __ne__(self, other):
+        if not isinstance(other, WeakMethod):
+            return True
+        if not self._alive or not other._alive:
+            return self is not other
+        return ref.__ne__(self, other) or self._func_ref != other._func_ref
+
+    __hash__ = ref.__hash__

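Roughly how ``WeakMethod`` behaves (a sketch; on Python 3.4+ the stdlib
class is imported and this backport is never used):

    import weakref

    class C:
        def method(self):
            return 'alive'

    obj = C()
    wm = weakref.WeakMethod(obj.method)
    assert wm()() == 'alive'  # dereference, then call the bound method
    del obj                   # on CPython the instance dies immediately
    assert wm() is None       # the weak method is now dead
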
+ 41 - 4
celery/utils/functional.py

@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """Functional-style utilties."""
+import inspect
 import sys
-
 from collections import UserList
 from functools import partial
 from inspect import FullArgSpec, getfullargspec, isfunction
@@ -24,7 +24,7 @@ __all__ = [
     'LRUCache', 'is_list', 'maybe_list', 'memoize', 'mlazy', 'noop',
     'first', 'firstmethod', 'chunks', 'padlist', 'mattrgetter', 'uniq',
     'regen', 'dictfilter', 'lazy', 'maybe_evaluate', 'head_from_fun',
-    'maybe',
+    'maybe', 'fun_accepts_kwargs',
 ]
 
 FUNHEAD_TEMPLATE = """
@@ -237,11 +237,28 @@ def _argsfromspec(spec: FullArgSpec, replace_defaults: bool=True) -> str:
         optional = list(zip(spec.args[-split:], defaults))
     else:
         positional, optional = spec.args, []
+
+    varargs = spec.varargs
+    varkw = spec.varkw
+    if spec.kwonlydefaults:
+        split = len(spec.kwonlydefaults)
+        kwonlyargs = spec.kwonlyargs[:-split]
+        if replace_defaults:
+            kwonlyargs_optional = [
+                (kw, i) for i, kw in enumerate(spec.kwonlyargs[-split:])]
+        else:
+            kwonlyargs_optional = list(spec.kwonlydefaults.items())
+    else:
+        kwonlyargs, kwonlyargs_optional = spec.kwonlyargs, []
+
     return ', '.join(filter(None, [
         ', '.join(positional),
         ', '.join('{0}={1}'.format(k, v) for k, v in optional),
-        '*{0}'.format(spec.varargs) if spec.varargs else None,
-        '**{0}'.format(spec.varkw) if spec.varkw else None,
+        '*{0}'.format(varargs) if varargs else None,
+        '**{0}'.format(varkw) if varkw else None,
+        '*' if (kwonlyargs or kwonlyargs_optional) and not varargs else None,
+        ', '.join(kwonlyargs) if kwonlyargs else None,
+        ', '.join('{0}="{1}"'.format(k, v) for k, v in kwonlyargs_optional),
     ]))
 
 
@@ -289,6 +306,26 @@ def fun_takes_argument(name: str, fun: Callable,
     )
 
 
+if hasattr(inspect, 'signature'):
+    def fun_accepts_kwargs(fun):
+        """Return true if function accepts arbitrary keyword arguments."""
+        return any(
+            p for p in inspect.signature(fun).parameters.values()
+            if p.kind == p.VAR_KEYWORD
+        )
+else:
+    def fun_accepts_kwargs(fun):  # noqa
+        """Return true if function accepts arbitrary keyword arguments."""
+        try:
+            argspec = inspect.getargspec(fun)
+        except TypeError:
+            try:
+                argspec = inspect.getargspec(fun.__call__)
+            except (TypeError, AttributeError):
+                return
+        return not argspec or argspec[2] is not None
+
+
 def maybe(typ, val):
     """Call typ on value if val is defined."""
     return typ(val) if val is not None else val

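The signature-based helper just looks for a ``**kwargs``-style parameter,
for example:

    >>> from celery.utils.functional import fun_accepts_kwargs
    >>> fun_accepts_kwargs(lambda a, **kwargs: None)
    True
    >>> fun_accepts_kwargs(lambda a, b=1: None)
    False
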
+ 2 - 0
celery/utils/log.py

@@ -101,6 +101,8 @@ def get_logger(name: Union[str, logging.Logger]) -> logging.Logger:
     if logging.root not in (l, l.parent) and l is not base_logger:
         l = _using_logger_parent(base_logger, l)
     return l
+
+
 task_logger = get_logger('celery.task')
 worker_logger = get_logger('celery.worker')
 

+ 2 - 0
celery/utils/nodenames.py

@@ -85,6 +85,8 @@ def _fmt_process_index(prefix: str='', default: str='0') -> str:
     from .log import current_process_index
     index = current_process_index()
     return '{0}{1}'.format(prefix, index) if index else default
+
+
 _fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
 
 

+ 107 - 29
celery/utils/saferepr.py

@@ -10,32 +10,49 @@ Differences from regular :func:`repr`:
 
 Very slow with no limits, super quick with limits.
 """
-from collections import Iterable, Mapping, deque, namedtuple
-
+import traceback
+from collections import Mapping, deque, namedtuple
 from decimal import Decimal
 from itertools import chain
 from numbers import Number
 from pprint import _recursion
 from typing import (
-    Any, Callable, Iterator, MutableSequence, Optional, Set, Sequence, Tuple,
+    Any, AnyStr, Callable, Iterator, Set, Sequence, Tuple,
 )
-
-from kombu.utils.encoding import bytes_to_str
-
-from .text import truncate, truncate_bytes
+from .text import truncate
 
 __all__ = ['saferepr', 'reprstream']
 
 # pylint: disable=redefined-outer-name
 # We cache globals and attribute lookups, so disable this warning.
 
+#: Node representing literal text.
+#:   - .value: is the literal text value
+#:   - .truncate: specifies if this text can be truncated, for things like
+#:                LIT_DICT_END this will be False, as we always display
+#:                the ending brackets, e.g.:  [[[1, 2, 3, ...,], ..., ]]
+#:   - .direction: If +1 the current level is incremented by one,
+#:                 if -1 the current level is decremented by one, and
+#:                 if 0 the current level is unchanged.
 _literal = namedtuple('_literal', ('value', 'truncate', 'direction'))
+
+#: Node representing a dictionary key.
 _key = namedtuple('_key', ('value',))
+
+#: Node representing quoted text, e.g. a string value.
 _quoted = namedtuple('_quoted', ('value',))
+
+
+#: Recursion protection.
 _dirty = namedtuple('_dirty', ('objid',))
 
+#: Types that are represented as chars.
 chars_t = (bytes, str)
+
+#: Types that are regarded as safe to call repr on.
 safe_t = (Number,)
+
+#: Set types.
 set_t = (frozenset, set)
 
 LIT_DICT_START = _literal('{', False, +1)
@@ -51,8 +68,8 @@ LIT_TUPLE_END = _literal(')', False, -1)
 LIT_TUPLE_END_SV = _literal(',)', False, -1)
 
 
-def saferepr(o: Any, maxlen: Optional[int]=None,
-             maxlevels: int=3, seen: Optional[Set]=None) -> str:
+def saferepr(o: Any, maxlen: int = None,
+             maxlevels: int=3, seen: Set = None) -> str:
     """Safe version of :func:`repr`.
 
     Warning:
@@ -66,8 +83,8 @@ def saferepr(o: Any, maxlen: Optional[int]=None,
 
 
 def _chaindict(mapping: Mapping,
-               LIT_DICT_KVSEP: str=LIT_DICT_KVSEP,
-               LIT_LIST_SEP: str=LIT_LIST_SEP) -> Iterator[Any]:
+               LIT_DICT_KVSEP: _literal = LIT_DICT_KVSEP,
+               LIT_LIST_SEP: _literal = LIT_LIST_SEP) -> Iterator[Any]:
     size = len(mapping)
     for i, (k, v) in enumerate(mapping.items()):
         yield _key(k)
@@ -77,7 +94,8 @@ def _chaindict(mapping: Mapping,
             yield LIT_LIST_SEP
 
 
-def _chainlist(it: Sequence, LIT_LIST_SEP: str=LIT_LIST_SEP) -> Iterator[Any]:
+def _chainlist(it: Sequence,
+               LIT_LIST_SEP: _literal = LIT_LIST_SEP) -> Iterator[Any]:
     size = len(it)
     for i, v in enumerate(it):
         yield v
@@ -85,12 +103,69 @@ def _chainlist(it: Sequence, LIT_LIST_SEP: str=LIT_LIST_SEP) -> Iterator[Any]:
             yield LIT_LIST_SEP
 
 
-def _repr_empty_set(s: Any) -> str:
+def _repr_empty_set(s: Set) -> str:
     return '%s()' % (type(s).__name__,)
 
 
-def _saferepr(o: Any, maxlen: Optional[int]=None,
-              maxlevels: int=3, seen: Optional[Set]=None) -> str:
+def _safetext(val: AnyStr) -> str:
+    if isinstance(val, bytes):
+        try:
+            val.decode('utf-8')
+        except UnicodeDecodeError:
+            # is bytes with unrepresentable characters, attempt
+            # to convert back to unicode
+            return val.decode('utf-8', errors='backslashreplace')
+    return val
+
+
+def _format_binary_bytes(val: bytes, maxlen: int,
+                         ellipsis: str = '...') -> str:
+    if maxlen and len(val) > maxlen:
+        # we don't want to copy all the data, just take what we need.
+        chunk = memoryview(val)[:maxlen].tobytes()
+        return _bytes_prefix("'{0}{1}'".format(
+            _repr_binary_bytes(chunk), ellipsis))
+    return _bytes_prefix("'{0}'".format(_repr_binary_bytes(val)))
+
+
+def _bytes_prefix(s: str) -> str:
+    return 'b' + s
+
+
+def _repr_binary_bytes(val: bytes) -> str:
+    try:
+        return val.decode('utf-8')
+    except UnicodeDecodeError:
+        # possibly not unicode, but binary data so format as hex.
+        try:
+            ashex = val.hex
+        except AttributeError:  # pragma: no cover
+            # Python 3.4
+            return val.decode('utf-8', errors='replace')
+        else:
+            # Python 3.5+
+            return ashex()
+
+
+def _format_chars(val: AnyStr, maxlen: int) -> str:
+    if isinstance(val, bytes):  # pragma: no cover
+        return _format_binary_bytes(val, maxlen)
+    else:
+        return "'{0}'".format(truncate(val, maxlen))
+
+
+def _repr(obj: Any) -> str:
+    try:
+        return repr(obj)
+    except Exception as exc:
+        return '<Unrepresentable {0!r}{1:#x}: {2!r} {3!r}>'.format(
+            type(obj), id(obj), exc, '\n'.join(traceback.format_stack()))
+
+
+def _saferepr(o: Any,
+              maxlen: int = None,
+              maxlevels: int = 3,
+              seen: Set = None) -> str:
     stack = deque([iter([o])])
     for token, it in reprstream(stack, seen=seen, maxlevels=maxlevels):
         if maxlen is not None and maxlen <= 0:
@@ -104,11 +179,9 @@ def _saferepr(o: Any, maxlen: Optional[int]=None,
         elif isinstance(token, _key):
             val = saferepr(token.value, maxlen, maxlevels)
         elif isinstance(token, _quoted):
-            val = token.value
-            if isinstance(val, bytes):
-                val = "b'%s'" % (bytes_to_str(truncate_bytes(val, maxlen)),)
+            val = _format_chars(token.value, maxlen)
         else:
-            val = truncate(token, maxlen)
+            val = _safetext(truncate(token, maxlen))
         yield val
         if maxlen is not None:
             maxlen -= len(val)
@@ -119,8 +192,11 @@ def _saferepr(o: Any, maxlen: Optional[int]=None,
                 yield rest2.value
 
 
-def _reprseq(val: Any, lit_start: str, lit_end: str, builtin_type: Any,
-             chainer: Callable) -> Tuple[Any, Any, Any]:
+def _reprseq(val: Any,
+             lit_start: _literal,
+             lit_end: _literal,
+             builtin_type: Any,
+             chainer: Any) -> Tuple[Any, Any, Any]:
     if type(val) is builtin_type:  # noqa
         return lit_start, lit_end, chainer(val)
     return (
@@ -130,9 +206,11 @@ def _reprseq(val: Any, lit_start: str, lit_end: str, builtin_type: Any,
     )
 
 
-def reprstream(stack: MutableSequence, seen: Optional[Set]=None,
-               maxlevels: int=3, level: int=0,
-               isinstance: Callable=isinstance) -> Iterator[Any]:
+def reprstream(stack: deque,
+               seen: Set = None,
+               maxlevels: int = 3,
+               level: int = 0,
+               isinstance: Callable = isinstance) -> Iterator[Any]:
     """Streaming repr, yielding tokens."""
     seen = seen or set()
     append = stack.append
@@ -155,13 +233,13 @@ def reprstream(stack: MutableSequence, seen: Optional[Set]=None,
             elif isinstance(val, _key):
                 yield val, it
             elif isinstance(val, Decimal):
-                yield repr(val), it
+                yield _repr(val), it
             elif isinstance(val, safe_t):
                 yield str(val), it
             elif isinstance(val, chars_t):
                 yield _quoted(val), it
             elif isinstance(val, range):  # pragma: no cover
-                yield repr(val), it
+                yield _repr(val), it
             else:
                 if isinstance(val, set_t):
                     if not val:
@@ -175,15 +253,15 @@ def reprstream(stack: MutableSequence, seen: Optional[Set]=None,
                         LIT_TUPLE_START,
                         LIT_TUPLE_END_SV if len(val) == 1 else LIT_TUPLE_END,
                         _chainlist(val))
-                elif isinstance(val, Mapping):
+                elif isinstance(val, dict):
                     lit_start, lit_end, val = (
                         LIT_DICT_START, LIT_DICT_END, _chaindict(val))
-                elif isinstance(val, Iterable):
+                elif isinstance(val, list):
                     lit_start, lit_end, val = (
                         LIT_LIST_START, LIT_LIST_END, _chainlist(val))
                 else:
                     # other type of object
-                    yield repr(val), it
+                    yield _repr(val), it
                     continue
 
                 if maxlevels and level >= maxlevels:

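A behavior sketch of the tightened ``saferepr()``: output size is bounded
while streaming, and undecodable bytes are rendered as hex instead of
raising:

    from celery.utils.saferepr import saferepr

    saferepr(b'\xff\xfe')          # -> "b'fffe'" (hex fallback, no crash)
    saferepr(list(range(10 ** 6)), maxlen=256)  # truncated near maxlen
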
+ 2 - 1
celery/utils/term.py

@@ -172,7 +172,8 @@ def supports_images() -> bool:
 
 def _read_as_base64(path: str) -> bytes:
     with codecs.open(path, mode='rb') as fh:
-        return base64.b64encode(fh.read())
+        encoded = base64.b64encode(fh.read())
+        return encoded if isinstance(encoded, str) else encoded.decode('ascii')
 
 
 def imgcat(path: str,

+ 2 - 0
celery/utils/text.py

@@ -53,6 +53,8 @@ def join(l: Sequence[str], sep: str = '\n') -> str:
 def ensure_sep(sep: str, s: str, n: int = 2) -> str:
     """Ensure text s ends in separator sep'."""
     return s + sep * (n - s.count(sep))
+
+
 ensure_newlines = partial(ensure_sep, '\n')
 
 
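For reference, ``ensure_sep`` pads with however many separators are still
missing (note it counts occurrences anywhere in the string):

    >>> from celery.utils.text import ensure_sep
    >>> ensure_sep('\n', 'foo', n=2)
    'foo\n\n'
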

+ 1 - 0
celery/utils/threads.py

@@ -323,6 +323,7 @@ class _FastLocalStack(threading.local):
     def __len__(self) -> int:
         return len(self.stack)
 
+
 if USE_FAST_LOCALS:  # pragma: no cover
     LocalStack = _FastLocalStack
 else:

+ 2 - 0
celery/utils/time.py

@@ -141,6 +141,8 @@ class _Zone:
     @cached_property
     def utc(self) -> tzinfo:
         return self.get_timezone('UTC')
+
+
 timezone = _Zone()
 
 

+ 14 - 4
celery/worker/consumer/consumer.py

@@ -391,13 +391,25 @@ class Consumer:
             self.pool.flush()
 
     def connect(self):
-        """Establish the broker connection.
+        """Establish the broker connection used for consuming tasks.
 
         Retries establishing the connection if the
         :setting:`broker_connection_retry` setting is enabled
         """
-        conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)
+        conn = self.connection_for_read(heartbeat=self.amqheartbeat)
+        if self.hub:
+            conn.transport.register_with_event_loop(conn.connection, self.hub)
+        return conn
+
+    def connection_for_read(self, heartbeat=None):
+        return self.ensure_connected(
+            self.app.connection_for_read(heartbeat=heartbeat))
 
+    def connection_for_write(self, heartbeat=None):
+        return self.ensure_connected(
+            self.app.connection_for_write(heartbeat=heartbeat))
+
+    def ensure_connected(self, conn):
         # Callback called for each retry while the connection
         # can't be established.
         def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
@@ -417,8 +429,6 @@ class Consumer:
             _error_handler, self.app.conf.broker_connection_max_retries,
             callback=maybe_shutdown,
         )
-        if self.hub:
-            conn.transport.register_with_event_loop(conn.connection, self.hub)
         return conn
 
     def _flush_events(self):

+ 11 - 4
celery/worker/consumer/events.py

@@ -14,8 +14,11 @@ class Events(bootsteps.StartStopStep):
 
     requires = (Connection,)
 
-    def __init__(self, c, task_events=True,
-                 without_heartbeat=False, without_gossip=False, **kwargs):
+    def __init__(self, c,
+                 task_events=True,
+                 without_heartbeat=False,
+                 without_gossip=False,
+                 **kwargs):
         self.groups = None if task_events else ['worker']
         self.send_events = (
             task_events or
@@ -29,8 +32,12 @@ class Events(bootsteps.StartStopStep):
         # flush events sent while connection was down.
         prev = self._close(c)
         dis = c.event_dispatcher = c.app.events.Dispatcher(
-            c.connect(), hostname=c.hostname,
-            enabled=self.send_events, groups=self.groups,
+            c.connection_for_write(),
+            hostname=c.hostname,
+            enabled=self.send_events,
+            groups=self.groups,
+            # we currently only buffer events when the event loop is enabled
+            # XXX This excludes eventlet/gevent, which should actually buffer.
             buffer_group=['task'] if c.hub else None,
             on_send_buffered=c.on_send_event_buffered if c.hub else None,
         )

+ 1 - 1
celery/worker/pidbox.py

@@ -104,7 +104,7 @@ class gPidbox(Pidbox):
         shutdown = self._node_shutdown = threading.Event()
         stopped = self._node_stopped = threading.Event()
         try:
-            with c.connect() as connection:
+            with c.connection_for_read() as connection:
                 info('pidbox: Connected to %s.', connection.as_uri())
                 self._do_reset(c, connection)
                 while not shutdown.is_set() and c.connection:

+ 2 - 3
celery/worker/request.py

@@ -50,7 +50,7 @@ def __optimize__():
     global _does_info
     _does_debug = logger.isEnabledFor(logging.DEBUG)
     _does_info = logger.isEnabledFor(logging.INFO)
-__optimize__()
+__optimize__()  # noqa: E305
 
 # Localize
 tz_or_local = timezone.tz_or_local
@@ -299,7 +299,7 @@ class Request:
         task_ready(self)
         if soft:
             warn('Soft time limit (%ss) exceeded for %s[%s]',
-                 soft, self.name, self.id)
+                 timeout, self.name, self.id)
             exc = SoftTimeLimitExceeded(soft)
         else:
             error('Hard time limit (%ss) exceeded for %s[%s]',
@@ -404,7 +404,6 @@ class Request:
             'args': self.argsrepr,
             'kwargs': self.kwargsrepr,
             'type': self.type,
-            'body': self.body,
             'hostname': self.hostname,
             'time_start': self.time_start,
             'acknowledged': self.acknowledged,

+ 6 - 6
celery/worker/state.py

@@ -217,22 +217,22 @@ class Persistent:
     def _sync_with(self, d):
         self._revoked_tasks.purge()
         d.update({
-            b'__proto__': 3,
-            b'zrevoked': self.compress(self._dumps(self._revoked_tasks)),
-            b'clock': self.clock.forward() if self.clock else 0,
+            str('__proto__'): 3,
+            str('zrevoked'): self.compress(self._dumps(self._revoked_tasks)),
+            str('clock'): self.clock.forward() if self.clock else 0,
         })
         return d
 
     def _merge_clock(self, d):
         if self.clock:
-            d[b'clock'] = self.clock.adjust(d.get(b'clock') or 0)
+            d[str('clock')] = self.clock.adjust(d.get(str('clock')) or 0)
 
     def _merge_revoked(self, d):
         try:
-            self._merge_revoked_v3(d[b'zrevoked'])
+            self._merge_revoked_v3(d[str('zrevoked')])
         except KeyError:
             try:
-                self._merge_revoked_v2(d.pop(b'revoked'))
+                self._merge_revoked_v2(d.pop(str('revoked')))
             except KeyError:
                 pass
         # purge expired items at boot
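
The ``str()`` wrapping keeps these shelve keys native strings on both
Python 2 and 3 (bytes on 2, text on 3); Python 3's shelve only accepts
text keys, which is why the old ``b'...'`` keys had to go.  A quick
illustration:

    import shelve

    db = shelve.open('/tmp/worker-state-demo')
    db[str('clock')] = 42    # native-str key works on both 2 and 3
    # db[b'clock'] = 42      # bytes keys break on Python 3
    db.close()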

+ 1 - 1
docs/getting-started/brokers/rabbitmq.rst

@@ -38,7 +38,7 @@ see `Installing RabbitMQ on macOS`_.
     :command:`rabbitmqctl` then this blog post can help you identify
     the source of the problem:
 
-        http://somic.org/2009/02/19/on-rabbitmqctl-and-badrpcnodedown/
+        http://www.somic.org/2009/02/19/on-rabbitmqctl-and-badrpcnodedown/
 
 .. _rabbitmq-configuration:
 

+ 1 - 1
docs/getting-started/brokers/redis.rst

@@ -15,7 +15,7 @@ the ``celery[redis]`` :ref:`bundle <bundles>`:
 
 .. code-block:: console
 
-    $ pip install -U celery[redis]
+    $ pip install -U "celery[redis]"
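
(The quotes matter here: in shells such as zsh an unquoted ``celery[redis]``
is treated as a glob pattern, and the command fails before pip even runs.)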
 
 .. _broker-redis-configuration:
 

+ 4 - 4
docs/getting-started/first-steps-with-celery.rst

@@ -118,7 +118,7 @@ Let's create the file :file:`tasks.py`:
 
     from celery import Celery
 
-    app = Celery('tasks', broker='amqp://guest@localhost//')
+    app = Celery('tasks', broker='pyamqp://guest@localhost//')
 
     @app.task
     def add(x, y):
@@ -221,14 +221,14 @@ you choose to use a configuration module):
 
 .. code-block:: python
 
-    app = Celery('tasks', backend='rpc://', broker='amqp://')
+    app = Celery('tasks', backend='rpc://', broker='pyamqp://')
 
 Or if you want to use Redis as the result backend, but still use RabbitMQ as
 the message broker (a popular combination):
 
 .. code-block:: python
 
-    app = Celery('tasks', backend='redis://localhost', broker='amqp://')
+    app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
 
 To read more about result backends please see :ref:`task-result-backends`.
 
@@ -337,7 +337,7 @@ current directory or on the Python path, it could look like this:
 
 .. code-block:: python
 
-    broker_url = 'amqp://'
+    broker_url = 'pyamqp://'
     result_backend = 'rpc://'
 
     task_serializer = 'json'
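
(``pyamqp://`` pins the pure-Python AMQP transport explicitly, while plain
``amqp://`` selects ``librabbitmq`` when that C client is installed and only
falls back to ``pyamqp`` otherwise.)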

+ 2 - 2
docs/includes/installation.txt

@@ -67,7 +67,7 @@ Transports and Backends
 :``celery[sqs]``:
     for using Amazon SQS as a message transport (*experimental*).
 
-:``celery[tblib``]
+:``celery[tblib]``:
     for using the :setting:`task_remote_tracebacks` feature.
 
 :``celery[memcache]``:
@@ -103,7 +103,7 @@ Transports and Backends
 :``celery[consul]``:
     for using the Consul.io Key/Value store as a message transport or result backend (*experimental*).
 
-:``celery[django]``
+:``celery[django]``:
     specifies the lowest version possible for Django support.
 
     You should probably not use this in your requirements, it's here

+ 1 - 1
docs/includes/introduction.txt

@@ -1,4 +1,4 @@
-:Version: 4.0.0 (latentcall)
+:Version: 4.0.2 (latentcall)
 :Web: http://celeryproject.org/
 :Download: http://pypi.python.org/pypi/celery/
 :Source: https://github.com/celery/celery/

+ 0 - 11
docs/internals/reference/celery.utils.dispatch.saferef.rst

@@ -1,11 +0,0 @@
-==========================================================
- ``celery.utils.dispatch.saferef``
-==========================================================
-
-.. contents::
-    :local:
-.. currentmodule:: celery.utils.dispatch.saferef
-
-.. automodule:: celery.utils.dispatch.saferef
-    :members:
-    :undoc-members:

+ 11 - 0
docs/internals/reference/celery.utils.dispatch.weakref_backports.rst

@@ -0,0 +1,11 @@
+====================================================
+ ``celery.utils.dispatch.weakref_backports``
+====================================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.utils.dispatch.weakref_backports
+
+.. automodule:: celery.utils.dispatch.weakref_backports
+    :members:
+    :undoc-members:

+ 1 - 1
docs/internals/reference/index.rst

@@ -69,6 +69,6 @@
     celery.utils.text
     celery.utils.dispatch
     celery.utils.dispatch.signal
-    celery.utils.dispatch.saferef
+    celery.utils.dispatch.weakref_backports
     celery.platforms
     celery._state

+ 59 - 0
docs/sec/CELERYSA-0003.txt

@@ -0,0 +1,59 @@
+=========================================
+ CELERYSA-0003: Celery Security Advisory
+=========================================
+:contact: security@celeryproject.org
+:CVE id: TBA
+:date: 2016-12-08 05:00:00 p.m. PST
+
+Details
+=======
+
+:package: celery
+:vulnerability: Configuration Error
+:problem type: remote
+:risk: low
+:versions-affected: 4.0.0
+
+Description
+===========
+
+The default configuration in Celery 4.0.0 allowed for deserialization
+of pickled messages, even if the software was configured to send
+messages in the JSON format.
+
+The particular configuration in question is the `accept_content` setting,
+which by default was set to:
+
+    app.conf.accept_content = ['json', 'pickle', 'msgpack', 'yaml']
+
+The risk is nevertheless rated low, considering that an attacker would
+require access to the message broker used to send messages to Celery workers.
+
+Systems affected
+================
+
+Users of Celery version 4.0.0 with no explicit `accept_content` setting.
+
+Solution
+========
+
+To work around the issue you can explicitly configure the accept_content
+setting:
+
+    app.conf.accept_content = ['json']
+
+Or you can upgrade to the Celery 4.0.1 version:
+
+    $ pip install -U celery
+
+Distribution package maintainers are urged to provide their users
+with updated packages.
+
+Please direct questions to the celery-users mailing-list:
+http://groups.google.com/group/celery-users/,
+
+or if you're planning to report a new security-related issue, we request
+that you keep the information confidential by contacting
+security@celeryproject.org instead.
+
+Thank you!

+ 2 - 2
docs/userguide/calling.rst

@@ -413,11 +413,11 @@ Data transferred between clients and workers needs to be serialized,
 so every message in Celery has a ``content_type`` header that
 describes the serialization method used to encode it.
 
-The default serializer is :mod:`pickle`, but you can
+The default serializer is `JSON`, but you can
 change this using the :setting:`task_serializer` setting,
 or for each individual task, or even per message.
 
-There's built-in support for :mod:`pickle`, `JSON`, `YAML`
+There's built-in support for `JSON`, :mod:`pickle`, `YAML`
 and ``msgpack``, and you can also add your own custom serializers by registering
 them into the Kombu serializer registry
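
The serializer can also be chosen per call; a small sketch, assuming an
``add`` task and the msgpack library being available:

    app.conf.task_serializer = 'json'              # default for all tasks
    add.apply_async((2, 2), serializer='msgpack')  # override per message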
 

+ 23 - 10
docs/userguide/configuration.rst

@@ -888,6 +888,18 @@ Default: No limit.
 Maximum number of connections available in the Redis connection
 pool used for sending and retrieving results.
 
+.. setting:: redis_socket_connect_timeout
+
+``redis_socket_connect_timeout``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 5.0.1
+
+Default: :const:`None`
+
+Socket timeout for connections to Redis from the result backend
+in seconds (int/float).
+
 .. setting:: redis_socket_timeout
 
 ``redis_socket_timeout``
@@ -895,8 +907,8 @@ pool used for sending and retrieving results.
 
 Default: 5.0 seconds.
 
-Socket timeout for connections to Redis from the result backend
-in seconds (int/float)
+Socket timeout for reading/writing operations to the Redis server
+in seconds (int/float), used by the redis result backend.
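
Taken together, a hypothetical Redis backend tuning could look like this
(values are arbitrary, ``app`` is assumed):

    app.conf.result_backend = 'redis://localhost:6379/0'
    app.conf.redis_socket_connect_timeout = 5.0   # connect phase only
    app.conf.redis_socket_timeout = 30.0          # read/write operations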
 
 .. _conf-cassandra-result-backend:
 
@@ -1351,9 +1363,9 @@ Where ``myapp.tasks.route_task`` could be:
 
 .. code-block:: python
 
-    def route_task(self, name, args, kwargs, options, task=None, **kwargs):
-        if task == 'celery.ping':
-            return {'queue': 'default'}
+    def route_task(name, args, kwargs, options, task=None, **kw):
+        if task == 'celery.ping':
+            return {'queue': 'default'}
 
 ``route_task`` may return a string or a dict. A string then means
 it's a queue name in :setting:`task_queues`, a dict means it's a custom route.
@@ -1570,9 +1582,8 @@ is optional, and defaults to the specific transports default values.
 
 The transport part is the broker implementation to use, and the
 default is ``amqp``, (uses ``librabbitmq`` if installed or falls back to
-``pyamqp``). There are also many other choices, including;
-``redis``, ``beanstalk``, ``sqlalchemy``, ``django``, ``mongodb``,
-and ``couchdb``.
+``pyamqp``). There are also other choices available, including:
+``redis://``, ``sqs://``, and ``qpid://``.
 
 The scheme can also be a fully qualified path to your own transport
 implementation::
@@ -2281,9 +2292,11 @@ See :ref:`beat-entries`.
 ``beat_scheduler``
 ~~~~~~~~~~~~~~~~~~
 
-Default: ``"celery.beat:PersistentScheduer"``.
+Default: ``"celery.beat:PersistentScheduler"``.
 
-The default scheduler class.
+The default scheduler class. May be set to
+``"django_celery_beat.schedulers:DatabaseScheduler"``, for instance,
+when used alongside the `django-celery-beat` extension.
 
 Can also be set via the :option:`celery beat -S` argument.
 

+ 15 - 8
docs/userguide/daemonizing.rst

@@ -380,14 +380,14 @@ This is an example systemd file:
   Group=celery
   EnvironmentFile=-/etc/conf.d/celery
   WorkingDirectory=/opt/celery
-  ExecStart=/bin/sh '${CELERY_BIN} multi start $CELERYD_NODES \
-    -A $CELERY_APP --logfile=${CELERYD_LOG_FILE} \
-    --pidfile=${CELERYD_PID_FILE} $CELERYD_OPTS'
-  ExecStop=/bin/sh '${CELERY_BIN} multi stopwait $CELERYD_NODES \
+  ExecStart=/bin/sh -c '${CELERY_BIN} multi start ${CELERYD_NODES} \
+    -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
+    --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
+  ExecStop=/bin/sh -c '${CELERY_BIN} multi stopwait ${CELERYD_NODES} \
     --pidfile=${CELERYD_PID_FILE}'
-  ExecReload=/bin/sh '${CELERY_BIN} multi restart $CELERYD_NODES \
-    -A $CELERY_APP --pidfile=${CELERYD_PID_FILE} --logfile=${CELERYD_LOG_FILE} \
-    --loglevel="${CELERYD_LOG_LEVEL}" $CELERYD_OPTS'
+  ExecReload=/bin/sh -c '${CELERY_BIN} multi restart ${CELERYD_NODES} \
+    -A ${CELERY_APP} --pidfile=${CELERYD_PID_FILE} \
+    --logfile=${CELERYD_LOG_FILE} --loglevel=${CELERYD_LOG_LEVEL} ${CELERYD_OPTS}'
 
   [Install]
   WantedBy=multi-user.target
@@ -430,6 +430,12 @@ This is an example configuration for a Python project:
     # Absolute or relative path to the 'celery' command:
     CELERY_BIN="/usr/local/bin/celery"
     #CELERY_BIN="/virtualenvs/def/bin/celery"
+
+    # App instance to use
+    # comment out this line if you don't use an app
+    CELERY_APP="proj"
+    # or fully qualified:
+    #CELERY_APP="proj.tasks:app"
 
     # How to call manage.py
     CELERYD_MULTI="multi"
@@ -440,8 +446,9 @@ This is an example configuration for a Python project:
     # - %n will be replaced with the first part of the nodename.
     # - %I will be replaced with the current child process index
     #   and is important when using the prefork pool to avoid race conditions.
-    CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
     CELERYD_PID_FILE="/var/run/celery/%n.pid"
+    CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
+    CELERYD_LOG_LEVEL="INFO"
 
 Running the worker with superuser privileges (root)
 ======================================================================

+ 6 - 6
docs/userguide/routing.rst

@@ -590,15 +590,15 @@ Routers
 A router is a function that decides the routing options for a task.
 
 All you need to define a new router is to define a function with
-the signature ``(name, args, kwargs, options, task=None, **kwargs)``:
+the signature ``(name, args, kwargs, options, task=None, **kw)``:
 
 .. code-block:: python
 
-    def route_task(name, args, kwargs, options, task=None, **kwargs):
-        if name == 'myapp.tasks.compress_video':
-            return {'exchange': 'video',
-                    'exchange_type': 'topic',
-                    'routing_key': 'video.compress'}
+    def route_task(name, args, kwargs, options, task=None, **kw):
+        if name == 'myapp.tasks.compress_video':
+            return {'exchange': 'video',
+                    'exchange_type': 'topic',
+                    'routing_key': 'video.compress'}
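
A router like this only takes effect once it's listed in the
:setting:`task_routes` setting; e.g. (assuming ``app``):

    app.conf.task_routes = (route_task,)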
 
 If you return the ``queue`` key, it'll expand with the defined settings of
 that queue in :setting:`task_queues`:

+ 1 - 1
docs/userguide/security.rst

@@ -246,4 +246,4 @@ that can be used.
 
 .. rubric:: Footnotes
 
-.. [*] http://nadiana.com/python-pickle-insecure
+.. [*] https://blog.nelhage.com/2011/03/exploiting-pickle/

+ 1 - 1
docs/userguide/tasks.rst

@@ -38,7 +38,7 @@ as...
    a :sig:`SIGSEGV` (segmentation fault) or similar signals to the process.
 #. We assume that a system administrator deliberately killing the task
    does not want it to automatically restart.
-#. A task that allocates to much memory is in danger of triggering the kernel
+#. A task that allocates too much memory is in danger of triggering the kernel
    OOM killer, the same may happen again.
 #. A task that always fails when redelivered may cause a high-frequency
    message loop taking down the system.

+ 45 - 0
docs/userguide/testing.rst

@@ -193,6 +193,51 @@ Example:
             'result_backend': 'rpc',
         }
 
+
+``celery_parameters`` - Override to set up Celery test app parameters.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can redefine this fixture to change the ``__init__`` parameters of the
+test Celery app. In contrast to :func:`celery_config`, these are passed
+directly when instantiating :class:`~celery.Celery`.
+
+The dict returned by your fixture will then be used
+to configure the :func:`celery_app` and :func:`celery_session_app` fixtures.
+
+Example:
+
+.. code-block:: python
+
+    @pytest.fixture(scope='session')
+    def celery_parameters():
+        return {
+            'task_cls': my.package.MyCustomTaskClass,
+            'strict_typing': False,
+        }
+
+``celery_worker_parameters`` - Override to set up Celery worker parameters.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can redefine this fixture to change the ``__init__`` parameters of the
+test Celery workers. These are passed directly to
+:class:`~celery.worker.WorkController` when it is instantiated.
+
+The dict returned by your fixture will then be used
+to configure the :func:`celery_worker` and :func:`celery_session_worker`
+fixtures.
+
+Example:
+
+.. code-block:: python
+
+    @pytest.fixture(scope='session')
+    def celery_worker_parameters():
+        return {
+            'queues': ('high-prio', 'low-prio'),
+            'exclude_queues': ('celery',),
+        }
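
With these worker fixtures in place, a test can exercise the embedded
worker directly; a minimal, hypothetical example:

    def test_mul(celery_app, celery_worker):
        @celery_app.task
        def mul(x, y):
            return x * y

        assert mul.delay(4, 4).get(timeout=10) == 16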
+
+
 ``celery_enable_logging`` - Override to enable logging in embedded workers.
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

+ 1 - 1
docs/userguide/workers.rst

@@ -1057,7 +1057,7 @@ This command will gracefully shut down the worker remotely:
 .. code-block:: pycon
 
     >>> app.control.broadcast('shutdown') # shutdown all workers
-    >>> app.control.broadcast('shutdown, destination='worker1@example.com')
+    >>> app.control.broadcast('shutdown', destination='worker1@example.com')
 
 .. control:: ping
 

+ 7 - 7
docs/whatsnew-4.0.rst

@@ -222,7 +222,7 @@ to a model change, and you wish to cancel the task if the transaction is
 rolled back, or ensure the task is only executed after the changes have been
 written to the database.
 
-``transaction.on_commit`` enables you to solve this problem by adding
+``transaction.atomic`` enables you to solve this problem by adding
 the task as a callback to be called only when the transaction is committed.
 
 Example usage:
@@ -240,7 +240,7 @@ Example usage:
             article = Article.objects.create(**request.POST)
             # send this task only if the rest of the transaction succeeds.
             transaction.on_commit(partial(
-                send_article_created_notification.delay, article_id=article.pk)
+                send_article_created_notification.delay, article_id=article.pk))
             Log.objects.create(type=Log.ARTICLE_CREATED, object_pk=article.pk)
 
 Removed features
@@ -376,7 +376,7 @@ lowercase and some setting names have been renamed for consistency.
 
 This change is fully backwards compatible so you can still use the uppercase
 setting names, but we would like you to upgrade as soon as possible and
-you can this automatically using the :program:`celery upgrade settings`
+you can do this automatically using the :program:`celery upgrade settings`
 command:
 
 .. code-block:: console
@@ -401,7 +401,7 @@ and save a backup in :file:`proj/settings.py.orig`.
 
     .. code-block:: console
 
-        $ celery upgrade settings --django proj/settings.py
+        $ celery upgrade settings proj/settings.py --django
 
     After upgrading the settings file, you need to set the prefix explicitly
     in your ``proj/celery.py`` module:
@@ -551,7 +551,7 @@ these manually:
     class CustomTask(Task):
         def run(self):
             print('running')
-    app.tasks.register(CustomTask())
+    app.register_task(CustomTask())
 
 The best practice is to use custom task classes only for overriding
 general behavior, and then using the task decorator to realize the task:
@@ -760,7 +760,7 @@ some long-requested features:
 
             def run(self, fun, *args, **kwargs):
                 return fun(*args, **kwargs)
-        call_as_task = app.tasks.register(call_as_task())
+        call_as_task = app.register_task(call_as_task())
 
 - New ``argsrepr`` and ``kwargsrepr`` fields contain textual representations
   of the task arguments (possibly truncated) for use in logs, monitors, etc.
@@ -1576,7 +1576,7 @@ Execution Pools
 - **Eventlet/Gevent**: now enables AMQP heartbeat (Issue #3338).
 
 - **Eventlet/Gevent**: Fixed race condition leading to "simultaneous read"
-  errors (Issue #2812).
+  errors (Issue #2755).
 
 - **Prefork**: Prefork pool now uses ``poll`` instead of ``select`` where
   available (Issue #2373).

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz>dev
 billiard>=3.5.0.2,<3.6.0
-kombu>=4.0,<5.0
+kombu>=4.0.2,<5.0

+ 1 - 0
requirements/extras/sqs.txt

@@ -1 +1,2 @@
 boto>=2.13.3
+pycurl

+ 1 - 1
setup.cfg

@@ -18,7 +18,7 @@ ignore = D102,D104,D203,D105,D213
 [bdist_rpm]
 requires = pytz >= 2016.7
            billiard >= 3.5.0.2
-           kombu >= 4.0.0
+           kombu >= 4.0.2
 
 [wheel]
 universal = 1

+ 12 - 5
t/unit/app/test_amqp.py

@@ -125,7 +125,7 @@ class test_Queues:
         ex = Exchange('fff', 'fanout')
         q = Queues(default_exchange=ex)
         q.add(Queue('foo'))
-        assert q['foo'].exchange.name == ''
+        assert q['foo'].exchange.name == 'fff'
 
     def test_alias(self):
         q = Queues()
@@ -208,6 +208,9 @@ class test_AMQP:
         self.simple_message = self.app.amqp.as_task_v2(
             uuid(), 'foo', create_sent_event=True,
         )
+        self.simple_message_no_sent_event = self.app.amqp.as_task_v2(
+            uuid(), 'foo', create_sent_event=False,
+        )
 
     def test_kwargs_must_be_mapping(self):
         with pytest.raises(TypeError):
@@ -235,14 +238,16 @@ class test_AMQP:
     def test_send_task_message__properties(self):
         prod = Mock(name='producer')
         self.app.amqp.send_task_message(
-            prod, 'foo', self.simple_message, foo=1, retry=False,
+            prod, 'foo', self.simple_message_no_sent_event,
+            foo=1, retry=False,
         )
         assert prod.publish.call_args[1]['foo'] == 1
 
     def test_send_task_message__headers(self):
         prod = Mock(name='producer')
         self.app.amqp.send_task_message(
-            prod, 'foo', self.simple_message, headers={'x1x': 'y2x'},
+            prod, 'foo', self.simple_message_no_sent_event,
+            headers={'x1x': 'y2x'},
             retry=False,
         )
         assert prod.publish.call_args[1]['headers']['x1x'] == 'y2x'
@@ -250,7 +255,8 @@ class test_AMQP:
     def test_send_task_message__queue_string(self):
         prod = Mock(name='producer')
         self.app.amqp.send_task_message(
-            prod, 'foo', self.simple_message, queue='foo', retry=False,
+            prod, 'foo', self.simple_message_no_sent_event,
+            queue='foo', retry=False,
         )
         kwargs = prod.publish.call_args[1]
         assert kwargs['routing_key'] == 'foo'
@@ -271,7 +277,8 @@ class test_AMQP:
     def test_send_task_message__with_delivery_mode(self):
         prod = Mock(name='producer')
         self.app.amqp.send_task_message(
-            prod, 'foo', self.simple_message, delivery_mode=33, retry=False,
+            prod, 'foo', self.simple_message_no_sent_event,
+            delivery_mode=33, retry=False,
         )
         assert prod.publish.call_args[1]['delivery_mode'] == 33
 

+ 14 - 1
t/unit/app/test_app.py

@@ -30,6 +30,7 @@ class ObjectConfig:
     FOO = 1
     BAR = 2
 
+
 object_config = ObjectConfig()
 dict_config = dict(FOO=10, BAR=20)
 
@@ -844,7 +845,8 @@ class test_App:
         self.app.amqp = Mock(name='amqp')
         self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value')
         self.app.send_task('foo', (1, 2), connection=connection, router=router)
-        self.app.amqp.Producer.assert_called_with(connection)
+        self.app.amqp.Producer.assert_called_with(
+            connection, auto_declare=False)
         self.app.amqp.send_task_message.assert_called_with(
             self.app.amqp.Producer(), 'foo',
             self.app.amqp.create_task_message())
@@ -896,6 +898,17 @@ class test_App:
         beat = self.app.Beat()
         assert isinstance(beat, Beat)
 
+    def test_registry_cls(self):
+
+        class TaskRegistry(self.app.registry_cls):
+            pass
+
+        class CustomCelery(type(self.app)):
+            registry_cls = TaskRegistry
+
+        app = CustomCelery(set_as_current=False)
+        assert isinstance(app.tasks, TaskRegistry)
+
 
 class test_defaults:
 

+ 7 - 0
t/unit/app/test_registry.py

@@ -1,5 +1,6 @@
 import pytest
 from celery.app.registry import _unpickle_task, _unpickle_task_v2
+from celery.exceptions import InvalidTaskError
 
 
 def returns():
@@ -23,6 +24,9 @@ class test_TaskRegistry:
 
     def setup(self):
         self.mytask = self.app.task(name='A', shared=False)(returns)
+        self.missing_name_task = self.app.task(
+            name=None, shared=False)(returns)
+        self.missing_name_task.name = None  # name is overridden with path
         self.myperiodic = self.app.task(
             name='B', shared=False, type='periodic',
         )(returns)
@@ -44,6 +48,9 @@ class test_TaskRegistry:
         self.assert_register_unregister_cls(r, self.mytask)
         self.assert_register_unregister_cls(r, self.myperiodic)
 
+        with pytest.raises(InvalidTaskError):
+            r.register(self.missing_name_task)
+
         r.register(self.myperiodic)
         r.unregister(self.myperiodic.name)
         assert self.myperiodic not in r

+ 8 - 0
t/unit/backends/test_redis.py

@@ -147,6 +147,8 @@ class test_RedisBackend:
             self.Backend(app=self.app)
 
     def test_url(self):
+        self.app.conf.redis_socket_timeout = 30.0
+        self.app.conf.redis_socket_connect_timeout = 100.0
         x = self.Backend(
             'redis://:bosco@vandelay.com:123//1', app=self.app,
         )
@@ -155,8 +157,12 @@ class test_RedisBackend:
         assert x.connparams['db'] == 1
         assert x.connparams['port'] == 123
         assert x.connparams['password'] == 'bosco'
+        assert x.connparams['socket_timeout'] == 30.0
+        assert x.connparams['socket_connect_timeout'] == 100.0
 
     def test_socket_url(self):
+        self.app.conf.redis_socket_timeout = 30.0
+        self.app.conf.redis_socket_connect_timeout = 100.0
         x = self.Backend(
             'socket:///tmp/redis.sock?virtual_host=/3', app=self.app,
         )
@@ -166,6 +172,8 @@ class test_RedisBackend:
                 redis.UnixDomainSocketConnection)
         assert 'host' not in x.connparams
         assert 'port' not in x.connparams
+        assert x.connparams['socket_timeout'] == 30.0
+        assert 'socket_connect_timeout' not in x.connparams
         assert x.connparams['db'] == 3
 
     def test_conf_raises_KeyError(self):

+ 1 - 0
t/unit/bin/test_base.py

@@ -11,6 +11,7 @@ from celery.bin.base import (
 class MyApp:
     user_options = {'preload': None}
 
+
 APP = MyApp()  # <-- Used by test_with_custom_app
 
 

+ 1 - 1
t/unit/bin/test_events.py

@@ -30,7 +30,7 @@ class MockCommand:
 
 def proctitle(prog, info=None):
     proctitle.last = (prog, info)
-proctitle.last = ()
+proctitle.last = ()  # noqa: E305
 
 
 class test_events:

+ 6 - 4
t/unit/conftest.py

@@ -5,7 +5,6 @@ import sys
 import threading
 import warnings
 
-from functools import partial
 from importlib import import_module
 
 from case import Mock
@@ -93,12 +92,15 @@ def reset_cache_backend_state(celery_app):
 def assert_signal_called(signal, **expected):
     """Context that verifes signal is called before exiting."""
     handler = Mock()
-    call_handler = partial(handler)
-    signal.connect(call_handler)
+
+    def on_call(**kwargs):
+        return handler(**kwargs)
+
+    signal.connect(on_call)
     try:
         yield handler
     finally:
-        signal.disconnect(call_handler)
+        signal.disconnect(on_call)
     handler.assert_called_with(signal=signal, **expected)
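
The handler here ends up called with keyword arguments (including
``signal=``), which is the general contract for Celery signal receivers;
e.g. a typical user-level receiver:

    from celery.signals import task_success

    @task_success.connect
    def on_task_success(sender=None, result=None, **kwargs):
        # `sender` is the task, kwargs carries signal=... and the rest
        print('task %r finished with %r' % (sender, result))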
 
 

+ 2 - 0
t/unit/events/test_snapshot.py

@@ -10,6 +10,8 @@ class MockTimer:
     def call_repeatedly(self, secs, fun, *args, **kwargs):
         self.installed.append(fun)
         return Mock(name='TRef')
+
+
 timer = MockTimer()
 
 

+ 3 - 2
t/unit/security/test_key.py

@@ -1,5 +1,6 @@
 import pytest
 from celery.exceptions import SecurityError
+from celery.five import bytes_if_py2
 from celery.security.key import PrivateKey
 from . import CERT1, KEY1, KEY2
 from .case import SecurityCase
@@ -25,6 +26,6 @@ class test_PrivateKey(SecurityCase):
 
     def test_sign(self):
         pkey = PrivateKey(KEY1)
-        pkey.sign('test', b'sha1')
+        pkey.sign('test', bytes_if_py2('sha1'))
         with pytest.raises(ValueError):
-            pkey.sign('test', b'unknown')
+            pkey.sign('test', bytes_if_py2('unknown'))

+ 1 - 1
t/unit/tasks/test_canvas.py

@@ -533,7 +533,7 @@ class test_group(CanvasCase):
 
     def test_iter(self):
         g = group([self.add.s(i, i) for i in range(10)])
-        assert list(iter(g)) == g.tasks
+        assert list(iter(g)) == list(g.keys())
 
     @staticmethod
     def helper_test_get_delay(result):

+ 2 - 0
t/unit/tasks/test_context.py

@@ -14,6 +14,8 @@ def get_context_as_dict(ctx, getter=getattr):
             continue   # Ignore methods and other non-trivial types
         defaults[attr_name] = attr
     return defaults
+
+
 default_context = get_context_as_dict(Context())
 
 

+ 8 - 0
t/unit/tasks/test_result.py

@@ -15,6 +15,7 @@ from celery.result import (
     AsyncResult,
     EagerResult,
     ResultSet,
+    GroupResult,
     result_from_tuple,
     assert_will_not_block,
 )
@@ -555,6 +556,13 @@ class test_GroupResult:
         with pytest.raises(AttributeError):
             self.app.GroupResult.restore(ts.id, backend=object())
 
+    def test_restore_app(self):
+        subs = [MockAsyncResultSuccess(uuid(), app=self.app)]
+        ts = self.app.GroupResult(uuid(), subs)
+        ts.save()
+        restored = GroupResult.restore(ts.id, app=self.app)
+        assert restored.id == ts.id
+
     def test_join_native(self):
         backend = SimpleBackend()
         results = [self.app.AsyncResult(uuid(), backend=backend)

+ 3 - 4
t/unit/utils/test_dispatcher.py

@@ -37,7 +37,8 @@ class Callable:
     def a(self, val, **kwargs):
         return val
 
-a_signal = Signal(providing_args=['val'])
+
+a_signal = Signal(providing_args=['val'], use_caching=False)
 
 
 class test_Signal:
@@ -45,11 +46,9 @@ class test_Signal:
 
     def _testIsClean(self, signal):
         """Assert that everything has been cleaned up automatically"""
+        assert not signal.has_listeners()
         assert signal.receivers == []
 
-        # force cleanup just in case
-        signal.receivers = []
-
     def test_exact(self):
         a_signal.connect(receiver_1_arg, sender=self)
         try:
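
For context, a standalone sketch of the dispatch API these tests exercise
(receiver and payload are made up):

    from celery.utils.dispatch import Signal

    on_value = Signal(providing_args=['val'], use_caching=False)

    def receiver(sender=None, val=None, **kwargs):
        print('got %r from %r' % (val, sender))

    on_value.connect(receiver)
    on_value.send(sender=None, val=42)   # calls every matching receiver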

+ 84 - 5
t/unit/utils/test_functional.py

@@ -2,6 +2,7 @@ import pytest
 from kombu.utils.functional import lazy
 from celery.utils.functional import (
     DummyContext,
+    fun_accepts_kwargs,
     fun_takes_argument,
     head_from_fun,
     firstmethod,
@@ -158,15 +159,23 @@ class test_head_from_fun:
         g(1, 2)
         g(1, 2, kwarg=3)
 
+    def test_regression_3678(self):
+        local = {}
+        fun = ('def f(foo, *args, bar=""):'
+               '    return foo, args, bar')
+        exec(fun, {}, local)
+
+        g = head_from_fun(local['f'])
+        g(1)
+        g(1, 2, 3, 4, bar=100)
+        with pytest.raises(TypeError):
+            g(bar=100)
+
     def test_from_fun_with_hints(self):
         local = {}
         fun = ('def f_hints(x: int, y: int, kwarg: int=1):'
                '    pass')
-        try:
-            exec(fun, {}, local)
-        except SyntaxError:
-            # py2
-            return
+        exec(fun, {}, local)
         f_hints = local['f_hints']
 
         g = head_from_fun(f_hints)
@@ -175,6 +184,21 @@ class test_head_from_fun:
         g(1, 2)
         g(1, 2, kwarg=3)
 
+    def test_from_fun_forced_kwargs(self):
+        local = {}
+        fun = ('def f_kwargs(*, a, b="b", c=None):'
+               '    return')
+        exec(fun, {}, local)
+        f_kwargs = local['f_kwargs']
+
+        g = head_from_fun(f_kwargs)
+        with pytest.raises(TypeError):
+            g(1)
+
+        g(a=1)
+        g(a=1, b=2)
+        g(a=1, b=2, c=3)
+
 
 class test_fun_takes_argument:
 
@@ -222,3 +246,58 @@ def test_seq_concat_item(a, b, expected):
     res = seq_concat_item(a, b)
     assert type(res) is type(expected)  # noqa
     assert res == expected
+
+
+class StarKwargsCallable(object):
+
+    def __call__(self, **kwargs):
+        return 1
+
+
+class StarArgsStarKwargsCallable(object):
+
+    def __call__(self, *args, **kwargs):
+        return 1
+
+
+class StarArgsCallable(object):
+
+    def __call__(self, *args):
+        return 1
+
+
+class ArgsCallable(object):
+
+    def __call__(self, a, b):
+        return 1
+
+
+class ArgsStarKwargsCallable(object):
+
+    def __call__(self, a, b, **kwargs):
+        return 1
+
+
+class test_fun_accepts_kwargs:
+
+    @pytest.mark.parametrize('fun', [
+        lambda a, b, **kwargs: 1,
+        lambda *args, **kwargs: 1,
+        lambda foo=1, **kwargs: 1,
+        StarKwargsCallable(),
+        StarArgsStarKwargsCallable(),
+        ArgsStarKwargsCallable(),
+    ])
+    def test_accepts(self, fun):
+        assert fun_accepts_kwargs(fun)
+
+    @pytest.mark.parametrize('fun', [
+        lambda a: 1,
+        lambda a, b: 1,
+        lambda *args: 1,
+        lambda a, kw1=1, kw2=2: 1,
+        StarArgsCallable(),
+        ArgsCallable(),
+    ])
+    def test_rejects(self, fun):
+        assert not fun_accepts_kwargs(fun)

+ 0 - 86
t/unit/utils/test_saferef.py

@@ -1,86 +0,0 @@
-from celery.utils.dispatch.saferef import safe_ref
-
-
-class Class1:
-
-    def x(self):
-        pass
-
-
-def fun(obj):
-    pass
-
-
-class Class2:
-
-    def __call__(self, obj):
-        pass
-
-
-class test_safe_ref:
-
-    def setup(self):
-        ts = []
-        ss = []
-        for x in range(5000):
-            t = Class1()
-            ts.append(t)
-            s = safe_ref(t.x, self._closure)
-            ss.append(s)
-        ts.append(fun)
-        ss.append(safe_ref(fun, self._closure))
-        for x in range(30):
-            t = Class2()
-            ts.append(t)
-            s = safe_ref(t, self._closure)
-            ss.append(s)
-        self.ts = ts
-        self.ss = ss
-        self.closureCount = 0
-
-    def test_in(self):
-        """test_in
-
-        Test the "in" operator for safe references (cmp)
-
-        """
-        for t in self.ts[:50]:
-            assert safe_ref(t.x) in self.ss
-
-    def test_valid(self):
-        """test_value
-
-        Test that the references are valid (return instance methods)
-
-        """
-        for s in self.ss:
-            assert s()
-
-    def test_shortcircuit(self):
-        """test_shortcircuit
-
-        Test that creation short-circuits to reuse existing references
-
-        """
-        sd = {}
-        for s in self.ss:
-            sd[s] = 1
-        for t in self.ts:
-            if hasattr(t, 'x'):
-                assert safe_ref(t.x) in sd
-            else:
-                assert safe_ref(t) in sd
-
-    def test_representation(self):
-        """test_representation
-
-        Test that the reference object's representation works
-
-        XXX Doesn't currently check the results, just that no error
-            is raised
-        """
-        repr(self.ss[-1])
-
-    def _closure(self, ref):
-        """Dumb utility mechanism to increment deletion counter"""
-        self.closureCount += 1

Some files were not shown because too many files changed in this diff