Browse Source

Fixes build and flakes

Ask Solem 8 years ago
parent
commit
c64bf07bfe

+ 2 - 2
celery/events/snapshot.py

@@ -57,13 +57,13 @@ class Polaroid(object):
 
     def cleanup(self):
         logger.debug('Cleanup: Running...')
-        self.cleanup_signal.send(None)
+        self.cleanup_signal.send(sender=self.state)
         self.on_cleanup()
 
     def shutter(self):
         if self.maxrate is None or self.maxrate.can_consume():
             logger.debug('Shutter: %s', self.state)
-            self.shutter_signal.send(self.state)
+            self.shutter_signal.send(sender=self.state)
             self.on_shutter(self.state)
 
     def capture(self):

+ 1 - 1
celery/local.py

@@ -310,7 +310,7 @@ class PromiseProxy(Proxy):
     promise will only evaluate it once.
     """
 
-    __slots__ = ('__pending__',)
+    __slots__ = ('__pending__', '__weakref__')
 
     def _get_current_object(self):
         try:

+ 20 - 7
celery/signals.py

@@ -86,19 +86,28 @@ task_sent = Signal(
 celeryd_init = Signal(
     name='celeryd_init',
     providing_args={'instance', 'conf', 'options'},
+    use_caching=False,
 )
 celeryd_after_setup = Signal(
     name='celeryd_after_setup',
     providing_args={'instance', 'conf'},
+    use_caching=False,
 )
 
 # - Worker
 import_modules = Signal(name='import_modules')
-worker_init = Signal(name='worker_init')
-worker_process_init = Signal(name='worker_process_init')
-worker_process_shutdown = Signal(name='worker_process_shutdown')
-worker_ready = Signal(name='worker_ready')
-worker_shutdown = Signal(name='worker_shutdown')
+worker_init = Signal(name='worker_init', use_caching=False)
+# use_caching must be false when sender is None.
+worker_process_init = Signal(
+    name='worker_process_init',
+    use_caching=False,
+)
+worker_process_shutdown = Signal(
+    name='worker_process_shutdown',
+    use_caching=False,
+)
+worker_ready = Signal(name='worker_ready', use_caching=False)
+worker_shutdown = Signal(name='worker_shutdown', use_caching=False)
 heartbeat_sent = Signal(name='heartbeat_sent')
 
 # - Logging
@@ -107,23 +116,26 @@ setup_logging = Signal(
     providing_args={
         'loglevel', 'logfile', 'format', 'colorize',
     },
+    use_caching=False,
 )
 after_setup_logger = Signal(
     name='after_setup_logger',
     providing_args={
         'logger', 'loglevel', 'logfile', 'format', 'colorize',
     },
+    use_caching=False,
 )
 after_setup_task_logger = Signal(
     name='after_setup_task_logger',
     providing_args={
         'logger', 'loglevel', 'logfile', 'format', 'colorize',
     },
+    use_caching=False,
 )
 
 # - Beat
-beat_init = Signal(name='beat_init')
-beat_embedded_init = Signal(name='beat_embedded_init')
+beat_init = Signal(name='beat_init', use_caching=False)
+beat_embedded_init = Signal(name='beat_embedded_init', use_caching=False)
 
 # - Eventlet
 eventlet_pool_started = Signal(name='eventlet_pool_started')
@@ -138,4 +150,5 @@ eventlet_pool_apply = Signal(
 user_preload_options = Signal(
     name='user_preload_options',
     providing_args={'app', 'options'},
+    use_caching=False,
 )

+ 0 - 1
celery/utils/dispatch/signal.py

@@ -119,7 +119,6 @@ class Signal(object):  # pragma: no cover
             raise ValueError(
                 'Signal receiver must accept keyword arguments.')
 
-
         if isinstance(sender, PromiseProxy):
             sender.__then__(
                 self._connect_proxy, receiver, sender, weak, dispatch_uid,

+ 2 - 0
celery/utils/functional.py

@@ -286,12 +286,14 @@ def fun_takes_argument(name, fun, position=None):
 
 if IS_PY3:
     def fun_accepts_kwargs(fun):
+        """Return true if function accepts arbitrary keyword arguments."""
         return any(
             p for p in inspect.signature(fun).parameters.values()
             if p.kind == p.VAR_KEYWORD
         )
 else:
     def fun_accepts_kwargs(fun):  # noqa
+        """Return true if function accepts arbitrary keyword arguments."""
         try:
             argspec = inspect.getargspec(fun)
         except TypeError:

+ 0 - 1
celery/worker/consumer/consumer.py

@@ -436,7 +436,6 @@ class Consumer(object):
             conn.transport.register_with_event_loop(conn.connection, self.hub)
         return conn
 
-
     def _flush_events(self):
         if self.event_dispatcher:
             self.event_dispatcher.flush()

+ 11 - 0
docs/internals/reference/celery.utils.dispatch.weakref_backports.rst

@@ -0,0 +1,11 @@
+====================================================
+ ``celery.utils.dispatch.weakref_backports``
+====================================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.utils.dispatch.weakref_backports
+
+.. automodule:: celery.utils.dispatch.weakref_backports
+    :members:
+    :undoc-members:

+ 1 - 1
docs/internals/reference/index.rst

@@ -69,6 +69,6 @@
     celery.utils.text
     celery.utils.dispatch
     celery.utils.dispatch.signal
-    celery.utils.dispatch.saferef
+    celery.utils.dispatch.weakref_backports
     celery.platforms
     celery._state

+ 2 - 1
t/unit/app/test_app.py

@@ -847,7 +847,8 @@ class test_App:
         self.app.amqp = Mock(name='amqp')
         self.app.amqp.Producer.attach_mock(ContextMock(), 'return_value')
         self.app.send_task('foo', (1, 2), connection=connection, router=router)
-        self.app.amqp.Producer.assert_called_with(connection)
+        self.app.amqp.Producer.assert_called_with(
+            connection, auto_declare=False)
         self.app.amqp.send_task_message.assert_called_with(
             self.app.amqp.Producer(), 'foo',
             self.app.amqp.create_task_message())

+ 6 - 4
t/unit/conftest.py

@@ -7,7 +7,6 @@ import sys
 import threading
 import warnings
 
-from functools import partial
 from importlib import import_module
 
 from case import Mock
@@ -95,12 +94,15 @@ def reset_cache_backend_state(celery_app):
 def assert_signal_called(signal, **expected):
     """Context that verifes signal is called before exiting."""
     handler = Mock()
-    call_handler = partial(handler)
-    signal.connect(call_handler)
+
+    def on_call(**kwargs):
+        return handler(**kwargs)
+
+    signal.connect(on_call)
     try:
         yield handler
     finally:
-        signal.disconnect(call_handler)
+        signal.disconnect(on_call)
     handler.assert_called_with(signal=signal, **expected)
 
 

+ 1 - 1
t/unit/utils/test_dispatcher.py

@@ -38,7 +38,7 @@ class Callable(object):
     def a(self, val, **kwargs):
         return val
 
-a_signal = Signal(providing_args=['val'])
+a_signal = Signal(providing_args=['val'], use_caching=False)
 
 
 class test_Signal:

+ 56 - 0
t/unit/utils/test_functional.py

@@ -4,6 +4,7 @@ from kombu.utils.functional import lazy
 from celery.five import range, nextfun
 from celery.utils.functional import (
     DummyContext,
+    fun_accepts_kwargs,
     fun_takes_argument,
     head_from_fun,
     firstmethod,
@@ -224,3 +225,58 @@ def test_seq_concat_item(a, b, expected):
     res = seq_concat_item(a, b)
     assert type(res) is type(expected)  # noqa
     assert res == expected
+
+
+class StarKwargsCallable(object):
+
+    def __call__(self, **kwargs):
+        return 1
+
+
+class StarArgsStarKwargsCallable(object):
+
+    def __call__(self, *args, **kwargs):
+        return 1
+
+
+class StarArgsCallable(object):
+
+    def __call__(self, *args):
+        return 1
+
+
+class ArgsCallable(object):
+
+    def __call__(self, a, b):
+        return 1
+
+
+class ArgsStarKwargsCallable(object):
+
+    def __call__(self, a, b, **kwargs):
+        return 1
+
+
+class test_fun_accepts_kwargs:
+
+    @pytest.mark.parametrize('fun', [
+        lambda a, b, **kwargs: 1,
+        lambda *args, **kwargs: 1,
+        lambda foo=1, **kwargs: 1,
+        StarKwargsCallable,
+        StarArgsStarKwargsCallable,
+        ArgsStarKwargsCallable,
+    ])
+    def test_accepts(self, fun):
+        assert fun_accepts_kwargs(fun)
+
+    @pytest.mark.parametrize('fun', [
+        lambda a: 1,
+        lambda a, b: 1,
+        lambda *args: 1,
+        lambda a, kw1=1, kw2=2: 1,
+        StarArgsCallable,
+        ArgsCallable,
+    ])
+    def test_rejects(self, fun):
+        assert not fun_accepts_kwargs(fun)