Selaa lähdekoodia

Merge remote-tracking branch 'local_changes/celery-actors'

Rumyana Neykova 13 vuotta sitten
vanhempi
commit
a1c689ba27

+ 5 - 2
celery/app/control.py

@@ -76,14 +76,13 @@ class Inspect(object):
 
     def ping(self):
         return self._request('ping')
-
+    
     def active_queues(self):
         return self._request('active_queues')
 
     def conf(self):
         return self._request('dump_conf')
 
-
 class Control(object):
     Mailbox = Mailbox
 
@@ -140,6 +139,10 @@ class Control(object):
         return self.broadcast('ping', reply=True, destination=destination,
                               timeout=timeout, **kwargs)
 
+    def start_actor(self, actor_name, destination = None, timeout = 1, **kwargs):
+        return self.broadcast('start_actor', name = actor_name, reply=True, destination=destination,
+                              timeout=timeout, **kwargs)
+        
     def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
         """Tell all (or specific) workers to set a new rate limit
         for task by type.

+ 1 - 0
celery/tests/worker/test_control.py

@@ -468,3 +468,4 @@ class test_ControlPanel(Case):
             self.assertTrue(consumer.controller.pool.restart.called)
             self.assertTrue(_reload.called)
             self.assertFalse(_import.called)
+            

+ 42 - 4
celery/worker/consumer.py

@@ -89,6 +89,7 @@ from celery.task.trace import build_tracer
 from celery.utils import timer2
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
+from celery.utils.imports import instantiate
 from celery.utils import text
 
 from . import state
@@ -285,6 +286,9 @@ class Consumer(object):
     #: The consumer used to consume broadcast commands.
     broadcast_consumer = None
 
+    #: Dictionary holding all active actors.
+    actor_registry = {}
+    
     #: The process mailbox (kombu pidbox node).
     pidbox_node = None
     _pidbox_node_shutdown = None   # used for greenlets
@@ -310,6 +314,7 @@ class Consumer(object):
         self.task_consumer = None
         self.controller = controller
         self.broadcast_consumer = None
+        self.actor_registry = {}
         self.ready_queue = ready_queue
         self.send_events = send_events
         self.init_callback = init_callback
@@ -326,6 +331,7 @@ class Consumer(object):
         self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
                                                          state=pidbox_state,
                                                          handlers=Panel.data)
+        
         conninfo = self.app.connection()
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
@@ -519,6 +525,33 @@ class Consumer(object):
             error('Control command error: %r', exc, exc_info=True)
             self.reset_pidbox_node()
 
+    def add_actor(self, actor_name):
+        """Add actor to the actor registry and start the actor main method"""
+        try:
+            actor = instantiate(actor_name, connection = self.connection)
+            consumer = actor.Consumer(self.connection.channel())
+            consumer.consume()
+            self.actor_registry[actor.id] = consumer
+            print 'Register actor in the actor registry: %s' % actor_name
+            return actor.id
+        except Exception as exc:
+            error('Start actor error: %r', exc, exc_info=True)
+    
+    def stop_all_actors(self):
+        for _, consumer in self.actor_registry.items():
+            self.maybe_conn_error(consumer.cancel)
+        self.actor_registry.clear()
+    
+    def reset_actor_nodes(self):
+        for actor, consumer in self.actor_registry:
+            self.maybe_conn_error(consumer.cancel)
+            consumer.consume()
+    
+    def stop_actor(self, actor_id):
+        if actor_id in self.actor_registry:
+            consumer = self.actor_registry.pop(actor_id)
+            self.maybe_conn_error(consumer.cancel)
+        
     def apply_eta_task(self, task):
         """Method called by the timer to apply a task with an
         ETA/countdown."""
@@ -551,6 +584,7 @@ class Consumer(object):
         :param message: The kombu message object.
 
         """
+        print 'I am in receive_message'
         try:
             name = body['task']
         except (KeyError, TypeError):
@@ -586,7 +620,8 @@ class Consumer(object):
                     self.maybe_conn_error(self.task_consumer.close)
 
         self.stop_pidbox_node()
-
+        self.stop_all_actors()
+            
         if connection:
             debug('Closing broker connection...')
             self.maybe_conn_error(connection.close)
@@ -615,7 +650,9 @@ class Consumer(object):
             debug('Shutting down event dispatcher...')
             self.event_dispatcher = \
                     self.maybe_conn_error(self.event_dispatcher.close)
-
+        
+        self.stop_all_actors()
+            
         debug('Cancelling broadcast consumer...')
         if self.broadcast_consumer:
             self.maybe_conn_error(self.broadcast_consumer.cancel)
@@ -653,8 +690,8 @@ class Consumer(object):
             return self.pool.spawn_n(self._green_pidbox_node)
         self.pidbox_node.channel = self.connection.channel()
         self.broadcast_consumer = self.pidbox_node.listen(
-                                        callback=self.on_control)
-
+                                        callback=self.on_control)   
+    
     def stop_pidbox_node(self):
         if self._pidbox_node_stopped:
             self._pidbox_node_shutdown.set()
@@ -704,6 +741,7 @@ class Consumer(object):
         debug('Connection established.')
         self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
                                     on_decode_error=self.on_decode_error)
+        self.reset_actor_nodes()
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
         self.qos.update()

+ 12 - 2
celery/worker/control.py

@@ -18,6 +18,7 @@ from celery.utils import timeutils
 from celery.utils.compat import UserDict
 from celery.utils.log import get_logger
 from celery.utils import jsonify
+from celery.utils.imports import instantiate
 
 from . import state
 from .state import revoked
@@ -212,7 +213,7 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
 
 @Panel.register
 def ping(panel, **kwargs):
-    return 'pong'
+    return {'ok':'ihu-pong'}
 
 
 @Panel.register
@@ -274,7 +275,16 @@ def active_queues(panel):
     return [dict(queue.as_dict(recurse=True))
                     for queue in panel.consumer.task_consumer.queues]
 
-
 @Panel.register
 def dump_conf(panel, **kwargs):
     return jsonify(dict(panel.app.conf))
+
+@Panel.register
+def start_actor(panel, name):
+    print name
+    return panel.consumer.add_actor(name)
+
+@Panel.register
+def stop_actor(panel, id):
+    #instantiate(name).stop()
+    return panel.consumer.stop_actor(id)