Ask Solem 12 anni fa
parent
commit
ef86abe9fb
5 ha cambiato i file con 102 aggiunte e 169 eliminazioni
  1. 2 8
      celery/app/control.py
  2. 1 2
      celery/app/defaults.py
  3. 95 121
      celery/worker/actors.py
  4. 1 29
      celery/worker/consumer.py
  5. 3 9
      celery/worker/control.py

+ 2 - 8
celery/app/control.py

@@ -76,13 +76,14 @@ class Inspect(object):
 
     def ping(self):
         return self._request('ping')
-    
+
     def active_queues(self):
         return self._request('active_queues')
 
     def conf(self):
         return self._request('dump_conf')
 
+
 class Control(object):
     Mailbox = Mailbox
 
@@ -139,13 +140,6 @@ class Control(object):
         return self.broadcast('ping', reply=True, destination=destination,
                               timeout=timeout, **kwargs)
 
-    def start_actor(self, actor_name, actor_id = None, 
-                    destination = None, timeout = 1, **kwargs):
-        return self.broadcast('start_actor', name = actor_name, 
-                              actor_id = actor_id, 
-                              reply=True, destination=destination,
-                              timeout=timeout, **kwargs)
-        
     def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
         """Tell all (or specific) workers to set a new rate limit
         for task by type.

+ 1 - 2
celery/app/defaults.py

@@ -150,8 +150,7 @@ NAMESPACES = {
         'WORKER_DIRECT': Option(False, type='bool'),
     },
     'CELERYD': {
-        'ACTORS_MANAGER': Option(
-            'celery.worker.actorsbootstrap:ActorsManager'),
+        'AGENT': Option('celery.worker.actors:Agent'),
         'AUTOSCALER': Option('celery.worker.autoscale:Autoscaler'),
         'AUTORELOADER': Option('celery.worker.autoreload:Autoreloader'),
         'BOOT_STEPS': Option((), type='tuple'),

+ 95 - 121
celery/worker/actors.py

@@ -1,158 +1,132 @@
+from __future__ import absolute_import
 
-import bootsteps
+from celery.bootsteps import StartStopStep
+from celery.utils.imports import instantiate, qualname
+from celery.utils.log import get_logger
+from celery.worker.consumer import Connection
 from cell import Actor
-from celery.utils.imports import instantiate
+from kombu.common import ignore_errors
 from kombu.utils import uuid
-from .bootsteps import StartStopComponent
-from celery.utils.log import get_logger
-from celery.worker.consumer import debug
 
 logger = get_logger(__name__)
-info, warn, error, crit = (logger.info, logger.warn,
-                           logger.error, logger.critical)
-
-
-class WorkerComponent(StartStopComponent):
-    """This component starts an ActorManager instance if actors support is enabled."""
-    name = 'worker.actors-manager'
-    consumer = None
-    
-    def ActorsManager(self, w):
-        return (w.actors_manager_cls or ActorsManager)
-    
-    def include_if(self, w):
-        #return w.actors_enabled
-        return True
-    
-    def init(self, w, **kwargs):
-        w.actors_manager = None
-    
-    def create(self, w):
-        debug('create ActorsManager')
-        actor = w.actors_manager = self.instantiate(self.ActorsManager(w), 
-                                                    app = w.app)
-        actor.app = w.app
-        w.on_consumer_ready_callbacks.append(actor.on_consumer_ready)
-        return actor
+debug, warn, error = logger.debug, logger.warn, logger.error
+
+
+class Bootstep(StartStopStep):
+    requires = (Connection, )
+
+    def __init__(self, c, **kwargs):
+        c.agent = None
+
+    def create(self, c):
+        agent = c.app.conf.CELERYD_AGENT
+        agent = c.agent = self.instantiate(c.app.conf.CELERYD_AGENT,
+            connection=c.connection, app=c.app,
+        )
+        return agent
 
 
 class ActorProxy(object):
-    """
-    A class that represents an actor started remotely
-    """
+    """A class that represents an actor started remotely."""
+
     def __init__(self, local_actor, actor_id, async_start_result):
         self.__subject = local_actor.__copy__()
         self.__subject.id = actor_id
         self.async_start_result = async_start_result
-    
+
     def __getattr__(self, name):
             return getattr(self.__subject, name)
-    
+
     def wait_to_start(self):
         self.async_start_result._result
-         
 
-class ActorsManager(Actor):
-    connection = None
+
+class Agent(Actor):
     types = ('round-robin', 'scatter')
-    actor_registry = {}
-    actors_consumer = None
-    connection = None
-    app = None
-    
-    def __init__(self, app=None, *args, **kwargs):
-        self.app = app
-        super(ActorsManager, self).__init__(*args, **kwargs)
-            
-    def contribute_to_state(self, state):
-        state.actor_registry = self.actor_registry
-        state.connection = self.connection
-        conninfo = self.app.connection()
-        state.connection_errors = conninfo.connection_errors
-        state.channel_errors = conninfo.channel_errors
-        state.reset()
-        return Actor.contribute_to_state(self, state)
-    
-    class state(Actor.state):
+
+    class state(object):
+
         def _start_actor_consumer(self, actor):
-            consumer = actor.Consumer(self.connection.channel())                
-            consumer.consume()
-            self.actor_registry[actor.id] = (actor, consumer)
-            
-        def add_actor(self, name, id = None):
-            """Add actor to the actor registry and start the actor's main method"""
+            actor.consumer = actor.Consumer(self.connection.channel())
+            actor.consumer.consume()
+            self.actor.registry[actor.id] = actor
+
+        def add_actor(self, name, id=None):
+            """Add actor to the registry and start the actor's main method."""
             try:
-                actor = instantiate(name, connection = self.connection, 
-                                    id = id)
+                actor = instantiate(name, connection=self.connection, id=id)
+                if actor.id in self.actor.registry:
+                    warn('Actor id %r already exists', actor.id)
                 self._start_actor_consumer(actor)
-                if actor.id in self.actor_registry:
-                    warn('Actor with the same id already exists')
-                debug('Register actor in the actor registry: %s' % name)
+                debug('Actor registered: %s', name)
                 return actor.id
             except Exception as exc:
-                error('Start actor error: %r', exc, exc_info=True)
-             
+                error('Cannot start actor: %r', exc, exc_info=True)
+
         def stop_all(self):
-            for _, (_, consumer) in self.actor_registry.items():
-                self.maybe_conn_error(consumer.cancel)
-            self.actor_registry.clear()
+            self.actor.shutdown()
 
         def reset(self):
             debug('Resetting active actors')
-            print self.actor_registry.items()
-            for id, (actor, consumer) in self.actor_registry.items():
-                self.maybe_conn_error(consumer.cancel)
-                # TODO:setting the connection here seems wrong ?
+            for actor in self.actor.registry.itervalues():
+                if actor.consumer:
+                    ignore_errors(self.connection, actor.consumer.cancel)
                 actor.connection = self.connection
                 self._start_actor_consumer(actor)
-    
-        def stop_actor(self, actor_id):
-            if actor_id in self.actor_registry:
-                (_, consumer) = self.actor_registry.pop(actor_id)
-                self.maybe_conn_error(consumer.cancel)
-                
-        def maybe_conn_error(self, fun):
-            """Applies function but ignores any connection or channel
-            errors raised."""
+
+        def stop_actor(self, id):
             try:
-                fun()
-            except (AttributeError, ) + \
-                    self.connection_errors + \
-                    self.channel_errors:
+                actor = self.actor.registry.pop(id)
+            except KeyError:
                 pass
-        
+            else:
+                if actor.consumer and actor.consumer.channel:
+                    ignore_errors(self.connection, consumer.cancel)
+
+    def __init__(self, connection, app=None, *args, **kwargs):
+        self.connection = connection
+        self.app = app
+        self.registry = {}
+        super(ActorManager, self).__init__(*args, **kwargs)
+
+    def contribute_to_state(self, state):
+        state.connection = self.connection
+        conninfo = self.app.connection()
+        state.connection_errors = conninfo.connection_errors
+        state.channel_errors = conninfo.channel_errors
+        state.reset()
+        return super(ActorsManager, self).contribute_to_state(state)
+
     def add_actor(self, actor, nowait=False):
-        name = "%s.%s"%(actor.__class__.__module__, 
-                        actor.__class__.__name__)
+        name = qualname(actor)
         actor_id = uuid()
-        res = self.call('add_actor', {'name': name, 'id' : actor_id}, 
-                        type = 'round-robin', nowait = 'True')
+        res = self.call('add_actor', {'name': name, 'id': actor_id},
+                        type='round-robin', nowait=True)
         actor_proxy = ActorProxy(actor, actor_id, res)
         return actor_proxy
-        
-    def stop_actor_by_id(self, actor_id, nowait=False):              
-        return self.scatter('stop_actor', {'actor_id' : actor_id}, 
-                            nowait=nowait)   
-    
+
+    def stop_actor_by_id(self, actor_id, nowait=False):
+        return self.scatter('stop_actor', {'actor_id': actor_id},
+                            nowait=nowait)
+
     def start(self):
-        debug('Starting ActorsManager')
-    
+        debug('Starting Agent')
+
+    def _shutdown(self, cancel=True, close=True, clear=True):
+        try:
+            for actor in self.registry.itervalues():
+                if actor and actor.consumer:
+                    if cancel:
+                        ignore_errors(self.connection, actor.consumer.cancel)
+                    if close and actor.consumer.channel:
+                        ignore_errors(self.connection,
+                                      actor.consumer.channel.close)
+        finally:
+            if clear:
+                self.registry.clear()
+
     def stop(self):
-        if self.actors_consumer:
-            self.actors_consumer.cancel()
-    
-    def on_start(self, connection):
-        self.connection = connection
-        actor_consumer = self.Consumer(self.connection.channel())
-        debug('ActorsManager start consuming blabla')
-        self.actor_consumer = actor_consumer
-        self.actor_consumer.consume()
-        self.contribute_to_state(self.state)
-        
-    def on_consumer_ready(self, consumer):
-        debug('ActorsManager in On consumer ready')
-        if consumer.connection: 
-            raise Exception('Consumer is ready.')
-        consumer.on_reset_connection.append(self.on_start)
-        consumer.on_close_connection.append(self.stop)
-     
+        self._shutdown(clear=False)
+
+    def shutdown(self):
+        self._shutdown(cancel=False)

+ 1 - 29
celery/worker/consumer.py

@@ -118,6 +118,7 @@ class Consumer(object):
             'celery.worker.consumer:Control',
             'celery.worker.consumer:Tasks',
             'celery.worker.consumer:Evloop',
+            'celery.worker.actors:Bootstep',
         ]
 
         def shutdown(self, parent):
@@ -180,35 +181,6 @@ class Consumer(object):
                     error(CONNECTION_RETRY, exc_info=True)
                     ns.restart(self)
 
-    def add_actor(self, actor_name, actor_id):
-        """Add actor to the actor registry and start the actor main method"""
-        try:
-            actor = instantiate(actor_name, connection = self.connection,
-                                id = actor_id)
-            consumer = actor.Consumer(self.connection.channel())
-            consumer.consume()
-            self.actor_registry[actor.id] = consumer
-            info('Register actor in the actor registry: %s' % actor_name)
-            return actor.id
-        except Exception as exc:
-            error('Start actor error: %r', exc, exc_info=True)
-
-    def stop_all_actors(self):
-        for _, consumer in self.actor_registry.items():
-            self.maybe_conn_error(consumer.cancel)
-        self.actor_registry.clear()
-
-
-    def reset_actor_nodes(self):
-        for _, consumer in self.actor_registry.items():
-            self.maybe_conn_error(consumer.cancel)
-            consumer.consume()
-
-    def stop_actor(self, actor_id):
-        if actor_id in self.actor_registry:
-            consumer = self.actor_registry.pop(actor_id)
-            self.maybe_conn_error(consumer.cancel)
-
     def shutdown(self):
         self.namespace.shutdown(self)
 

+ 3 - 9
celery/worker/control.py

@@ -53,6 +53,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
     logger.info('Revoking task %s', task_id)
     return {'ok': 'revoking task {0}'.format(task_id)}
 
+
 @Panel.register
 def report(panel):
     return {'ok': panel.app.bugreport()}
@@ -213,7 +214,7 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
 
 @Panel.register
 def ping(panel, **kwargs):
-    return {'ok':'pong'}
+    return {'ok': 'pong'}
 
 
 @Panel.register
@@ -275,14 +276,7 @@ def active_queues(panel):
     return [dict(queue.as_dict(recurse=True))
                     for queue in panel.consumer.task_consumer.queues]
 
+
 @Panel.register
 def dump_conf(panel, **kwargs):
     return jsonify(dict(panel.app.conf))
-
-@Panel.register
-def start_actor(panel, name, actor_id):
-    return panel.consumer.add_actor(name, actor_id)
-
-@Panel.register
-def stop_actor(panel, actor_id):
-    return panel.consumer.stop_actor(actor_id)