Przeglądaj źródła

Implemented task-retried event

Ask Solem 14 lat temu
rodzic
commit
1161d871cc
3 zmienionych plików z 24 dodań i 10 usunięć
  1. 21 5
      celery/worker/job.py
  2. 2 3
      docs/internals/events.rst
  3. 1 2
      docs/userguide/monitoring.rst

+ 21 - 5
celery/worker/job.py

@@ -10,7 +10,7 @@ from celery import log
 from celery import platforms
 from celery import platforms
 from celery.datastructures import ExceptionInfo
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
-from celery.exceptions import WorkerLostError
+from celery.exceptions import WorkerLostError, RetryTaskError
 from celery.execute.trace import TaskTrace
 from celery.execute.trace import TaskTrace
 from celery.loaders import current_loader
 from celery.loaders import current_loader
 from celery.registry import tasks
 from celery.registry import tasks
@@ -122,7 +122,7 @@ class WorkerTaskTrace(TaskTrace):
         message, orig_exc = exc.args
         message, orig_exc = exc.args
         if self._store_errors:
         if self._store_errors:
             self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
             self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
-        self.super.handle_retry(exc, type_, tb, strtb)
+        return self.super.handle_retry(exc, type_, tb, strtb)
 
 
     def handle_failure(self, exc, type_, tb, strtb):
     def handle_failure(self, exc, type_, tb, strtb):
         """Handle exception."""
         """Handle exception."""
@@ -202,6 +202,9 @@ class TaskRequest(object):
     error_msg = """
     error_msg = """
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
         Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
     """
     """
+    retry_msg = """
+        Task %(name)s[%(id)s] retry: %(exc)s
+    """
 
 
     # E-mails
     # E-mails
     email_subject = """
     email_subject = """
@@ -432,6 +435,16 @@ class TaskRequest(object):
         #     "the quick brown fox jumps over the lazy dog" :)
         #     "the quick brown fox jumps over the lazy dog" :)
         return truncate_text(repr(result), maxlen)
         return truncate_text(repr(result), maxlen)
 
 
+    def on_retry(self, exc_info):
+        self.send_event("task-retried", uuid=self.task_id,
+                                        exception=repr(exc_info.exception.exc),
+                                        traceback=repr(exc_info.traceback))
+        msg = self.retry_msg.strip() % {
+                "id": self.task_id,
+                "name": self.task_name,
+                "exc": repr(exc_info.exception.exc)}
+        self.logger.info(msg)
+
     def on_failure(self, exc_info):
     def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
         """The handler used if the task raised an exception."""
         state.task_ready(self)
         state.task_ready(self)
@@ -439,9 +452,8 @@ class TaskRequest(object):
         if self.task.acks_late:
         if self.task.acks_late:
             self.acknowledge()
             self.acknowledge()
 
 
-        self.send_event("task-failed", uuid=self.task_id,
-                                       exception=repr(exc_info.exception),
-                                       traceback=exc_info.traceback)
+        if isinstance(exc_info.exception, RetryTaskError):
+            return self.on_retry(exc_info)
 
 
         # This is a special case as the process would not have had
         # This is a special case as the process would not have had
         # time to write the result.
         # time to write the result.
@@ -450,6 +462,10 @@ class TaskRequest(object):
                 self.task.backend.mark_as_failure(self.task_id,
                 self.task.backend.mark_as_failure(self.task_id,
                                                   exc_info.exception)
                                                   exc_info.exception)
 
 
+        self.send_event("task-failed", uuid=self.task_id,
+                                       exception=repr(exc_info.exception),
+                                       traceback=exc_info.traceback)
+
         context = {"hostname": self.hostname,
         context = {"hostname": self.hostname,
                    "id": self.task_id,
                    "id": self.task_id,
                    "name": self.task_name,
                    "name": self.task_name,

+ 2 - 3
docs/internals/events.rst

@@ -36,10 +36,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker)
     to be sent by more than one worker)
 
 
-* task-retried(uuid, exception, traceback, hostname, delay, timestamp)
+* task-retried(uuid, exception, traceback, hostname, timestamp)
 
 
-    Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
+    Sent if the task failed, but will be retried.
 
 
 Worker Events
 Worker Events
 =============
 =============

+ 1 - 2
docs/userguide/monitoring.rst

@@ -507,10 +507,9 @@ Task Events
     Sent if the task has been revoked (Note that this is likely
     Sent if the task has been revoked (Note that this is likely
     to be sent by more than one worker).
     to be sent by more than one worker).
 
 
-* ``task-retried(uuid, exception, traceback, hostname, delay, timestamp)``
+* ``task-retried(uuid, exception, traceback, hostname, timestamp)``
 
 
     Sent if the task failed, but will be retried in the future.
     Sent if the task failed, but will be retried in the future.
-    (**NOT IMPLEMENTED**)
 
 
 .. _event-reference-worker:
 .. _event-reference-worker: