Browse Source

Tests passing

Ask Solem 12 years ago
parent
commit
0334b70801
3 changed files with 25 additions and 6 deletions
  1. 1 0
      celery/events/__init__.py
  2. 16 3
      celery/tests/worker/test_worker.py
  3. 8 3
      celery/worker/__init__.py

+ 1 - 0
celery/events/__init__.py

@@ -214,6 +214,7 @@ class EventReceiver(object):
                             queues=[self.queue], no_ack=True)
         consumer.register_callback(self._receive)
         consumer.consume()
+
         try:
             if wakeup:
                 self.wakeup_workers(channel=consumer.channel)

+ 16 - 3
celery/tests/worker/test_worker.py

@@ -24,7 +24,7 @@ from celery.task import task as task_dec
 from celery.task import periodic_task as periodic_task_dec
 from celery.utils import uuid
 from celery.worker import WorkController, Queues, Timers, EvLoop, Pool
-from celery.worker.buckets import FastQueue
+from celery.worker.buckets import FastQueue, AsyncTaskBucket
 from celery.worker.job import Request
 from celery.worker.consumer import BlockingConsumer
 from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX, CLOSE
@@ -959,15 +959,28 @@ class test_WorkController(AppCase):
         self.assertIsNone(worker.mediator)
         self.assertEqual(worker.ready_queue.put, worker.process_task)
 
+    def test_enable_rate_limits_eventloop(self):
+        try:
+            worker = self.create_worker(disable_rate_limits=False,
+                                        use_eventloop=True,
+                                        pool_cls='processes')
+        except ImportError:
+            raise SkipTest('multiprocessing not supported')
+        self.assertIsInstance(worker.ready_queue, AsyncTaskBucket)
+        self.assertFalse(worker.mediator)
+        self.assertNotEqual(worker.ready_queue.put, worker.process_task)
+
+
     def test_disable_rate_limits_processes(self):
         try:
             worker = self.create_worker(disable_rate_limits=True,
+                                        use_eventloop=False,
                                         pool_cls='processes')
         except ImportError:
             raise SkipTest('multiprocessing not supported')
         self.assertIsInstance(worker.ready_queue, FastQueue)
-        self.assertTrue(worker.mediator)
-        self.assertNotEqual(worker.ready_queue.put, worker.process_task)
+        self.assertFalse(worker.mediator)
+        self.assertEqual(worker.ready_queue.put, worker.process_task)
 
     def test_process_task_sem(self):
         worker = self.worker

+ 8 - 3
celery/worker/__init__.py

@@ -206,12 +206,13 @@ class Queues(bootsteps.Component):
 
     def create(self, w):
         BucketType = TaskBucket
-        w.start_mediator = True
+        w.start_mediator = not w.disable_rate_limits
         if not w.pool_cls.rlimit_safe:
             w.start_mediator = False
             BucketType = AsyncTaskBucket
         process_task = w.process_task
         if w.use_eventloop:
+            w.start_mediator = False
             BucketType = AsyncTaskBucket
             if w.pool_putlocks and w.pool_cls.uses_semaphore:
                 process_task = w.process_task_sem
@@ -312,7 +313,8 @@ class WorkController(configurated):
     _running = 0
 
     def __init__(self, loglevel=None, hostname=None, ready_callback=noop,
-                 queues=None, app=None, pidfile=None, **kwargs):
+                 queues=None, app=None, pidfile=None, use_eventloop=None,
+                 **kwargs):
         self.app = app_or_default(app or self.app)
 
         self._shutdown_complete = Event()
@@ -328,7 +330,10 @@ class WorkController(configurated):
         self.pidlock = None
         # this connection is not established, only used for params
         self._conninfo = self.app.connection()
-        self.use_eventloop = self.should_use_eventloop()
+        self.use_eventloop = (
+            self.should_use_eventloop() if use_eventloop is None
+            else use_eventloop
+        )
 
         # Update celery_include to have all known task modules, so that we
         # ensure all task modules are imported in case an execv happens.