소스 검색

Add callback for consumer start

Rumyana Neykova 13 년 전
부모
커밋
efc6f43291
1개의 변경된 파일25개의 추가작업 그리고 6개의 파일을 삭제
  1. 25 6
      celery/worker/consumer.py

+ 25 - 6
celery/worker/consumer.py

@@ -262,7 +262,17 @@ class Consumer(object):
     #: Will only be called once, even if the connection is lost and
     #: re-established.
     init_callback = None
-
+    
+    
+    #: List of callbacks to be called when the connection is started/reset,
+    #: applied with the connection instance as sole argument. 
+    on_reset_conncetion = None
+    
+    
+    #: List of callbacks to be called before the connection is closed,
+    #: applied with the connection instance as sole argument.
+    on_close_connection = None
+    
     #: The current hostname.  Defaults to the system hostname.
     hostname = None
 
@@ -313,6 +323,8 @@ class Consumer(object):
         self.connection = None
         self.task_consumer = None
         self.controller = controller
+        self.on_reset_connection = []
+        self.on_close_connection = []
         self.broadcast_consumer = None
         self.actor_registry = {}
         self.ready_queue = ready_queue
@@ -367,7 +379,6 @@ class Consumer(object):
         """
 
         self.init_callback(self)
-
         while self._state != CLOSE:
             self.maybe_shutdown()
             try:
@@ -375,7 +386,6 @@ class Consumer(object):
                 self.consume_messages()
             except self.connection_errors + self.channel_errors:
                 error(RETRY_CONNECTION, exc_info=True)
-
     def on_poll_init(self, hub):
         hub.update_readers(self.connection.eventmap)
         self.connection.transport.on_poll_init(hub.poller)
@@ -525,14 +535,15 @@ class Consumer(object):
             error('Control command error: %r', exc, exc_info=True)
             self.reset_pidbox_node()
 
-    def add_actor(self, actor_name):
+    def add_actor(self, actor_name, actor_id):
         """Add actor to the actor registry and start the actor main method"""
         try:
-            actor = instantiate(actor_name, connection = self.connection)
+            actor = instantiate(actor_name, connection = self.connection, 
+                                id = actor_id)
             consumer = actor.Consumer(self.connection.channel())
             consumer.consume()
             self.actor_registry[actor.id] = consumer
-            print 'Register actor in the actor registry: %s' % actor_name
+            info('Register actor in the actor registry: %s' % actor_name)
             return actor.id
         except Exception as exc:
             error('Start actor error: %r', exc, exc_info=True)
@@ -541,6 +552,7 @@ class Consumer(object):
         for _, consumer in self.actor_registry.items():
             self.maybe_conn_error(consumer.cancel)
         self.actor_registry.clear()
+        
     
     def reset_actor_nodes(self):
         for _, consumer in self.actor_registry.items():
@@ -609,6 +621,7 @@ class Consumer(object):
     def close_connection(self):
         """Closes the current broker connection and all open channels."""
 
+
         # We must set self.connection to None here, so
         # that the green pidbox thread exits.
         connection, self.connection = self.connection, None
@@ -620,6 +633,8 @@ class Consumer(object):
 
         self.stop_pidbox_node()
         self.stop_all_actors()
+        
+        [callback() for callback in self.on_close_connection]
             
         if connection:
             debug('Closing broker connection...')
@@ -765,6 +780,9 @@ class Consumer(object):
 
         # We're back!
         self._state = RUN
+        
+        for callback in self.on_reset_connection:
+            callback(self.connection)
 
     def restart_heartbeat(self):
         """Restart the heartbeat thread.
@@ -813,6 +831,7 @@ class Consumer(object):
         # anymore.
         self.close()
         debug('Stopping consumers...')
+            
         self.stop_consumers(close_connection=False)
 
     def close(self):