|
@@ -74,8 +74,6 @@ import socket
|
|
|
import sys
|
|
|
import warnings
|
|
|
|
|
|
-from Queue import Empty, Queue
|
|
|
-
|
|
|
from celery.app import app_or_default
|
|
|
from celery.datastructures import AttributeDict, SharedCounter
|
|
|
from celery.exceptions import NotRegistered
|
|
@@ -90,22 +88,6 @@ RUN = 0x1
|
|
|
CLOSE = 0x2
|
|
|
|
|
|
|
|
|
-class MethodQueue(Queue):
|
|
|
-
|
|
|
- def delegate(self, fun, *args, **kwargs):
|
|
|
- self.put_nowait((fun, args, kwargs))
|
|
|
-
|
|
|
- def drain(self):
|
|
|
- while 1:
|
|
|
- try:
|
|
|
- m = self.get_nowait()
|
|
|
- except Empty:
|
|
|
- break
|
|
|
- else:
|
|
|
- method, margs, mkwargs = m
|
|
|
- method(*margs, **mkwargs)
|
|
|
-
|
|
|
-
|
|
|
class QoS(object):
|
|
|
"""Quality of Service for Channel.
|
|
|
|
|
@@ -229,8 +211,6 @@ class Consumer(object):
|
|
|
self.event_dispatcher = None
|
|
|
self.heart = None
|
|
|
self.pool = pool
|
|
|
- self.method_queue = MethodQueue()
|
|
|
- self.pool.method_queue = self.method_queue; # FIXME Ahg ahg ahg.
|
|
|
pidbox_state = AttributeDict(app=self.app,
|
|
|
logger=logger,
|
|
|
hostname=self.hostname,
|
|
@@ -266,13 +246,12 @@ class Consumer(object):
|
|
|
self.logger.debug("Consumer: Starting message consumer...")
|
|
|
self.task_consumer.consume()
|
|
|
self.broadcast_consumer.consume()
|
|
|
- wait_for_message = self._mainloop().next
|
|
|
self.logger.debug("Consumer: Ready to accept tasks!")
|
|
|
|
|
|
while 1:
|
|
|
if self.qos.prev != self.qos.next:
|
|
|
self.qos.update()
|
|
|
- wait_for_message()
|
|
|
+ self.connection.drain_events()
|
|
|
|
|
|
def on_task(self, task):
|
|
|
"""Handle received task.
|
|
@@ -333,6 +312,7 @@ class Consumer(object):
|
|
|
self.logger.critical(
|
|
|
"Couldn't ack %r: message:%r reason:%r" % (
|
|
|
message.delivery_tag, message_data, exc))
|
|
|
+
|
|
|
try:
|
|
|
task = TaskRequest.from_message(message, message_data, ack,
|
|
|
app=self.app,
|
|
@@ -456,15 +436,11 @@ class Consumer(object):
|
|
|
|
|
|
def restart_heartbeat(self):
|
|
|
self.heart = Heart(self.event_dispatcher)
|
|
|
- self.heart.start()
|
|
|
+ #self.heart.start()
|
|
|
|
|
|
def _mainloop(self):
|
|
|
while 1:
|
|
|
- self.method_queue.drain()
|
|
|
- try:
|
|
|
- yield self.connection.drain_events(timeout=0.1)
|
|
|
- except socket.timeout:
|
|
|
- pass
|
|
|
+ yield self.connection.drain_events()
|
|
|
|
|
|
def _open_connection(self):
|
|
|
"""Open connection. May retry opening the connection if configuration
|