Browse Source

Pass around the retries count (available as task kwarg "task_retries")

Ask Solem 15 năm trước cách đây
mục cha
commit
fe62c47cb0
1 tập tin đã thay đổi với 5 bổ sung2 xóa
  1. 5 2
      celery/worker/job.py

+ 5 - 2
celery/worker/job.py

@@ -146,10 +146,11 @@ class TaskWrapper(object):
     fail_email_body = TASK_FAIL_EMAIL_BODY
     fail_email_body = TASK_FAIL_EMAIL_BODY
 
 
     def __init__(self, task_name, task_id, task_func, args, kwargs,
     def __init__(self, task_name, task_id, task_func, args, kwargs,
-            on_ack=None, **opts):
+            on_ack=None, retries=0, **opts):
         self.task_name = task_name
         self.task_name = task_name
         self.task_id = task_id
         self.task_id = task_id
         self.task_func = task_func
         self.task_func = task_func
+        self.retries = retries
         self.args = args
         self.args = args
         self.kwargs = kwargs
         self.kwargs = kwargs
         self.logger = kwargs.get("logger")
         self.logger = kwargs.get("logger")
@@ -181,6 +182,7 @@ class TaskWrapper(object):
         task_id = message_data["id"]
         task_id = message_data["id"]
         args = message_data["args"]
         args = message_data["args"]
         kwargs = message_data["kwargs"]
         kwargs = message_data["kwargs"]
+        retries = message_data["retries"]
 
 
         # Convert any unicode keys in the keyword arguments to ascii.
         # Convert any unicode keys in the keyword arguments to ascii.
         kwargs = dict((key.encode("utf-8"), value)
         kwargs = dict((key.encode("utf-8"), value)
@@ -190,7 +192,7 @@ class TaskWrapper(object):
             raise NotRegistered(task_name)
             raise NotRegistered(task_name)
         task_func = tasks[task_name]
         task_func = tasks[task_name]
         return cls(task_name, task_id, task_func, args, kwargs,
         return cls(task_name, task_id, task_func, args, kwargs,
-                    on_ack=message.ack, logger=logger)
+                    retries=retries, on_ack=message.ack, logger=logger)
 
 
     def extend_with_default_kwargs(self, loglevel, logfile):
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
         """Extend the tasks keyword arguments with standard task arguments.
@@ -203,6 +205,7 @@ class TaskWrapper(object):
                             "loglevel": loglevel,
                             "loglevel": loglevel,
                             "task_id": self.task_id,
                             "task_id": self.task_id,
                             "task_name": self.task_name}
                             "task_name": self.task_name}
+                            "task_retries": self.retries}
         kwargs.update(task_func_kwargs)
         kwargs.update(task_func_kwargs)
         return kwargs
         return kwargs