|
@@ -5,10 +5,13 @@ import threading
|
|
|
from collections import deque
|
|
|
from itertools import count
|
|
|
|
|
|
-from kombu.compat import Publisher, Consumer
|
|
|
+from kombu.entity import Exchange, Queue
|
|
|
+from kombu.messaging import Consumer, Producer
|
|
|
|
|
|
from celery.app import app_or_default
|
|
|
+from celery.utils import gen_unique_id
|
|
|
|
|
|
+event_exchange = Exchange("celeryev", type="topic")
|
|
|
|
|
|
def create_event(type, fields):
|
|
|
std = {"type": type,
|
|
@@ -55,16 +58,14 @@ class EventDispatcher(object):
|
|
|
def enable(self):
|
|
|
conf = self.app.conf
|
|
|
self.enabled = True
|
|
|
- self.publisher = Publisher(self.connection,
|
|
|
- exchange=conf.CELERY_EVENT_EXCHANGE,
|
|
|
- exchange_type=conf.CELERY_EVENT_EXCHANGE_TYPE,
|
|
|
- routing_key=conf.CELERY_EVENT_ROUTING_KEY,
|
|
|
- serializer=conf.CELERY_EVENT_SERIALIZER)
|
|
|
+ self.publisher = Producer(self.connection.channel(),
|
|
|
+ exchange=event_exchange,
|
|
|
+ serializer=conf.CELERY_EVENT_SERIALIZER)
|
|
|
|
|
|
def disable(self):
|
|
|
self.enabled = False
|
|
|
if self.publisher is not None:
|
|
|
- self.publisher.close()
|
|
|
+ self.publisher.channel.close()
|
|
|
self.publisher = None
|
|
|
|
|
|
def send(self, type, **fields):
|
|
@@ -81,7 +82,8 @@ class EventDispatcher(object):
|
|
|
event = Event(type, hostname=self.hostname, **fields)
|
|
|
try:
|
|
|
try:
|
|
|
- self.publisher.send(event)
|
|
|
+ self.publisher.publish(event,
|
|
|
+ routing_key=type.replace("-", "."))
|
|
|
except Exception, exc:
|
|
|
self._outbound_buffer.append((event, exc))
|
|
|
finally:
|
|
@@ -95,7 +97,7 @@ class EventDispatcher(object):
|
|
|
def close(self):
|
|
|
"""Close the event dispatcher."""
|
|
|
self._lock.locked() and self._lock.release()
|
|
|
- self.publisher and self.publisher.close()
|
|
|
+ self.publisher and self.publisher.channel.close()
|
|
|
|
|
|
|
|
|
class EventReceiver(object):
|
|
@@ -111,11 +113,19 @@ class EventReceiver(object):
|
|
|
"""
|
|
|
handlers = {}
|
|
|
|
|
|
- def __init__(self, connection, handlers=None, app=None):
|
|
|
+ def __init__(self, connection, handlers=None, routing_key="#",
|
|
|
+ app=None):
|
|
|
self.app = app_or_default(app)
|
|
|
self.connection = connection
|
|
|
if handlers is not None:
|
|
|
self.handlers = handlers
|
|
|
+ self.routing_key = routing_key
|
|
|
+ self.node_id = gen_unique_id()
|
|
|
+ self.queue = Queue("%s.%s" % ("celeryev", self.node_id),
|
|
|
+ exchange=event_exchange,
|
|
|
+ routing_key=self.routing_key,
|
|
|
+ auto_delete=True,
|
|
|
+ durable=False)
|
|
|
|
|
|
def process(self, type, event):
|
|
|
"""Process the received event by dispatching it to the appropriate
|
|
@@ -124,12 +134,17 @@ class EventReceiver(object):
|
|
|
handler and handler(event)
|
|
|
|
|
|
def consumer(self):
|
|
|
+ """Create event consumer.
|
|
|
+
|
|
|
+ .. warning::
|
|
|
+
|
|
|
+ This creates a new channel that needs to be closed
|
|
|
+ by calling `consumer.channel.close()`.
|
|
|
+
|
|
|
+ """
|
|
|
conf = self.app.conf
|
|
|
- consumer = Consumer(self.connection,
|
|
|
- queue=conf.CELERY_EVENT_QUEUE,
|
|
|
- exchange=conf.CELERY_EVENT_EXCHANGE,
|
|
|
- exchange_type=conf.CELERY_EVENT_EXCHANGE_TYPE,
|
|
|
- routing_key=conf.CELERY_EVENT_ROUTING_KEY,
|
|
|
+ consumer = Consumer(self.connection.channel(),
|
|
|
+ queues=[self.queue],
|
|
|
no_ack=True)
|
|
|
consumer.register_callback(self._receive)
|
|
|
return consumer
|
|
@@ -148,14 +163,15 @@ class EventReceiver(object):
|
|
|
if limit and iteration > limit:
|
|
|
break
|
|
|
try:
|
|
|
- consumer.connection.drain_events(timeout=timeout)
|
|
|
+ self.connection.drain_events(timeout=timeout)
|
|
|
except socket.timeout:
|
|
|
if timeout:
|
|
|
raise
|
|
|
except socket.error:
|
|
|
pass
|
|
|
finally:
|
|
|
- consumer.close()
|
|
|
+ consumer.cancel()
|
|
|
+ consumer.channel.close()
|
|
|
|
|
|
def _receive(self, message_data, message):
|
|
|
type = message_data.pop("type").lower()
|