|
@@ -68,6 +68,11 @@ class EventDispatcher(object):
|
|
|
if self.enabled:
|
|
|
self.enable()
|
|
|
|
|
|
+ def __enter__(self):
|
|
|
+ return self
|
|
|
+
|
|
|
+ def __exit__(self, *exc_info):
|
|
|
+ self.close()
|
|
|
|
|
|
def enable(self):
|
|
|
self.publisher = Producer(self.channel or self.connection.channel(),
|
|
@@ -78,10 +83,7 @@ class EventDispatcher(object):
|
|
|
def disable(self):
|
|
|
if self.enabled:
|
|
|
self.enabled = False
|
|
|
- if self.publisher is not None:
|
|
|
- if not self.channel: # close auto channel.
|
|
|
- self.publisher.channel.close()
|
|
|
- self.publisher = None
|
|
|
+ self.close()
|
|
|
|
|
|
def send(self, type, **fields):
|
|
|
"""Send event.
|
|
@@ -116,7 +118,10 @@ class EventDispatcher(object):
|
|
|
def close(self):
|
|
|
"""Close the event dispatcher."""
|
|
|
self.mutex.locked() and self.mutex.release()
|
|
|
- self.publisher and self.publisher.channel.close()
|
|
|
+ if self.publisher is not None:
|
|
|
+ if not self.channel: # close auto channel.
|
|
|
+ self.publisher.channel.close()
|
|
|
+ self.publisher = None
|
|
|
|
|
|
|
|
|
class EventReceiver(object):
|
|
@@ -236,3 +241,11 @@ class Events(object):
|
|
|
def State(self):
|
|
|
from celery.events.state import State as _State
|
|
|
return _State()
|
|
|
+
|
|
|
+ @contextmanager
|
|
|
+ def default_dispatcher(self, hostname=None, enabled=True,
|
|
|
+ buffer_while_offline=False):
|
|
|
+ with self.app.amqp.publisher_pool.acquire(block=True) as pub:
|
|
|
+ with self.Dispatcher(pub.connection, hostname, enabled,
|
|
|
+ pub.channel, buffer_while_offline) as d:
|
|
|
+ yield d
|