فهرست منبع

Fixes time_limit argument to request

Ask Solem 10 سال پیش
والد
کامیت
9ca65432ff
1فایلهای تغییر یافته به همراه14 افزوده شده و 13 حذف شده
  1. 14 13
      celery/worker/request.py

+ 14 - 13
celery/worker/request.py

@@ -68,7 +68,7 @@ class Request(object):
     acknowledged = False
     time_start = None
     worker_pid = None
-    timeouts = (None, None)
+    time_limits = (None, None)
     _already_revoked = False
     _terminate_on_ack = None
     _apply_result = None
@@ -107,8 +107,8 @@ class Request(object):
 
         name = self.name = headers['task']
         self.id = headers['id']
-        if 'timeouts' in headers:
-            self.timeouts = headers['timeouts']
+        if 'timelimit' in headers:
+            self.time_limits = headers['timelimit']
         self.on_ack = on_ack
         self.on_reject = on_reject
         self.hostname = hostname or socket.gethostname()
@@ -173,9 +173,9 @@ class Request(object):
         if self.revoked():
             raise TaskRevokedError(task_id)
 
-        timeout, soft_timeout = self.timeouts
-        timeout = timeout or task.time_limit
-        soft_timeout = soft_timeout or task.soft_time_limit
+        time_limit, soft_time_limit = self.time_limits
+        time_limit = time_limit or task.time_limit
+        soft_time_limit = soft_time_limit or task.soft_time_limit
         result = pool.apply_async(
             trace_task_ret,
             args=(self.name, task_id, self.request_dict, self.body,
@@ -184,8 +184,8 @@ class Request(object):
             timeout_callback=self.on_timeout,
             callback=self.on_success,
             error_callback=self.on_failure,
-            soft_timeout=soft_timeout or task.soft_time_limit,
-            timeout=timeout or task.time_limit,
+            soft_timeout=soft_time_limit,
+            timeout=time_limit,
             correlation_id=task_id,
         )
         # cannot create weakref to None
@@ -369,6 +369,7 @@ class Request(object):
     def acknowledge(self):
         """Acknowledge task."""
         if not self.acknowledged:
+            print('!!!!ACKING TASK!!!!')
             self.on_ack(logger, self.connection_errors)
             self.acknowledged = True
 
@@ -455,9 +456,9 @@ def create_request_cls(base, task, pool, hostname, eventer,
             if (self.expires or task_id in revoked_tasks) and self.revoked():
                 raise TaskRevokedError(task_id)
 
-            timeout, soft_timeout = self.timeouts
-            timeout = timeout or default_time_limit
-            soft_timeout = soft_timeout or default_soft_time_limit
+            time_limit, soft_time_limit = self.time_limits
+            time_limit = time_limit or default_time_limit
+            soft_time_limit = soft_time_limit or default_soft_time_limit
             result = apply_async(
                 trace,
                 args=(self.name, task_id, self.request_dict, self.body,
@@ -466,8 +467,8 @@ def create_request_cls(base, task, pool, hostname, eventer,
                 timeout_callback=self.on_timeout,
                 callback=self.on_success,
                 error_callback=self.on_failure,
-                soft_timeout=soft_timeout,
-                timeout=timeout,
+                soft_timeout=soft_time_limit,
+                timeout=time_limit,
                 correlation_id=task_id,
             )
             # cannot create weakref to None