
Merge branch 'master' into 4.0

Ask Solem, 8 years ago · commit f1626033d6

+ 2 - 0
CONTRIBUTORS.txt

@@ -223,3 +223,5 @@ Marat Sharafutdinov, 2016/11/04
 Viktor Holmqvist, 2016/12/02
 Rick Wargo, 2016/12/02
 zhengxiaowai, 2016/12/07
+Michael Howitz, 2016/12/08
+Andreas Pelme, 2016/12/13

+ 46 - 0
Changelog

@@ -8,6 +8,52 @@ This document contains change notes for bugfix releases in
 the 4.0.x series (latentcall), please see :ref:`whatsnew-4.0` for
 an overview of what's new in Celery 4.0.
 
+.. _version-4.0.2:
+
+4.0.2
+=====
+:release-date: 2016-12-15 03:40 PM PST
+:release-by: Ask Solem
+
+- **Requirements**
+
+    - Now depends on :ref:`Kombu 4.0.2 <kombu:version-4.0.2>`.
+
+- **Tasks**: Fixed problem with JSON serialization of ``group``
+  (``keys must be string`` error, Issue #3688).
+
+- **Worker**: Fixed JSON serialization issue when using ``inspect active``
+  and friends (Issue #3667).
+
+- **App**: Fixed saferef errors when using signals (Issue #3670).
+
+- **Prefork**: Fixed bug with pack requiring bytes argument
+  on Python 2.7.5 and earlier (Issue #3674).
+
+- **Tasks**: Saferepr did not handle unicode in bytestrings on Python 2
+  (Issue #3676).
+
+- **Testing**: Added new ``celery_worker_parameters`` fixture.
+
+    Contributed by **Michael Howitz**.
+
+- **Tasks**: Added new ``app`` argument to ``GroupResult.restore``
+  (Issue #3669).
+
+    This makes the restore method behave the same way as the ``GroupResult``
+    constructor.
+
+    Contributed by **Andreas Pelme**.
+
+- **Tasks**: Fixed type checking crash when task takes ``*args`` on Python 3
+  (Issue #3678).
+
+- Documentation and example improvements by:
+
+    - **BLAGA Razvan-Paul**
+    - **Michael Howitz**
+    - :github_user:`paradox41`
+
 .. _version-4.0.1:
 
 4.0.1

+ 0 - 3
celery/canvas.py

@@ -1123,9 +1123,6 @@ class group(Signature):
                                   chord=chord, root_id=root_id,
                                   parent_id=parent_id)
 
-    def __iter__(self):
-        return iter(self.tasks)
-
     def __repr__(self):
         if self.tasks:
             return remove_repeating_from_task(

+ 2 - 2
celery/concurrency/asynpool.py

@@ -812,7 +812,7 @@ class AsynPool(_pool.Pool):
             # inqueues are writable.
             body = dumps(tup, protocol=protocol)
             body_size = len(body)
-            header = pack('>I', body_size)
+            header = pack(b'>I', body_size)
             # index 1,0 is the job ID.
             job = get_job(tup[1][0])
             job._payload = buf_t(header), buf_t(body), body_size
@@ -1252,7 +1252,7 @@ class AsynPool(_pool.Pool):
                         protocol=HIGHEST_PROTOCOL):
         body = dumps((type_, args), protocol=protocol)
         size = len(body)
-        header = pack('>I', size)
+        header = pack(b'>I', size)
         return header, body, size
 
     @classmethod
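
The two hunks above change ``pack('>I', …)`` to ``pack(b'>I', …)``: as the changelog notes, ``struct.pack`` on Python 2.7.5 and earlier requires a bytes format string (Issue #3674), and a ``b'>I'`` literal is accepted on every supported version. A minimal sketch of the same length-prefix framing, independent of Celery:

.. code-block:: python

    import struct

    # Frame a message as a 4-byte big-endian size header followed by
    # the body; a bytes format string also works on old Python 2.7.x.
    body = b'payload'
    header = struct.pack(b'>I', len(body))

    # The reader consumes the header first, then exactly `size` bytes.
    (size,) = struct.unpack(b'>I', header)
    assert size == len(body)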

+ 28 - 5
celery/contrib/pytest.py

@@ -74,15 +74,19 @@ def celery_session_app(request,
 
 
 @pytest.fixture(scope='session')
-def celery_session_worker(request, celery_session_app,
-                          celery_includes, celery_worker_pool):
+def celery_session_worker(request,
+                          celery_session_app,
+                          celery_includes,
+                          celery_worker_pool,
+                          celery_worker_parameters):
     # type: (Any, Celery, Sequence[str], str, Any) -> WorkController
     """Session Fixture: Start worker that lives throughout test suite."""
     if not NO_WORKER:
         for module in celery_includes:
             celery_session_app.loader.import_task_module(module)
         with worker.start_worker(celery_session_app,
-                                 pool=celery_worker_pool) as w:
+                                 pool=celery_worker_pool,
+                                 **celery_worker_parameters) as w:
             yield w
 
 
@@ -137,6 +141,19 @@ def celery_parameters():
     return {}
 
 
+@pytest.fixture(scope='session')
+def celery_worker_parameters():
+    # type: () -> Mapping[str, Any]
+    """Redefine this fixture to change the init parameters of Celery workers.
+
+    This can be used, e.g., to define queues the worker will consume tasks from.
+
+    The dict returned by your fixture will then be used
+    as parameters when instantiating :class:`~celery.worker.WorkController`.
+    """
+    return {}
+
+
 @pytest.fixture()
 def celery_app(request,
                celery_config,
@@ -155,13 +172,19 @@ def celery_app(request,
 
 
 @pytest.fixture()
-def celery_worker(request, celery_app, celery_includes, celery_worker_pool):
+def celery_worker(request,
+                  celery_app,
+                  celery_includes,
+                  celery_worker_pool,
+                  celery_worker_parameters):
     # type: (Any, Celery, Sequence[str], str, Any) -> WorkController
     """Fixture: Start worker in a thread, stop it when the test returns."""
     if not NO_WORKER:
         for module in celery_includes:
             celery_app.loader.import_task_module(module)
-        with worker.start_worker(celery_app, pool=celery_worker_pool) as w:
+        with worker.start_worker(celery_app,
+                                 pool=celery_worker_pool,
+                                 **celery_worker_parameters) as w:
             yield w
 
 

+ 4 - 4
celery/result.py

@@ -867,11 +867,11 @@ class GroupResult(ResultSet):
         return self.results
 
     @classmethod
-    def restore(cls, id, backend=None):
+    def restore(cls, id, backend=None, app=None):
         """Restore previously saved group result."""
-        return (
-            backend or (cls.app.backend if cls.app else current_app.backend)
-        ).restore_group(id)
+        app = app or cls.app
+        backend = backend or (app.backend if app else current_app.backend)
+        return backend.restore_group(id)
 
 
 @Thenable.register
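
The new ``app`` argument lets ``GroupResult.restore`` use a specific app's result backend instead of falling back to the class-bound app or ``current_app``, mirroring the ``GroupResult`` constructor (Issue #3669). A usage sketch, assuming a broker/result backend at the placeholder ``redis://`` URLs and a trivial example task:

.. code-block:: python

    from celery import Celery, group
    from celery.result import GroupResult

    app = Celery('proj', broker='redis://', backend='redis://')

    @app.task
    def add(x, y):
        return x + y

    result = group(add.s(i, i) for i in range(3)).apply_async()
    result.save()  # persist the group's task ids in the result backend

    # Later, possibly in another process: restore against an explicit
    # app instead of relying on the ambient current_app.
    restored = GroupResult.restore(result.id, app=app)
    assert restored.id == result.id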

+ 7 - 20
celery/signals.py

@@ -86,28 +86,19 @@ task_sent = Signal(
 celeryd_init = Signal(
     name='celeryd_init',
     providing_args={'instance', 'conf', 'options'},
-    use_caching=False,
 )
 celeryd_after_setup = Signal(
     name='celeryd_after_setup',
     providing_args={'instance', 'conf'},
-    use_caching=False,
 )
 
 # - Worker
 import_modules = Signal(name='import_modules')
-worker_init = Signal(name='worker_init', use_caching=False)
-# use_caching must be false when sender is None.
-worker_process_init = Signal(
-    name='worker_process_init',
-    use_caching=False,
-)
-worker_process_shutdown = Signal(
-    name='worker_process_shutdown',
-    use_caching=False,
-)
-worker_ready = Signal(name='worker_ready', use_caching=False)
-worker_shutdown = Signal(name='worker_shutdown', use_caching=False)
+worker_init = Signal(name='worker_init')
+worker_process_init = Signal(name='worker_process_init')
+worker_process_shutdown = Signal(name='worker_process_shutdown')
+worker_ready = Signal(name='worker_ready')
+worker_shutdown = Signal(name='worker_shutdown')
 heartbeat_sent = Signal(name='heartbeat_sent')
 
 # - Logging
@@ -116,26 +107,23 @@ setup_logging = Signal(
     providing_args={
         'loglevel', 'logfile', 'format', 'colorize',
     },
-    use_caching=False,
 )
 after_setup_logger = Signal(
     name='after_setup_logger',
     providing_args={
         'logger', 'loglevel', 'logfile', 'format', 'colorize',
     },
-    use_caching=False,
 )
 after_setup_task_logger = Signal(
     name='after_setup_task_logger',
     providing_args={
         'logger', 'loglevel', 'logfile', 'format', 'colorize',
     },
-    use_caching=False,
 )
 
 # - Beat
-beat_init = Signal(name='beat_init', use_caching=False)
-beat_embedded_init = Signal(name='beat_embedded_init', use_caching=False)
+beat_init = Signal(name='beat_init')
+beat_embedded_init = Signal(name='beat_embedded_init')
 
 # - Eventlet
 eventlet_pool_started = Signal(name='eventlet_pool_started')
@@ -150,5 +138,4 @@ eventlet_pool_apply = Signal(
 user_preload_options = Signal(
     name='user_preload_options',
     providing_args={'app', 'options'},
-    use_caching=False,
 )

+ 1 - 1
celery/utils/dispatch/signal.py

@@ -52,7 +52,7 @@ class Signal(object):  # pragma: no cover
     #: ``{receiverkey (id): weakref(receiver)}`` mappings.
     receivers = None
 
-    def __init__(self, providing_args=None, use_caching=True, name=None):
+    def __init__(self, providing_args=None, use_caching=False, name=None):
         self.receivers = []
         self.providing_args = set(
             providing_args if providing_args is not None else [])
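
These two files go together: the per-signal ``use_caching=False`` arguments are dropped because caching is now off by default in ``Signal.__init__``, which appears to be related to the saferef errors noted in the changelog (Issue #3670). Connecting receivers is unchanged; a minimal example (the receiver name is arbitrary):

.. code-block:: python

    from celery.signals import worker_ready

    @worker_ready.connect
    def on_worker_ready(sender=None, **kwargs):
        # Runs once the worker is ready to accept tasks.
        print('worker {0} is ready'.format(sender))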

+ 5 - 3
celery/utils/functional.py

@@ -235,6 +235,8 @@ def _argsfromspec(spec, replace_defaults=True):
     else:
         positional, optional = spec.args, []
 
+    varargs = spec.varargs
+    varkw = spec.varkw
     if spec.kwonlydefaults:
         split = len(spec.kwonlydefaults)
         kwonlyargs = spec.kwonlyargs[:-split]
@@ -249,9 +251,9 @@ def _argsfromspec(spec, replace_defaults=True):
     return ', '.join(filter(None, [
         ', '.join(positional),
         ', '.join('{0}={1}'.format(k, v) for k, v in optional),
-        '*{0}'.format(spec.varargs) if spec.varargs else None,
-        '**{0}'.format(spec.varkw) if spec.varkw else None,
-        '*' if kwonlyargs or kwonlyargs_optional else None,
+        '*{0}'.format(varargs) if varargs else None,
+        '**{0}'.format(varkw) if varkw else None,
+        '*' if (kwonlyargs or kwonlyargs_optional) and not varargs else None,
         ', '.join(kwonlyargs) if kwonlyargs else None,
         ', '.join('{0}="{1}"'.format(k, v) for k, v in kwonlyargs_optional),
     ]))
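
The new ``and not varargs`` guard stops ``_argsfromspec`` from emitting a bare ``*`` marker when ``*args`` is already present; previously a Python 3 signature such as ``def f(foo, *args, bar='')`` produced a generated argument list containing both ``*args`` and ``*``, which is invalid syntax and crashed the argument checking in ``head_from_fun`` (Issue #3678). A small reproduction, mirroring the new regression test below (Python 3 only):

.. code-block:: python

    from celery.utils.functional import head_from_fun

    def f(foo, *args, bar=''):
        return foo, args, bar

    h = head_from_fun(f)    # used to fail while building the generated header
    h(1)
    h(1, 2, 3, 4, bar=100)  # extra positionals are absorbed by *args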

+ 8 - 3
celery/utils/saferepr.py

@@ -139,8 +139,13 @@ def _format_binary_bytes(val, maxlen, ellipsis='...'):
     if maxlen and len(val) > maxlen:
         # we don't want to copy all the data, just take what we need.
         chunk = memoryview(val)[:maxlen].tobytes()
-        return "b'{0}{1}'".format(_repr_binary_bytes(chunk), ellipsis)
-    return "b'{0}'".format(_repr_binary_bytes(val))
+        return _bytes_prefix("'{0}{1}'".format(
+            _repr_binary_bytes(chunk), ellipsis))
+    return _bytes_prefix("'{0}'".format(_repr_binary_bytes(val)))
+
+
+def _bytes_prefix(s):
+    return 'b' + s if IS_PY3 else s
 
 
 def _repr_binary_bytes(val):
@@ -161,7 +166,7 @@ def _repr_binary_bytes(val):
 
 def _format_chars(val, maxlen):
     # type: (AnyStr, int) -> str
-    if IS_PY3 and isinstance(val, bytes):  # pragma: no cover
+    if isinstance(val, bytes):  # pragma: no cover
         return _format_binary_bytes(val, maxlen)
     else:
         return "'{0}'".format(truncate(val, maxlen))
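
``_bytes_prefix`` adds the ``b''`` prefix only on Python 3, and the ``isinstance(val, bytes)`` branch now also runs on Python 2, so bytestrings containing non-ASCII data no longer break ``saferepr`` (Issue #3676). For illustration, mirroring the new unit test below:

.. code-block:: python

    from celery.utils.saferepr import saferepr

    # A bytestring with a non-ASCII byte sequence (UTF-8 encoded '®');
    # the output carries a b'' prefix on Python 3 only.
    print(saferepr([b'foo', u'a\xaergs'.encode('utf-8')]))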

+ 0 - 1
celery/worker/request.py

@@ -408,7 +408,6 @@ class Request(object):
             'args': self.argsrepr,
             'kwargs': self.kwargsrepr,
             'type': self.type,
-            'body': self.body,
             'hostname': self.hostname,
             'time_start': self.time_start,
             'acknowledged': self.acknowledged,
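
Dropping the raw message ``body`` from the request info dict helps keep the reply payload of ``inspect active`` (and friends) JSON-serializable, since the body may contain arbitrary bytes while fields such as ``argsrepr`` and ``kwargsrepr`` are already plain strings (Issue #3667). Callers are unaffected; a sketch, assuming a configured broker at the placeholder URL and a running worker:

.. code-block:: python

    from celery import Celery

    app = Celery('proj', broker='redis://')  # placeholder broker URL

    # active() now returns only JSON-friendly fields for each request
    # (args, kwargs, type, hostname, time_start, acknowledged, ...).
    print(app.control.inspect().active())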

+ 22 - 0
docs/userguide/testing.rst

@@ -215,6 +215,28 @@ Example:
             'strict_typing': False,
         }
 
+``celery_worker_parameters`` - Override to set up Celery worker parameters.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can redefine this fixture to change the ``__init__`` parameters of test
+Celery workers. These are directly passed to
+:class:`~celery.worker.WorkController` when it is instantiated.
+
+The dict returned by your fixture is then used to configure both the
+:func:`celery_worker` and :func:`celery_session_worker` fixtures.
+
+Example:
+
+.. code-block:: python
+
+    @pytest.fixture(scope='session')
+    def celery_worker_parameters():
+        return {
+            'queues': ('high-prio', 'low-prio'),
+            'exclude_queues': ('celery',),
+        }
+
 
 ``celery_enable_logging`` - Override to enable logging in embedded workers.
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz>dev
 billiard>=3.5.0.2,<3.6.0
-kombu>=4.0.1,<5.0
+kombu>=4.0.2,<5.0

+ 1 - 1
setup.cfg

@@ -18,7 +18,7 @@ ignore = D102,D104,D203,D105,D213
 [bdist_rpm]
 requires = pytz >= 2016.7
            billiard >= 3.5.0.2
-           kombu >= 4.0.1
+           kombu >= 4.0.2
 
 [wheel]
 universal = 1

+ 1 - 1
t/unit/tasks/test_canvas.py

@@ -534,7 +534,7 @@ class test_group(CanvasCase):
 
     def test_iter(self):
         g = group([self.add.s(i, i) for i in range(10)])
-        assert list(iter(g)) == g.tasks
+        assert list(iter(g)) == list(g.keys())
 
     @staticmethod
     def helper_test_get_delay(result):

+ 8 - 0
t/unit/tasks/test_result.py

@@ -19,6 +19,7 @@ from celery.result import (
     AsyncResult,
     EagerResult,
     ResultSet,
+    GroupResult,
     result_from_tuple,
     assert_will_not_block,
 )
@@ -615,6 +616,13 @@ class test_GroupResult:
         with pytest.raises(AttributeError):
             self.app.GroupResult.restore(ts.id, backend=object())
 
+    def test_restore_app(self):
+        subs = [MockAsyncResultSuccess(uuid(), app=self.app)]
+        ts = self.app.GroupResult(uuid(), subs)
+        ts.save()
+        restored = GroupResult.restore(ts.id, app=self.app)
+        assert restored.id == ts.id
+
     def test_join_native(self):
         backend = SimpleBackend()
         results = [self.app.AsyncResult(uuid(), backend=backend)

+ 16 - 10
t/unit/utils/test_functional.py

@@ -162,15 +162,25 @@ class test_head_from_fun:
         g(1, 2)
         g(1, 2, kwarg=3)
 
+    @skip.unless_python3()
+    def test_regression_3678(self):
+        local = {}
+        fun = ('def f(foo, *args, bar=""):'
+               '    return foo, args, bar')
+        exec(fun, {}, local)
+
+        g = head_from_fun(local['f'])
+        g(1)
+        g(1, 2, 3, 4, bar=100)
+        with pytest.raises(TypeError):
+            g(bar=100)
+
+    @skip.unless_python3()
     def test_from_fun_with_hints(self):
         local = {}
         fun = ('def f_hints(x: int, y: int, kwarg: int=1):'
                '    pass')
-        try:
-            exec(fun, {}, local)
-        except SyntaxError:
-            # py2
-            return
+        exec(fun, {}, local)
         f_hints = local['f_hints']
 
         g = head_from_fun(f_hints)
@@ -184,11 +194,7 @@ class test_head_from_fun:
         local = {}
         fun = ('def f_kwargs(*, a, b="b", c=None):'
                '    return')
-        try:
-            exec(fun, {}, local)
-        except SyntaxError:
-            # Python 2.
-            return
+        exec(fun, {}, local)
         f_kwargs = local['f_kwargs']
 
         g = head_from_fun(f_kwargs)

+ 5 - 1
t/unit/utils/test_saferepr.py

@@ -62,7 +62,8 @@ def old_repr(s):
             RE_OLD_SET_REPR.sub(
                 RE_OLD_SET_REPR_REPLACE,
                 RE_OLD_SET_CUSTOM_REPR.sub(
-                    RE_OLD_SET_CUSTOM_REPR_REPLACE, repr(s).replace("u'", "'"),
+                    RE_OLD_SET_CUSTOM_REPR_REPLACE,
+                    repr(s).replace("u'", "'"),
                 )
             ),
         ),
@@ -228,3 +229,6 @@ class test_saferepr:
             def __repr__(self):
                 raise KeyError('foo')
         assert 'Unrepresentable' in saferepr(O())
+
+    def test_bytes_with_unicode_py2_and_3(self):
+        assert saferepr([b'foo', 'a®rgs'.encode('utf-8')])