|
@@ -83,6 +83,7 @@ import threading
|
|
|
from time import sleep
|
|
|
from Queue import Empty
|
|
|
|
|
|
+from billiard.exceptions import WorkerLostError
|
|
|
from kombu.utils.encoding import safe_repr
|
|
|
|
|
|
from celery.app import app_or_default
|
|
@@ -367,7 +368,7 @@ class Consumer(object):
|
|
|
qos = self.qos
|
|
|
concurrency = self.pool.num_processes
|
|
|
update_qos = qos.update
|
|
|
- update_fds = hub.update
|
|
|
+ update_readers = hub.update_readers
|
|
|
fdmap = hub.fdmap
|
|
|
poll = hub.poller.poll
|
|
|
fire_timers = hub.fire_timers
|
|
@@ -392,18 +393,17 @@ class Consumer(object):
|
|
|
name = body["task"]
|
|
|
except (KeyError, TypeError):
|
|
|
return self.handle_unknown_message(body, message)
|
|
|
- #try:
|
|
|
- # strategies[name](message, body, message.ack_log_error)
|
|
|
- #except KeyError, exc:
|
|
|
- # self.handle_unknown_task(body, message, exc)
|
|
|
- #except InvalidTaskError, exc:
|
|
|
- # self.handle_invalid_task(body, message, exc)
|
|
|
+ bufferlen = len(buffer)
|
|
|
buffer.append((name, body, message))
|
|
|
- if len(buffer) >= concurrency * 10:
|
|
|
+ if bufferlen + 1 >= 4:
|
|
|
flush_buffer()
|
|
|
- #fire_timers()
|
|
|
+ if bufferlen:
|
|
|
+ fire_timers()
|
|
|
|
|
|
- update_fds(self.connection.eventmap, self.pool.eventmap)
|
|
|
+ if not self.pool.did_start_ok():
|
|
|
+ raise WorkerLostError("Could not start worker processes")
|
|
|
+
|
|
|
+ update_readers(self.connection.eventmap, self.pool.readers)
|
|
|
for handler, interval in self.pool.timers.iteritems():
|
|
|
self.timer.apply_interval(interval * 1000.0, handler)
|
|
|
|
|
@@ -435,19 +435,19 @@ class Consumer(object):
|
|
|
if qos.prev != qos.value:
|
|
|
update_qos()
|
|
|
|
|
|
- update_fds(on_poll_start())
|
|
|
+ update_readers(on_poll_start())
|
|
|
if fdmap:
|
|
|
- #for timeout in (time_to_sleep, 0.1, 0.1):
|
|
|
- for fileno, event in poll(time_to_sleep) or ():
|
|
|
- try:
|
|
|
- fdmap[fileno](fileno, event)
|
|
|
- except Empty:
|
|
|
- pass
|
|
|
- except socket.error:
|
|
|
- if self._state != CLOSE: # pragma: no cover
|
|
|
- raise
|
|
|
- if buffer:
|
|
|
- flush_buffer()
|
|
|
+ for timeout in (time_to_sleep, 0.001):
|
|
|
+ for fileno, event in poll(timeout) or ():
|
|
|
+ try:
|
|
|
+ fdmap[fileno](fileno, event)
|
|
|
+ except Empty:
|
|
|
+ break
|
|
|
+ except socket.error:
|
|
|
+ if self._state != CLOSE: # pragma: no cover
|
|
|
+ raise
|
|
|
+ if buffer:
|
|
|
+ flush_buffer()
|
|
|
else:
|
|
|
sleep(min(time_to_sleep, 0.1))
|
|
|
|