Ask Solem преди 11 години
родител
ревизия
fecbdc5751

+ 1 - 1
celery/__init__.py

@@ -114,7 +114,6 @@ def maybe_patch_concurrency(argv=sys.argv,
         except KeyError:
         except KeyError:
             pass
             pass
         else:
         else:
-            print('PATCHING CONCURRENCY USING %r' % (patcher, ))
             patcher()
             patcher()
         # set up eventlet/gevent environments ASAP.
         # set up eventlet/gevent environments ASAP.
         from celery import concurrency
         from celery import concurrency
@@ -140,6 +139,7 @@ old_module, new_module = recreate_module(  # pragma: no cover
     __homepage__=__homepage__, __docformat__=__docformat__,
     __homepage__=__homepage__, __docformat__=__docformat__,
     VERSION=VERSION, SERIES=SERIES, VERSION_BANNER=VERSION_BANNER,
     VERSION=VERSION, SERIES=SERIES, VERSION_BANNER=VERSION_BANNER,
     maybe_patch_concurrency=maybe_patch_concurrency,
     maybe_patch_concurrency=maybe_patch_concurrency,
+    _find_option_with_arg=_find_option_with_arg,
 )
 )
 
 
 
 

+ 0 - 1
celery/concurrency/eventlet.py

@@ -8,7 +8,6 @@
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import os
 import sys
 import sys
 
 
 __all__ = ['TaskPool']
 __all__ = ['TaskPool']

+ 1 - 2
celery/contrib/migrate.py

@@ -15,7 +15,6 @@ from itertools import cycle, islice
 
 
 from kombu import eventloop, Queue
 from kombu import eventloop, Queue
 from kombu.common import maybe_declare
 from kombu.common import maybe_declare
-from kombu.exceptions import StdChannelError
 from kombu.utils.encoding import ensure_bytes
 from kombu.utils.encoding import ensure_bytes
 
 
 from celery.app import app_or_default
 from celery.app import app_or_default
@@ -288,7 +287,7 @@ def start_filter(app, conn, filter, limit=None, timeout=1.0,
             _, mcount, _ = queue(consumer.channel).queue_declare(passive=True)
             _, mcount, _ = queue(consumer.channel).queue_declare(passive=True)
             if mcount:
             if mcount:
                 state.total_apx += mcount
                 state.total_apx += mcount
-        except conn.channel_errors + (StdChannelError, ):
+        except conn.channel_errors:
             pass
             pass
 
 
     # start migrating messages.
     # start migrating messages.

+ 1 - 1
celery/tests/app/test_control.py

@@ -91,7 +91,7 @@ class test_inspect(AppCase):
 
 
     @with_mock_broadcast
     @with_mock_broadcast
     def test_hello(self):
     def test_hello(self):
-        self.i.hello()
+        self.i.hello('george@vandelay.com')
         self.assertIn('hello', MockMailbox.sent)
         self.assertIn('hello', MockMailbox.sent)
 
 
     @with_mock_broadcast
     @with_mock_broadcast

+ 1 - 1
celery/tests/bin/test_base.py

@@ -222,7 +222,7 @@ class test_Command(AppCase):
     def test_setup_app_no_respect(self):
     def test_setup_app_no_respect(self):
         cmd = MockCommand()
         cmd = MockCommand()
         cmd.respects_app_option = False
         cmd.respects_app_option = False
-        with patch('celery.Celery') as cp:
+        with patch('celery.bin.base.Celery') as cp:
             cmd.setup_app_from_commandline(['--app=x.y:z'])
             cmd.setup_app_from_commandline(['--app=x.y:z'])
             self.assertTrue(cp.called)
             self.assertTrue(cp.called)
 
 

+ 0 - 5
celery/tests/bin/test_celery.py

@@ -40,11 +40,6 @@ class test__main__(AppCase):
             self.assertIn('command is deprecated', stdout.getvalue())
             self.assertIn('command is deprecated', stdout.getvalue())
             self.assertIn('YADDA YADDA', stdout.getvalue())
             self.assertIn('YADDA YADDA', stdout.getvalue())
 
 
-    def test_maybe_patch_concurrency(self):
-        with patch('celery.maybe_patch_concurrency') as _mpc:
-            __main__.maybe_patch_concurrency()
-            _mpc.assert_called_with(sys.argv, ['-P'], ['--pool'])
-
     def test_main(self):
     def test_main(self):
         with patch('celery.__main__.maybe_patch_concurrency') as mpc:
         with patch('celery.__main__.maybe_patch_concurrency') as mpc:
             with patch('celery.bin.celery.main') as main:
             with patch('celery.bin.celery.main') as main:

+ 0 - 1
celery/tests/concurrency/test_eventlet.py

@@ -1,6 +1,5 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import os
 import sys
 import sys
 
 
 from nose import SkipTest
 from nose import SkipTest

+ 0 - 3
celery/tests/concurrency/test_gevent.py

@@ -1,8 +1,5 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-import os
-import sys
-
 from nose import SkipTest
 from nose import SkipTest
 from mock import Mock
 from mock import Mock
 
 

+ 3 - 2
celery/tests/contrib/test_migrate.py

@@ -3,8 +3,9 @@ from __future__ import absolute_import, unicode_literals
 from contextlib import contextmanager
 from contextlib import contextmanager
 from mock import patch
 from mock import patch
 
 
+from amqp import ChannelError
+
 from kombu import Connection, Producer, Queue, Exchange
 from kombu import Connection, Producer, Queue, Exchange
-from kombu.exceptions import StdChannelError
 
 
 from kombu.transport.virtual import QoS
 from kombu.transport.virtual import QoS
 
 
@@ -300,7 +301,7 @@ class test_migrate_tasks(AppCase):
 
 
             def effect(*args, **kwargs):
             def effect(*args, **kwargs):
                 if kwargs.get('passive'):
                 if kwargs.get('passive'):
-                    raise StdChannelError()
+                    raise ChannelError('some channel error')
                 return 0, 3, 0
                 return 0, 3, 0
             qd.side_effect = effect
             qd.side_effect = effect
             migrate_tasks(x, y, app=self.app)
             migrate_tasks(x, y, app=self.app)

+ 1 - 1
celery/tests/utils/test_platforms.py

@@ -7,6 +7,7 @@ import signal
 
 
 from mock import Mock, patch, call
 from mock import Mock, patch, call
 
 
+from celery import _find_option_with_arg
 from celery import platforms
 from celery import platforms
 from celery.five import open_fqdn
 from celery.five import open_fqdn
 from celery.platforms import (
 from celery.platforms import (
@@ -27,7 +28,6 @@ from celery.platforms import (
     LockFailed,
     LockFailed,
     setgroups,
     setgroups,
     _setgroups_hack,
     _setgroups_hack,
-    _find_option_with_arg,
     close_open_fds,
     close_open_fds,
 )
 )
 
 

+ 1 - 1
celery/tests/worker/test_consumer.py

@@ -247,7 +247,7 @@ class test_Mingle(AppCase):
             }
             }
 
 
             mingle.start(c)
             mingle.start(c)
-            I.hello.assert_called_with()
+            I.hello.assert_called_with(c.hostname, worker_state.revoked._data)
             c.app.clock.adjust.assert_has_calls([
             c.app.clock.adjust.assert_has_calls([
                 call(312), call(29),
                 call(312), call(29),
             ], any_order=True)
             ], any_order=True)

+ 2 - 1
celery/tests/worker/test_control.py

@@ -125,6 +125,7 @@ class test_ControlPanel(AppCase):
 
 
     def create_state(self, **kwargs):
     def create_state(self, **kwargs):
         kwargs.setdefault('app', self.app)
         kwargs.setdefault('app', self.app)
+        kwargs.setdefault('hostname', hostname)
         return AttributeDict(kwargs)
         return AttributeDict(kwargs)
 
 
     def create_panel(self, **kwargs):
     def create_panel(self, **kwargs):
@@ -165,7 +166,7 @@ class test_ControlPanel(AppCase):
         panel.state.app.clock.value = 313
         panel.state.app.clock.value = 313
         worker_state.revoked.add('revoked1')
         worker_state.revoked.add('revoked1')
         try:
         try:
-            x = panel.handle('hello')
+            x = panel.handle('hello', {'from_node': 'george@vandelay.com'})
             self.assertIn('revoked1', x['revoked'])
             self.assertIn('revoked1', x['revoked'])
             self.assertEqual(x['clock'], 314)  # incremented
             self.assertEqual(x['clock'], 314)  # incremented
         finally:
         finally:

+ 3 - 7
celery/tests/worker/test_hub.py

@@ -1,11 +1,7 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
-from kombu.async import (
-    Hub,
-    repr_flag,
-    _rcb,
-    READ, WRITE, ERR
-)
+from kombu.async import Hub, READ, WRITE, ERR
+from kombu.async.hub import repr_flag, _rcb
 from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
 from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
 
 
 from mock import Mock, call, patch
 from mock import Mock, call, patch
@@ -207,7 +203,7 @@ class test_Hub(Case):
 
 
         eback.side_effect = ValueError('foo')
         eback.side_effect = ValueError('foo')
         hub.scheduler = iter([(0, eback)])
         hub.scheduler = iter([(0, eback)])
-        with patch('celery.worker.hub.logger') as logger:
+        with patch('kombu.async.hub.logger') as logger:
             with self.assertRaises(StopIteration):
             with self.assertRaises(StopIteration):
                 hub.fire_timers()
                 hub.fire_timers()
             self.assertTrue(logger.error.called)
             self.assertTrue(logger.error.called)

+ 3 - 3
celery/tests/worker/test_worker.py

@@ -7,11 +7,11 @@ from collections import deque
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 from threading import Event
 from threading import Event
 
 
+from amqp import ChannelError
 from billiard.exceptions import WorkerLostError
 from billiard.exceptions import WorkerLostError
 from kombu import Connection
 from kombu import Connection
 from kombu.async import READ, ERR
 from kombu.async import READ, ERR
 from kombu.common import QoS, ignore_errors
 from kombu.common import QoS, ignore_errors
-from kombu.exceptions import StdChannelError
 from kombu.transport.base import Message
 from kombu.transport.base import Message
 from mock import call, Mock, patch
 from mock import call, Mock, patch
 
 
@@ -624,12 +624,12 @@ class test_Consumer(AppCase):
     def test_connect_errback(self, sleep, connect):
     def test_connect_errback(self, sleep, connect):
         l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
         l = MyKombuConsumer(self.buffer.put, timer=self.timer, app=self.app)
         from kombu.transport.memory import Transport
         from kombu.transport.memory import Transport
-        Transport.connection_errors = (StdChannelError, )
+        Transport.connection_errors = (ChannelError, )
 
 
         def effect():
         def effect():
             if connect.call_count > 1:
             if connect.call_count > 1:
                 return
                 return
-            raise StdChannelError()
+            raise ChannelError()
         connect.side_effect = effect
         connect.side_effect = effect
         l.connect()
         l.connect()
         connect.assert_called_with()
         connect.assert_called_with()

+ 2 - 1
celery/worker/control.py

@@ -249,7 +249,8 @@ def dump_revoked(state, **kwargs):
 def hello(state, from_node, revoked=None, **kwargs):
 def hello(state, from_node, revoked=None, **kwargs):
     if from_node != state.hostname:
     if from_node != state.hostname:
         logger.info('sync with %s', from_node)
         logger.info('sync with %s', from_node)
-        worker_state.revoked.update(revoked)
+        if revoked:
+            worker_state.revoked.update(revoked)
         return {'revoked': worker_state.revoked._data,
         return {'revoked': worker_state.revoked._data,
                 'clock': state.app.clock.forward()}
                 'clock': state.app.clock.forward()}