Procházet zdrojové kódy

Added signals: task_prerun + task_postrun

Ask Solem před 15 roky
rodič
revize
e0c941cd33
2 změnil soubory, kde provedl 61 přidání a 1 odebrání
  1. 54 0
      celery/signals.py
  2. 7 1
      celery/worker/job.py

+ 54 - 0
celery/signals.py

@@ -0,0 +1,54 @@
+from django.dispatch import Signal
+
+
+"""
+
+.. DATA: task_prerun
+
+Triggered before a task is executed.
+
+Provides arguments:
+
+* task_id
+    Id of the task to be executed.
+
+* task
+    The task being executed.
+
+* args
+    the tasks positional arguments.
+
+* kwargs
+    The tasks keyword arguments.
+
+"""
+task_prerun = Signal(providing_args=[
+                        "task_id", "task", "args", "kwargs"])
+
+"""
+
+.. DATA: task_postrun
+
+Triggered after a task has been executed.
+
+Provides arguments:
+
+* task_id
+    Id of the task to be executed.
+
+* task
+    The task being executed.
+
+* args
+    the tasks positional arguments.
+
+* kwargs
+    The tasks keyword arguments.
+
+* retval
+
+    The return value of the task.
+
+"""
+task_postrun = Signal(providing_args=[
+                        "task_id", "task", "args", "kwargs", "retval"])

+ 7 - 1
celery/worker/job.py

@@ -10,6 +10,7 @@ from celery.loaders import current_loader
 from django.core.mail import mail_admins
 from celery.monitoring import TaskTimerStats
 from celery.task.base import RetryTaskError
+from celery import signals
 import multiprocessing
 import traceback
 import socket
@@ -59,7 +60,9 @@ def jail(task_id, task_name, func, args, kwargs):
     timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)
 
     # Run task loader init handler.
-    current_loader.on_worker_init()
+    current_loader.on_task_init(task_id, func)
+    signals.task_prerun.send(sender=func, task_id=task_id, task=func,
+                             args=args, kwargs=kwargs)
 
     # Backend process cleanup
     default_backend.process_cleanup()
@@ -102,6 +105,9 @@ def jail(task_id, task_name, func, args, kwargs):
     finally:
         timer_stat.stop()
 
+    signals.task_postrun.send(sender=func, task_id=task_id, task=func,
+                              args=args, kwargs=kwargs, retval=retval)
+
     return retval