|
@@ -89,6 +89,7 @@ from celery.task.trace import build_tracer
|
|
|
from celery.utils import timer2
|
|
|
from celery.utils.functional import noop
|
|
|
from celery.utils.log import get_logger
|
|
|
+from celery.utils.imports import instantiate
|
|
|
from celery.utils import text
|
|
|
|
|
|
from . import state
|
|
@@ -285,6 +286,9 @@ class Consumer(object):
|
|
|
#: The consumer used to consume broadcast commands.
|
|
|
broadcast_consumer = None
|
|
|
|
|
|
+ #: Dictionary holding all active actors.
|
|
|
+ actor_registry = {}
|
|
|
+
|
|
|
#: The process mailbox (kombu pidbox node).
|
|
|
pidbox_node = None
|
|
|
_pidbox_node_shutdown = None # used for greenlets
|
|
@@ -310,6 +314,7 @@ class Consumer(object):
|
|
|
self.task_consumer = None
|
|
|
self.controller = controller
|
|
|
self.broadcast_consumer = None
|
|
|
+ self.actor_registry = {}
|
|
|
self.ready_queue = ready_queue
|
|
|
self.send_events = send_events
|
|
|
self.init_callback = init_callback
|
|
@@ -326,6 +331,7 @@ class Consumer(object):
|
|
|
self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
|
|
|
state=pidbox_state,
|
|
|
handlers=Panel.data)
|
|
|
+
|
|
|
conninfo = self.app.connection()
|
|
|
self.connection_errors = conninfo.connection_errors
|
|
|
self.channel_errors = conninfo.channel_errors
|
|
@@ -516,6 +522,33 @@ class Consumer(object):
|
|
|
error('Control command error: %r', exc, exc_info=True)
|
|
|
self.reset_pidbox_node()
|
|
|
|
|
|
+ def add_actor(self, actor_name):
|
|
|
+ """Add actor to the actor registry and start the actor main method"""
|
|
|
+ try:
|
|
|
+ actor = instantiate(actor_name, connection = self.connection)
|
|
|
+ consumer = actor.Consumer(self.connection.channel())
|
|
|
+ consumer.consume()
|
|
|
+ self.actor_registry[actor.id] = consumer
|
|
|
+ print 'Register actor in the actor registry: %s' % actor_name
|
|
|
+ return actor.id
|
|
|
+ except Exception as exc:
|
|
|
+ error('Start actor error: %r', exc, exc_info=True)
|
|
|
+
|
|
|
+ def stop_all_actors(self):
|
|
|
+ for _, consumer in self.actor_registry.items():
|
|
|
+ self.maybe_conn_error(consumer.cancel)
|
|
|
+ self.actor_registry.clear()
|
|
|
+
|
|
|
+ def reset_actor_nodes(self):
|
|
|
+ for actor, consumer in self.actor_registry:
|
|
|
+ self.maybe_conn_error(consumer.cancel)
|
|
|
+ consumer.consume()
|
|
|
+
|
|
|
+ def stop_actor(self, actor_id):
|
|
|
+ if actor_id in self.actor_registry:
|
|
|
+ consumer = self.actor_registry.pop(actor_id)
|
|
|
+ self.maybe_conn_error(consumer.cancel)
|
|
|
+
|
|
|
def apply_eta_task(self, task):
|
|
|
"""Method called by the timer to apply a task with an
|
|
|
ETA/countdown."""
|
|
@@ -548,6 +581,7 @@ class Consumer(object):
|
|
|
:param message: The kombu message object.
|
|
|
|
|
|
"""
|
|
|
+ print 'I am in receive_message'
|
|
|
try:
|
|
|
name = body['task']
|
|
|
except (KeyError, TypeError):
|
|
@@ -583,7 +617,8 @@ class Consumer(object):
|
|
|
self.maybe_conn_error(self.task_consumer.close)
|
|
|
|
|
|
self.stop_pidbox_node()
|
|
|
-
|
|
|
+ self.stop_all_actors()
|
|
|
+
|
|
|
if connection:
|
|
|
debug('Closing broker connection...')
|
|
|
self.maybe_conn_error(connection.close)
|
|
@@ -612,7 +647,9 @@ class Consumer(object):
|
|
|
debug('Shutting down event dispatcher...')
|
|
|
self.event_dispatcher = \
|
|
|
self.maybe_conn_error(self.event_dispatcher.close)
|
|
|
-
|
|
|
+
|
|
|
+ self.stop_all_actors()
|
|
|
+
|
|
|
debug('Cancelling broadcast consumer...')
|
|
|
if self.broadcast_consumer:
|
|
|
self.maybe_conn_error(self.broadcast_consumer.cancel)
|
|
@@ -650,8 +687,8 @@ class Consumer(object):
|
|
|
return self.pool.spawn_n(self._green_pidbox_node)
|
|
|
self.pidbox_node.channel = self.connection.channel()
|
|
|
self.broadcast_consumer = self.pidbox_node.listen(
|
|
|
- callback=self.on_control)
|
|
|
-
|
|
|
+ callback=self.on_control)
|
|
|
+
|
|
|
def stop_pidbox_node(self):
|
|
|
if self._pidbox_node_stopped:
|
|
|
self._pidbox_node_shutdown.set()
|
|
@@ -701,6 +738,7 @@ class Consumer(object):
|
|
|
debug('Connection established.')
|
|
|
self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
|
|
|
on_decode_error=self.on_decode_error)
|
|
|
+ self.reset_actor_nodes()
|
|
|
# QoS: Reset prefetch window.
|
|
|
self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
|
|
|
self.qos.update()
|