Преглед на файлове

Added signals: worker_init, worker_ready, worker_shutdown

Ask Solem преди 15 години
родител
ревизия
438c68b884
променени са 4 файла, в които са добавени 40 реда и са изтрити 3 реда
  1. 5 3
      celery/bin/celeryd.py
  2. 29 0
      celery/signals.py
  3. 3 0
      celery/worker/__init__.py
  4. 3 0
      celery/worker/listener.py

+ 5 - 3
celery/bin/celeryd.py

@@ -200,9 +200,6 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
         logger = setup_logger(loglevel, logfile)
         redirect_stdouts_to_logger(logger, loglevel)
 
-    # Run the worker init handler.
-    # (Usually imports task modules and such.)
-    current_loader.on_worker_init()
 
     def run_worker():
         worker = WorkController(concurrency=concurrency,
@@ -211,6 +208,11 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
                                 embed_clockservice=run_clockservice,
                                 send_events=events,
                                 is_detached=detach)
+        # Run the worker init handler.
+        # (Usually imports task modules and such.)
+        from celery import signals
+        current_loader.on_worker_init()
+        signals.worker_init.send(sender=worker)
 
         # Install signal handler that restarts celeryd on SIGHUP,
         # (only on POSIX systems)

+ 29 - 0
celery/signals.py

@@ -84,3 +84,32 @@ Provides arguments:
 """
 task_postrun = Signal(providing_args=[
                         "task_id", "task", "args", "kwargs", "retval"])
+
+
+
+"""
+
+.. data:: worker_init
+
+Triggered before the worker is started.
+
+"""
+worker_init = Signal(providing_args=[])
+
+"""
+
+.. data:: worker_ready
+
+Triggered when the worker is ready to accept work.
+
+"""
+worker_ready = Signal(providing_args=[])
+
+"""
+
+.. data:: worker_shutdown
+
+Triggered when the worker is about to shut down.
+
+"""
+worker_shutdown = Signal(providing_args=[])

+ 3 - 0
celery/worker/__init__.py

@@ -10,6 +10,7 @@ from Queue import Queue
 from celery import conf
 from celery import registry
 from celery import platform
+from celery import signals
 from celery.log import setup_logger
 from celery.beat import ClockServiceThread
 from celery.worker.pool import TaskPool
@@ -179,6 +180,8 @@ class WorkController(object):
         if self._state != "RUN":
             return
 
+        signals.worker_shutdown.send(sender=self)
+
         [component.stop() for component in reversed(self.components)]
 
         self._state = "STOP"

+ 3 - 0
celery/worker/listener.py

@@ -5,6 +5,7 @@ from dateutil.parser import parse as parse_iso8601
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 
 from celery import conf
+from celery import signals
 from celery.utils import retry_over_time
 from celery.worker.job import TaskWrapper
 from celery.worker.revoke import revoked
@@ -61,6 +62,8 @@ class CarrotListener(object):
 
         """
 
+        signals.worker_ready.send(sender=self)
+
         while 1:
             self.reset_connection()
             try: