Explorar o código

Restart actors consumers after connection reset

Rumyana Neykova %!s(int64=12) %!d(string=hai) anos
pai
achega
84246390bf
Modificáronse 1 ficheiros con 28 adicións e 29 borrados
  1. 28 29
      celery/worker/actorsbootstrap.py

+ 28 - 29
celery/worker/actorsbootstrap.py

@@ -5,14 +5,13 @@ from celery.utils.imports import instantiate
 from kombu.utils import uuid
 from .bootsteps import StartStopComponent
 from celery.utils.log import get_logger
+from celery.worker.consumer import debug
 
 logger = get_logger(__name__)
 info, warn, error, crit = (logger.info, logger.warn,
                            logger.error, logger.critical)
 
-#from examples.workflow import TestActor
-#
-#
+
 class WorkerComponent(StartStopComponent):
     """This component starts an ActorManager instance if actors support is enabled."""
     name = 'worker.actors-manager'
@@ -29,7 +28,7 @@ class WorkerComponent(StartStopComponent):
         w.actors_manager = None
     
     def create(self, w):
-        print 'create ActorsManager'
+        debug('create ActorsManager')
         actor = w.actors_manager = self.instantiate(self.ActorsManager(w), 
                                                     app = w.app)
         actor.app = w.app
@@ -39,7 +38,7 @@ class WorkerComponent(StartStopComponent):
 
 class ActorProxy(object):
     """
-    A class that represent an actor started remotely
+    A class that represents an actor started remotely
     """
     def __init__(self, local_actor, actor_id, async_start_result):
         self.__subject = local_actor.__copy__()
@@ -76,35 +75,41 @@ class ActorsManager(Actor):
         return Actor.contribute_to_state(self, state)
     
     class state(Actor.state):
+        def _start_actor_consumer(self, actor):
+            consumer = actor.Consumer(self.connection.channel())                
+            consumer.consume()
+            self.actor_registry[actor.id] = (actor, consumer)
+            
         def add_actor(self, name, id = None):
             """Add actor to the actor registry and start the actor's main method"""
             try:
                 actor = instantiate(name, connection = self.connection, 
                                     id = id)
-                consumer = actor.Consumer(self.connection.channel())                
-                consumer.consume()
-                if actor.id not in self.actor_registry:
-                    self.actor_registry[actor.id] = consumer
-                else:
+                self._start_actor_consumer(actor)
+                if actor.id in self.actor_registry:
                     warn('Actor with the same id already exists')
-                print 'Register actor in the actor registry: %s' % name
+                debug('Register actor in the actor registry: %s' % name)
                 return actor.id
             except Exception as exc:
                 error('Start actor error: %r', exc, exc_info=True)
-            
+             
         def stop_all(self):
-            for _, consumer in self.actor_registry.items():
+            for _, (_, consumer) in self.actor_registry.items():
                 self.maybe_conn_error(consumer.cancel)
             self.actor_registry.clear()
 
         def reset(self):
-            for _, consumer in self.actor_registry.items():
+            debug('Resetting active actors')
+            print self.actor_registry.items()
+            for id, (actor, consumer) in self.actor_registry.items():
                 self.maybe_conn_error(consumer.cancel)
-                consumer.consume()
+                # TODO:setting the connection here seems wrong ?
+                actor.connection = self.connection
+                self._start_actor_consumer(actor)
     
         def stop_actor(self, actor_id):
             if actor_id in self.actor_registry:
-                consumer = self.actor_registry.pop(actor_id)
+                _ , (_, consumer) = self.actor_registry.pop(actor_id)
                 self.maybe_conn_error(consumer.cancel)
                 
         def maybe_conn_error(self, fun):
@@ -126,33 +131,27 @@ class ActorsManager(Actor):
         actor_proxy = ActorProxy(actor, actor_id, res)
         return actor_proxy
         
-        
-    def stop_actor_by_name(self, name, nowait=False):              
-        return self.scatter('stop_actor', {'name' : name}, nowait=nowait)
-        
     def stop_actor_by_id(self, actor_id, nowait=False):              
-        return self.call('stop_actor', {'actor_id' : actor_id},
-                         type = 'round-robin', 
-                         nowait=nowait)   
+        return self.scatter('stop_actor', {'actor_id' : actor_id}, 
+                            nowait=nowait)   
         
     def start(self):
-        print 'Starting ActorsManager'
+        debug('Starting ActorsManager')
     
     def stop(self):
         if self.actors_consumer:
-            self.actors_consumer.stop()
+            self.actors_consumer.cancel()
     
     def on_start(self, connection):
-        print 'establishing connection for ActorsManager'
         self.connection = connection
         actor_consumer = self.Consumer(self.connection.channel())
-        actor_consumer.consume()
-        print 'Start consuming'
+        debug('ActorsManager start consuming blabla')
         self.actor_consumer = actor_consumer
+        self.actor_consumer.consume()
         self.contribute_to_state(self.state)
         
     def on_consumer_ready(self, consumer):
-        print 'ActorsManager in On consumer ready'
+        debug('ActorsManager in On consumer ready')
         if consumer.connection: 
             raise Exception('Consumer is ready.')
         consumer.on_reset_connection.append(self.on_start)