Forráskód Böngészése

Adds signals: eventlet_pool_started, eventlet_pool_preshutdown, eventlet_pool_postshutdown, eventlet_pool_apply

Ask Solem 14 éve
szülő
commit
9c04913c9d
3 módosított fájl, 74 hozzáadás és 1 törlés
  1. 5 0
      celery/concurrency/evlet.py
  2. 10 1
      celery/datastructures.py
  3. 59 0
      celery/signals.py

+ 5 - 0
celery/concurrency/evlet.py

@@ -106,13 +106,18 @@ class TaskPool(base.BasePool):
 
     def on_start(self):
         self._pool = self.Pool(self.limit)
+        signals.eventlet_pool_started.send(sender=self)
 
     def on_stop(self):
+        signals.eventlet_pool_preshutdown.send(sender=self)
         if self._pool is not None:
             self._pool.waitall()
+        signals.eventlet_pool_postshutdown.send(sender=self)
 
     def on_apply(self, target, args=None, kwargs=None, callback=None,
             accept_callback=None, **_):
+        signals.eventlet_pool_apply.send(sender=self,
+                target=target, args=args, kwargs=kwargs)
         self._pool.spawn_n(apply_target, target, args, kwargs,
                            callback, accept_callback,
                            self.getcurrent)

+ 10 - 1
celery/datastructures.py

@@ -84,6 +84,15 @@ class DictAttribute(object):
 
 
 class ConfigurationView(AttributeDictMixin):
+    """A view over an applications configuration dicts.
+
+    If the key does not exist in ``changes``, the ``defaults`` dict
+    is consulted.
+
+    :param changes:  Dict containing changes to the configuration.
+    :param defaults: Dict containing the default configuration.
+
+    """
     changes = None
     defaults = None
 
@@ -198,7 +207,7 @@ class LimitedSet(object):
     consume too much resources.
 
     :keyword maxlen: Maximum number of members before we start
-                     deleting expired members.
+                     evicting expired members.
     :keyword expires: Time in seconds, before a membership expires.
 
     """

+ 59 - 0
celery/signals.py

@@ -211,6 +211,60 @@ Dispatched in addition to the :signal:`beat_init` signal when celerybeat is
 started as an embedded process.  Sender is the
 :class:`celery.beat.Service` instance.
 
+Eventlet Signals
+----------------
+
+.. signal:: eventlet_pool_started
+
+eventlet_pool_started
+~~~~~~~~~~~~~~~~~~~~~
+
+Sent when the eventlet pool has been started.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+.. signal:: eventlet_pool_preshutdown
+
+eventlet_pool_preshutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sent when the worker shutdown, just before the eventlet pool
+is requested to wait for remaining workers.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+.. signal:: eventlet_pool_postshutdown
+
+eventlet_pool_postshutdown
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sent when the pool has been joined and the worker is ready to shutdown.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+.. signal:: eventlet_pool_apply
+
+eventlet_pool_apply
+~~~~~~~~~~~~~~~~~~~
+
+Sent whenever a task is applied to the pool.
+
+Sender is the :class:`celery.concurrency.evlet.TaskPool` instance.
+
+Provides arguments:
+
+* target
+
+    The target function.
+
+* args
+
+    Positional arguments.
+
+* kwargs
+
+    Keyword arguments.
+
 
 """
 from celery.utils.dispatch import Signal
@@ -239,3 +293,8 @@ setup_logging = Signal(providing_args=["loglevel", "logfile",
 
 beat_init = Signal(providing_args=[])
 beat_embedded_init = Signal(providing_args=[])
+
+eventlet_pool_started = Signal(providing_args=[])
+eventlet_pool_preshutdown = Signal(providing_args=[])
+eventlet_pool_postshutdown = Signal(providing_args=[])
+eventlet_pool_apply = Signal(providing_args=["target", "args", "kwargs"])