|
@@ -209,21 +209,18 @@ class Queues(bootsteps.Component):
|
|
|
|
|
|
def create(self, w):
|
|
|
BucketType = TaskBucket
|
|
|
- w.start_mediator = True
|
|
|
+ w.start_mediator = w.pool_cls.requires_mediator
|
|
|
if not w.pool_cls.rlimit_safe:
|
|
|
- w.start_mediator = False
|
|
|
BucketType = AsyncTaskBucket
|
|
|
process_task = w.process_task
|
|
|
if w.use_eventloop:
|
|
|
- w.start_mediator = True # Need async write, fixed in 3.1
|
|
|
BucketType = AsyncTaskBucket
|
|
|
if w.pool_putlocks and w.pool_cls.uses_semaphore:
|
|
|
process_task = w.process_task_sem
|
|
|
- if w.disable_rate_limits:
|
|
|
+ if w.disable_rate_limits or not w.start_mediator:
|
|
|
w.ready_queue = FastQueue()
|
|
|
- if getattr(w.pool_cls, 'no_threads', False): # temp fix
|
|
|
+ if not w.start_mediator:
|
|
|
w.ready_queue.put = process_task
|
|
|
- w.start_mediator = False
|
|
|
else:
|
|
|
w.ready_queue = BucketType(
|
|
|
task_registry=w.app.tasks, callback=process_task, worker=w,
|