Selaa lähdekoodia

Worker now stores NotRegisteredError for unknown task, and adds task_rejected + task_unknown signals. Closes #2092

Ask Solem 9 vuotta sitten
vanhempi
commit
20424c5561
3 muutettua tiedostoa jossa 52 lisäystä ja 1 poistoa
  1. 6 0
      celery/signals.py
  2. 7 1
      celery/worker/consumer.py
  3. 39 0
      docs/userguide/signals.rst

+ 6 - 0
celery/signals.py

@@ -50,6 +50,12 @@ task_failure = Signal(providing_args=[
 task_revoked = Signal(providing_args=[
     'request', 'terminated', 'signum', 'expired',
 ])
+task_rejected = Signal(providing_args=[
+    'message', 'exc',
+])
+task_unknown = Signal(providing_args=[
+    'message', 'exc',
+])
 celeryd_init = Signal(providing_args=['instance', 'conf', 'options'])
 celeryd_after_setup = Signal(providing_args=['instance', 'conf'])
 import_modules = Signal(providing_args=[])

+ 7 - 1
celery/worker/consumer.py

@@ -32,9 +32,10 @@ from kombu.utils.encoding import safe_repr, bytes_t
 from kombu.utils.limits import TokenBucket
 
 from celery import bootsteps
+from celery import signals
 from celery.app.trace import build_tracer
 from celery.canvas import signature
-from celery.exceptions import InvalidTaskError
+from celery.exceptions import InvalidTaskError, NotRegistered
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.text import truncate
@@ -434,14 +435,19 @@ class Consumer(object):
     def on_unknown_message(self, body, message):
         warn(UNKNOWN_FORMAT, self._message_report(body, message))
         message.reject_log_error(logger, self.connection_errors)
+        signals.task_rejected.send(sender=self, message=message, exc=None)
 
     def on_unknown_task(self, body, message, exc):
         error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
         message.reject_log_error(logger, self.connection_errors)
+        self.app.backend.mark_as_failure(
+            message.headers['id'], NotRegistered(message.headers['task']))
+        signals.task_unknown.send(sender=self, message=message, exc=exc)
 
     def on_invalid_task(self, body, message, exc):
         error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
         message.reject_log_error(logger, self.connection_errors)
+        signals.task_rejected.send(sender=self, message=message, exc=exc)
 
     def update_strategies(self):
         loader = self.app.loader

+ 39 - 0
docs/userguide/signals.rst

@@ -300,6 +300,45 @@ Provides arguments:
 * expired
   Set to :const:`True` if the task expired.
 
+.. signal:: task_unknown
+
+task_unknown
+~~~~~~~~~~~~
+
+Dispatched when a worker receives a message for a task that is not registered.
+
+Sender is the worker :class:`~celery.worker.consumer.Consumer`.
+
+Provides arguments:
+
+* message
+
+    Raw message object.
+
+* exc
+
+    The error that occurred.
+
+.. signal:: task_rejected
+
+task_rejected
+~~~~~~~~~~~~~
+
+Dispatched when a worker receives an unknown type of message to one of its
+task queues.
+
+Sender is the worker :class:`~celery.worker.consumer.Consumer`.
+
+Provides arguments:
+
+* message
+
+  Raw message object.
+
+* exc
+
+    The error that occurred (if any).
+
 App Signals
 -----------