Browse Source

Removes worker Queues bootstep

Ask Solem 9 years ago
parent
commit
fc38ae3b2e

+ 12 - 14
celery/tests/worker/test_components.py

@@ -4,24 +4,12 @@ from __future__ import absolute_import
 # here to complete coverage.  Should move everyting to this module at some
 # here to complete coverage.  Should move everyting to this module at some
 # point [-ask]
 # point [-ask]
 
 
-from celery.worker.components import (
-    Queues,
-    Pool,
-)
+from celery.platforms import IS_WINDOWS
+from celery.worker.components import Pool
 
 
 from celery.tests.case import AppCase, Mock
 from celery.tests.case import AppCase, Mock
 
 
 
 
-class test_Queues(AppCase):
-
-    def test_create_when_eventloop(self):
-        w = Mock()
-        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
-        q = Queues(w)
-        q.create(w)
-        self.assertIs(w.process_task, w._process_task_sem)
-
-
 class test_Pool(AppCase):
 class test_Pool(AppCase):
 
 
     def test_close_terminate(self):
     def test_close_terminate(self):
@@ -36,3 +24,13 @@ class test_Pool(AppCase):
         w.pool = None
         w.pool = None
         comp.close(w)
         comp.close(w)
         comp.terminate(w)
         comp.terminate(w)
+
+    def test_create_when_eventloop(self):
+        if IS_WINDOWS:
+            raise SkipTest('Win32')
+        w = Mock()
+        w.use_eventloop = w.pool_putlocks = w.pool_cls.uses_semaphore = True
+        comp = Pool(w)
+        pool = w.pool = Mock()
+        comp.create(w)
+        self.assertIs(w.process_task, w._process_task_sem)

+ 6 - 6
celery/tests/worker/test_worker.py

@@ -1133,12 +1133,6 @@ class test_WorkController(AppCase):
         for step in worker.steps:
         for step in worker.steps:
             self.assertTrue(step.terminate.call_count)
             self.assertTrue(step.terminate.call_count)
 
 
-    def test_Queues_pool_no_sem(self):
-        w = Mock()
-        w.pool_cls.uses_semaphore = False
-        components.Queues(w).create(w)
-        self.assertIs(w.process_task, w._process_task)
-
     def test_Hub_crate(self):
     def test_Hub_crate(self):
         w = Mock()
         w = Mock()
         x = components.Hub(w)
         x = components.Hub(w)
@@ -1153,6 +1147,12 @@ class test_WorkController(AppCase):
         pool = components.Pool(w)
         pool = components.Pool(w)
         pool.create(w)
         pool.create(w)
 
 
+    def test_Pool_pool_no_sem(self):
+        w = Mock()
+        w.pool_cls.uses_semaphore = False
+        components.Pool(w).create(w)
+        self.assertIs(w.process_task, w._process_task)
+
     def test_Pool_create(self):
     def test_Pool_create(self):
         from kombu.async.semaphore import LaxBoundedSemaphore
         from kombu.async.semaphore import LaxBoundedSemaphore
         w = Mock()
         w = Mock()

+ 0 - 1
celery/worker/__init__.py

@@ -81,7 +81,6 @@ class WorkController(object):
         name = 'Worker'
         name = 'Worker'
         default_steps = {
         default_steps = {
             'celery.worker.components:Hub',
             'celery.worker.components:Hub',
-            'celery.worker.components:Queues',
             'celery.worker.components:Pool',
             'celery.worker.components:Pool',
             'celery.worker.components:Beat',
             'celery.worker.components:Beat',
             'celery.worker.components:Timer',
             'celery.worker.components:Timer',

+ 8 - 16
celery/worker/components.py

@@ -19,9 +19,11 @@ from celery import bootsteps
 from celery._state import _set_task_join_will_block
 from celery._state import _set_task_join_will_block
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
 from celery.five import string_t
 from celery.five import string_t
+from celery.platforms import IS_WINDOWS
 from celery.utils.log import worker_logger as logger
 from celery.utils.log import worker_logger as logger
 
 
-__all__ = ['Timer', 'Hub', 'Queues', 'Pool', 'Beat', 'StateDB', 'Consumer']
+
+__all__ = ['Timer', 'Hub', 'Pool', 'Beat', 'StateDB', 'Consumer']
 
 
 ERR_B_GREEN = """\
 ERR_B_GREEN = """\
 -B option doesn't work with eventlet/gevent pools: \
 -B option doesn't work with eventlet/gevent pools: \
@@ -96,19 +98,6 @@ class Hub(bootsteps.StartStopStep):
             pool.Lock = DummyLock
             pool.Lock = DummyLock
 
 
 
 
-class Queues(bootsteps.Step):
-    """This bootstep initializes the internal queues
-    used by the worker."""
-    label = 'Queues (intra)'
-    requires = (Hub,)
-
-    def create(self, w):
-        w.process_task = w._process_task
-        if w.use_eventloop:
-            if w.pool_putlocks and w.pool_cls.uses_semaphore:
-                w.process_task = w._process_task_sem
-
-
 class Pool(bootsteps.StartStopStep):
 class Pool(bootsteps.StartStopStep):
     """Bootstep managing the worker pool.
     """Bootstep managing the worker pool.
 
 
@@ -123,7 +112,7 @@ class Pool(bootsteps.StartStopStep):
         * min_concurrency
         * min_concurrency
 
 
     """
     """
-    requires = (Queues,)
+    requires = (Hub,)
 
 
     def __init__(self, w, autoscale=None, autoreload=None,
     def __init__(self, w, autoscale=None, autoreload=None,
                  no_execv=False, optimization=None, **kwargs):
                  no_execv=False, optimization=None, **kwargs):
@@ -151,14 +140,17 @@ class Pool(bootsteps.StartStopStep):
     def create(self, w, semaphore=None, max_restarts=None):
     def create(self, w, semaphore=None, max_restarts=None):
         if w.app.conf.CELERYD_POOL in ('eventlet', 'gevent'):
         if w.app.conf.CELERYD_POOL in ('eventlet', 'gevent'):
             warnings.warn(UserWarning(W_POOL_SETTING))
             warnings.warn(UserWarning(W_POOL_SETTING))
-        threaded = not w.use_eventloop
+        threaded = not w.use_eventloop or IS_WINDOWS
         procs = w.min_concurrency
         procs = w.min_concurrency
         forking_enable = w.no_execv if w.force_execv else True
         forking_enable = w.no_execv if w.force_execv else True
+        w.process_task = w._process_task
         if not threaded:
         if not threaded:
             semaphore = w.semaphore = LaxBoundedSemaphore(procs)
             semaphore = w.semaphore = LaxBoundedSemaphore(procs)
             w._quick_acquire = w.semaphore.acquire
             w._quick_acquire = w.semaphore.acquire
             w._quick_release = w.semaphore.release
             w._quick_release = w.semaphore.release
             max_restarts = 100
             max_restarts = 100
+            if w.pool_putlocks and w.pool_cls.uses_semaphore:
+                w.process_task = w._process_task_sem
         allow_restart = self.autoreload_enabled or w.pool_restarts
         allow_restart = self.autoreload_enabled or w.pool_restarts
         pool = w.pool = self.instantiate(
         pool = w.pool = self.instantiate(
             w.pool_cls, w.min_concurrency,
             w.pool_cls, w.min_concurrency,