Переглянути джерело

Task.send_events can now be set to disable events for that task.

Ask Solem 8 роки тому
батько
коміт
c18c502cf9

+ 8 - 0
celery/app/task.py

@@ -185,6 +185,14 @@ class Task(object):
     #: (``result.children``).
     trail = True
 
+    #: If enabled the worker will send monitoring events related to
+    #: this task (but only if the worker is configured to send
+    #: task related events).
+    #: Note that this has no effect on the task-failure event case
+    #: where a task is not registered (as it will have no task class
+    #: to check this flag).
+    send_events = True
+
     #: When enabled errors will be stored even if the task is otherwise
     #: configured to ignore results.
     store_errors_even_if_ignored = None

+ 7 - 0
celery/tests/worker/test_request.py

@@ -385,6 +385,13 @@ class test_Request(RequestCase):
         job.send_event('task-frobulated')
         job.eventer.send.assert_called_with('task-frobulated', uuid=job.id)
 
+    def test_send_events__disabled_at_task_level(self):
+        job = self.xRequest()
+        job.task.send_events = False
+        job.eventer = Mock(name='.eventer')
+        job.send_event('task-frobulated')
+        job.eventer.send.assert_not_called()
+
     def test_on_retry(self):
         job = self.get_request(self.mytask.s(1, f='x'))
         job.eventer = Mock(name='.eventer')

+ 1 - 1
celery/worker/request.py

@@ -268,7 +268,7 @@ class Request(object):
         return False
 
     def send_event(self, type, **fields):
-        if self.eventer and self.eventer.enabled:
+        if self.eventer and self.eventer.enabled and self.task.send_events:
             self.eventer.send(type, uuid=self.id, **fields)
 
     def on_accepted(self, pid, time_accepted):

+ 7 - 2
celery/worker/strategy.py

@@ -58,11 +58,16 @@ def default(task, app, consumer,
             to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
             proto1_to_proto2=proto1_to_proto2):
     hostname = consumer.hostname
-    eventer = consumer.event_dispatcher
     connection_errors = consumer.connection_errors
     _does_info = logger.isEnabledFor(logging.INFO)
+
+    # task event related
+    # (optimized to avoid calling request.send_event)
+    eventer = consumer.event_dispatcher
     events = eventer and eventer.enabled
     send_event = eventer.send
+    task_sends_events = events and task.send_events
+
     call_at = consumer.timer.call_at
     apply_eta_task = consumer.apply_eta_task
     rate_limits_enabled = not consumer.disable_rate_limits
@@ -96,7 +101,7 @@ def default(task, app, consumer,
         if (req.expires or req.id in revoked_tasks) and req.revoked():
             return
 
-        if events:
+        if task_sends_events:
             send_event(
                 'task-received',
                 uuid=req.id, name=req.name,