|
@@ -19,6 +19,7 @@ from contextlib import contextmanager
|
|
from copy import copy
|
|
from copy import copy
|
|
|
|
|
|
from kombu import eventloop, Exchange, Queue, Consumer, Producer
|
|
from kombu import eventloop, Exchange, Queue, Consumer, Producer
|
|
|
|
+from kombu.mixins import ConsumerMixin
|
|
from kombu.utils import cached_property
|
|
from kombu.utils import cached_property
|
|
|
|
|
|
from celery.app import app_or_default
|
|
from celery.app import app_or_default
|
|
@@ -153,7 +154,7 @@ class EventDispatcher(object):
|
|
self.publisher = None
|
|
self.publisher = None
|
|
|
|
|
|
|
|
|
|
-class EventReceiver(object):
|
|
|
|
|
|
+class EventReceiver(ConsumerMixin):
|
|
"""Capture events.
|
|
"""Capture events.
|
|
|
|
|
|
:param connection: Connection to the broker.
|
|
:param connection: Connection to the broker.
|
|
@@ -190,21 +191,17 @@ class EventReceiver(object):
|
|
handler = self.handlers.get(type) or self.handlers.get('*')
|
|
handler = self.handlers.get(type) or self.handlers.get('*')
|
|
handler and handler(event)
|
|
handler and handler(event)
|
|
|
|
|
|
- @contextmanager
|
|
|
|
- def consumer(self, wakeup=True):
|
|
|
|
- """Create event consumer."""
|
|
|
|
- consumer = Consumer(self.connection,
|
|
|
|
- queues=[self.queue], no_ack=True)
|
|
|
|
- consumer.register_callback(self._receive)
|
|
|
|
- with consumer:
|
|
|
|
- if wakeup:
|
|
|
|
- self.wakeup_workers(channel=consumer.channel)
|
|
|
|
- yield consumer
|
|
|
|
|
|
+ def get_consumers(self, Consumer, channel):
|
|
|
|
+ return [Consumer(queues=[self.queue],
|
|
|
|
+ callbacks=[self._receive], no_ack=True)]
|
|
|
|
+
|
|
|
|
+ def on_consume_ready(self, connection, channel, consumers,
|
|
|
|
+ wakeup=True, **kwargs):
|
|
|
|
+ if wakeup:
|
|
|
|
+ self.wakeup_workers(channel=channel)
|
|
|
|
|
|
def itercapture(self, limit=None, timeout=None, wakeup=True):
|
|
def itercapture(self, limit=None, timeout=None, wakeup=True):
|
|
- with self.consumer(wakeup=wakeup) as consumer:
|
|
|
|
- yield consumer
|
|
|
|
- self.drain_events(limit=limit, timeout=timeout)
|
|
|
|
|
|
+ return self.consume(limit=limit, timeout=timeout, wakeup=wakeup)
|
|
|
|
|
|
def capture(self, limit=None, timeout=None, wakeup=True):
|
|
def capture(self, limit=None, timeout=None, wakeup=True):
|
|
"""Open up a consumer capturing events.
|
|
"""Open up a consumer capturing events.
|
|
@@ -213,17 +210,13 @@ class EventReceiver(object):
|
|
stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
|
|
stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
|
|
|
|
|
|
"""
|
|
"""
|
|
- list(self.itercapture(limit=limit, timeout=timeout, wakeup=wakeup))
|
|
|
|
|
|
+ return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))
|
|
|
|
|
|
def wakeup_workers(self, channel=None):
|
|
def wakeup_workers(self, channel=None):
|
|
self.app.control.broadcast('heartbeat',
|
|
self.app.control.broadcast('heartbeat',
|
|
connection=self.connection,
|
|
connection=self.connection,
|
|
channel=channel)
|
|
channel=channel)
|
|
|
|
|
|
- def drain_events(self, **kwargs):
|
|
|
|
- for _ in eventloop(self.connection, **kwargs):
|
|
|
|
- pass
|
|
|
|
-
|
|
|
|
def _receive(self, body, message):
|
|
def _receive(self, body, message):
|
|
type = body.pop('type').lower()
|
|
type = body.pop('type').lower()
|
|
clock = body.get('clock')
|
|
clock = body.get('clock')
|