|
@@ -83,6 +83,7 @@ import threading
|
|
|
from time import sleep
|
|
|
from Queue import Empty
|
|
|
|
|
|
+from billiard.exceptions import WorkerLostError
|
|
|
from kombu.utils.encoding import safe_repr
|
|
|
|
|
|
from celery.app import app_or_default
|
|
@@ -172,7 +173,8 @@ class Component(StartStopComponent):
|
|
|
pool=w.pool,
|
|
|
timer=w.timer,
|
|
|
app=w.app,
|
|
|
- controller=w)
|
|
|
+ controller=w,
|
|
|
+ use_eventloop=w.use_eventloop)
|
|
|
return c
|
|
|
|
|
|
|
|
@@ -303,7 +305,7 @@ class Consumer(object):
|
|
|
def __init__(self, ready_queue,
|
|
|
init_callback=noop, send_events=False, hostname=None,
|
|
|
initial_prefetch_count=2, pool=None, app=None,
|
|
|
- timer=None, controller=None, **kwargs):
|
|
|
+ timer=None, controller=None, use_eventloop=False, **kwargs):
|
|
|
self.app = app_or_default(app)
|
|
|
self.connection = None
|
|
|
self.task_consumer = None
|
|
@@ -318,6 +320,7 @@ class Consumer(object):
|
|
|
self.heart = None
|
|
|
self.pool = pool
|
|
|
self.timer = timer or timer2.default_timer
|
|
|
+ self.use_eventloop = use_eventloop
|
|
|
pidbox_state = AttributeDict(app=self.app,
|
|
|
hostname=self.hostname,
|
|
|
listener=self, # pre 2.2
|
|
@@ -331,6 +334,8 @@ class Consumer(object):
|
|
|
|
|
|
self._does_info = logger.isEnabledFor(logging.INFO)
|
|
|
self.strategies = {}
|
|
|
+ if self.use_eventloop:
|
|
|
+ self.hub = Hub(self.timer)
|
|
|
|
|
|
def update_strategies(self):
|
|
|
S = self.strategies
|
|
@@ -358,23 +363,47 @@ class Consumer(object):
|
|
|
|
|
|
def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
|
|
|
"""Consume messages forever (or until an exception is raised)."""
|
|
|
- self.task_consumer.consume()
|
|
|
- debug("Ready to accept tasks!")
|
|
|
|
|
|
- with Hub(self.timer) as hub:
|
|
|
+ with self.hub as hub:
|
|
|
qos = self.qos
|
|
|
+ concurrency = self.pool.num_processes
|
|
|
update_qos = qos.update
|
|
|
- update_fds = hub.update
|
|
|
+ update_readers = hub.update_readers
|
|
|
fdmap = hub.fdmap
|
|
|
poll = hub.poller.poll
|
|
|
fire_timers = hub.fire_timers
|
|
|
scheduled = hub.timer._queue
|
|
|
transport = self.connection.transport
|
|
|
on_poll_start = transport.on_poll_start
|
|
|
-
|
|
|
- self.task_consumer.callbacks.append(fire_timers)
|
|
|
-
|
|
|
- update_fds(self.connection.eventmap, self.pool.eventmap)
|
|
|
+ strategies = self.strategies
|
|
|
+ buffer = []
|
|
|
+
|
|
|
+ def flush_buffer():
|
|
|
+ for name, body, message in buffer:
|
|
|
+ try:
|
|
|
+ strategies[name](message, body, message.ack_log_error)
|
|
|
+ except KeyError, exc:
|
|
|
+ self.handle_unknown_task(body, message, exc)
|
|
|
+ except InvalidTaskError, exc:
|
|
|
+ self.handle_invalid_task(body, message, exc)
|
|
|
+ buffer[:] = []
|
|
|
+
|
|
|
+ def on_task_received(body, message):
|
|
|
+ try:
|
|
|
+ name = body["task"]
|
|
|
+ except (KeyError, TypeError):
|
|
|
+ return self.handle_unknown_message(body, message)
|
|
|
+ bufferlen = len(buffer)
|
|
|
+ buffer.append((name, body, message))
|
|
|
+ if bufferlen + 1 >= 4:
|
|
|
+ flush_buffer()
|
|
|
+ if bufferlen:
|
|
|
+ fire_timers()
|
|
|
+
|
|
|
+ if not self.pool.did_start_ok():
|
|
|
+ raise WorkerLostError("Could not start worker processes")
|
|
|
+
|
|
|
+ update_readers(self.connection.eventmap, self.pool.readers)
|
|
|
for handler, interval in self.pool.timers.iteritems():
|
|
|
self.timer.apply_interval(interval * 1000.0, handler)
|
|
|
|
|
@@ -387,6 +416,10 @@ class Consumer(object):
|
|
|
self.pool.on_process_down = on_process_down
|
|
|
|
|
|
transport.on_poll_init(hub.poller)
|
|
|
+ self.task_consumer.callbacks = [on_task_received]
|
|
|
+ self.task_consumer.consume()
|
|
|
+
|
|
|
+ debug("Ready to accept tasks!")
|
|
|
|
|
|
while self._state != CLOSE and self.connection:
|
|
|
# shutdown if signal handlers told us to.
|
|
@@ -402,18 +435,21 @@ class Consumer(object):
|
|
|
if qos.prev != qos.value:
|
|
|
update_qos()
|
|
|
|
|
|
- update_fds(on_poll_start())
|
|
|
+ update_readers(on_poll_start())
|
|
|
if fdmap:
|
|
|
- for fileno, event in poll(time_to_sleep) or ():
|
|
|
- try:
|
|
|
- fdmap[fileno](fileno, event)
|
|
|
- except Empty:
|
|
|
- pass
|
|
|
- except socket.error:
|
|
|
- if self._state != CLOSE: # pragma: no cover
|
|
|
- raise
|
|
|
+ for timeout in (time_to_sleep, 0.001):
|
|
|
+ for fileno, event in poll(timeout) or ():
|
|
|
+ try:
|
|
|
+ fdmap[fileno](fileno, event)
|
|
|
+ except Empty:
|
|
|
+ break
|
|
|
+ except socket.error:
|
|
|
+ if self._state != CLOSE: # pragma: no cover
|
|
|
+ raise
|
|
|
+ if buffer:
|
|
|
+ flush_buffer()
|
|
|
else:
|
|
|
- sleep(min(time_to_sleep, 1))
|
|
|
+ sleep(min(time_to_sleep, 0.1))
|
|
|
|
|
|
def on_task(self, task):
|
|
|
"""Handle received task.
|
|
@@ -422,7 +458,6 @@ class Consumer(object):
|
|
|
otherwise we move it the ready queue for immediate processing.
|
|
|
|
|
|
"""
|
|
|
-
|
|
|
if task.revoked():
|
|
|
return
|
|
|
|
|
@@ -475,6 +510,18 @@ class Consumer(object):
|
|
|
safe_repr(message.content_encoding),
|
|
|
safe_repr(message.delivery_info))
|
|
|
|
|
|
+ def handle_unknown_message(self, body, message):
|
|
|
+ warn(UNKNOWN_FORMAT, self._message_report(body, message))
|
|
|
+ message.reject_log_error(logger, self.connection_errors)
|
|
|
+
|
|
|
+ def handle_unknown_task(self, body, message, exc):
|
|
|
+ error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
|
|
|
+ message.reject_log_error(logger, self.connection_errors)
|
|
|
+
|
|
|
+ def handle_invalid_task(self, body, message, exc):
|
|
|
+ error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
|
|
|
+ message.reject_log_error(logger, self.connection_errors)
|
|
|
+
|
|
|
def receive_message(self, body, message):
|
|
|
"""Handles incoming messages.
|
|
|
|
|
@@ -485,18 +532,14 @@ class Consumer(object):
|
|
|
try:
|
|
|
name = body["task"]
|
|
|
except (KeyError, TypeError):
|
|
|
- warn(UNKNOWN_FORMAT, self._message_report(body, message))
|
|
|
- message.reject_log_error(logger, self.connection_errors)
|
|
|
- return
|
|
|
+ return self.handle_unknown_message(body, message)
|
|
|
|
|
|
try:
|
|
|
self.strategies[name](message, body, message.ack_log_error)
|
|
|
except KeyError, exc:
|
|
|
- error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
|
|
|
- message.reject_log_error(logger, self.connection_errors)
|
|
|
+ self.handle_unknown_task(body, message, exc)
|
|
|
except InvalidTaskError, exc:
|
|
|
- error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
|
|
|
- message.reject_log_error(logger, self.connection_errors)
|
|
|
+ self.handle_invalid_task(body, message, exc)
|
|
|
|
|
|
def maybe_conn_error(self, fun):
|
|
|
"""Applies function but ignores any connection or channel
|
|
@@ -644,9 +687,6 @@ class Consumer(object):
|
|
|
self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
|
|
|
self.qos.update()
|
|
|
|
|
|
- # receive_message handles incoming messages.
|
|
|
- self.task_consumer.register_callback(self.receive_message)
|
|
|
-
|
|
|
# Setup the process mailbox.
|
|
|
self.reset_pidbox_node()
|
|
|
|
|
@@ -745,7 +785,10 @@ class Consumer(object):
|
|
|
class BlockingConsumer(Consumer):
|
|
|
|
|
|
def consume_messages(self):
|
|
|
+ # receive_message handles incoming messages.
|
|
|
+ self.task_consumer.register_callback(self.receive_message)
|
|
|
self.task_consumer.consume()
|
|
|
+
|
|
|
debug("Ready to accept tasks!")
|
|
|
|
|
|
while self._state != CLOSE and self.connection:
|