Pārlūkot izejas kodu

Implemented per task time limits: Task.time_limit + Task.soft_time_limit. NOTE That a global time limit must be enabled for the timeout handler thread to be started

Ask Solem 15 gadi atpakaļ
vecāks
revīzija
a311ed9c69

+ 4 - 2
celery/concurrency/processes/__init__.py

@@ -103,7 +103,7 @@ class TaskPool(object):
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, accept_callback=None, timeout_callback=None,
-            **compat):
+            soft_timeout=None, timeout=None, **compat):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -126,7 +126,9 @@ class TaskPool(object):
                                       accept_callback=accept_callback,
                                       timeout_callback=timeout_callback,
                                       error_callback=on_worker_error,
-                                      waitforslot=self.putlocks)
+                                      waitforslot=self.putlocks,
+                                      soft_timeout=soft_timeout,
+                                      timeout=timeout)
 
     def on_worker_error(self, errbacks, exc):
         einfo = ExceptionInfo((exc.__class__, exc, None))

+ 23 - 13
celery/concurrency/processes/pool.py

@@ -248,7 +248,7 @@ class TimeoutHandler(PoolThread):
             if time.time() >= start + timeout:
                 return True
 
-        def _on_soft_timeout(job, i):
+        def _on_soft_timeout(job, i, soft_timeout):
             debug('soft time limit exceeded for %i' % i)
             process, _index = _process_by_pid(job._worker_pid)
             if not process:
@@ -256,7 +256,7 @@ class TimeoutHandler(PoolThread):
 
             # Run timeout callback
             if job._timeout_callback is not None:
-                job._timeout_callback(soft=True)
+                job._timeout_callback(soft=True, timeout=soft_timeout)
 
             try:
                 os.kill(job._worker_pid, SIG_SOFT_TIMEOUT)
@@ -268,15 +268,15 @@ class TimeoutHandler(PoolThread):
 
             dirty.add(i)
 
-        def _on_hard_timeout(job, i):
+        def _on_hard_timeout(job, i, hard_timeout):
             debug('hard time limit exceeded for %i', i)
             # Remove from _pool
             process, _index = _process_by_pid(job._worker_pid)
             # Remove from cache and set return value to an exception
-            job._set(i, (False, TimeLimitExceeded()))
+            job._set(i, (False, TimeLimitExceeded(hard_timeout)))
             # Run timeout callback
             if job._timeout_callback is not None:
-                job._timeout_callback(soft=False)
+                job._timeout_callback(soft=False, timeout=hard_timeout)
             if not process:
                 return
             # Terminate the process
@@ -291,10 +291,16 @@ class TimeoutHandler(PoolThread):
 
             for i, job in cache.items():
                 ack_time = job._time_accepted
-                if _timed_out(ack_time, t_hard):
-                    _on_hard_timeout(job, i)
-                elif i not in dirty and _timed_out(ack_time, t_soft):
-                    _on_soft_timeout(job, i)
+                soft_timeout = job._soft_timeout
+                if soft_timeout is None:
+                    soft_timeout = t_soft
+                hard_timeout = job._timeout
+                if hard_timeout is None:
+                    hard_timeout = t_hard
+                if _timed_out(ack_time, hard_timeout):
+                    _on_hard_timeout(job, i, hard_timeout)
+                elif i not in dirty and _timed_out(ack_time, soft_timeout):
+                    _on_soft_timeout(job, i, soft_timeout)
 
             time.sleep(0.5)                     # Don't waste CPU cycles.
 
@@ -460,7 +466,7 @@ class Pool(object):
         self._task_handler.start()
 
         # Thread killing timedout jobs.
-        if self.timeout or self.soft_timeout:
+        if self.timeout is not None or self.soft_timeout is not None:
             self._timeout_handler = self.TimeoutHandler(
                     self._pool, self._cache,
                     self.soft_timeout, self.timeout)
@@ -610,7 +616,8 @@ class Pool(object):
 
     def apply_async(self, func, args=(), kwds={},
             callback=None, accept_callback=None, timeout_callback=None,
-            waitforslot=False, error_callback=None):
+            waitforslot=False, error_callback=None,
+            soft_timeout=None, timeout=None):
         '''
         Asynchronous equivalent of `apply()` builtin.
 
@@ -629,7 +636,7 @@ class Pool(object):
         assert self._state == RUN
         result = ApplyResult(self._cache, callback,
                              accept_callback, timeout_callback,
-                             error_callback)
+                             error_callback, soft_timeout, timeout)
         if waitforslot:
             self._putlock.acquire()
         self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
@@ -763,7 +770,8 @@ DynamicPool = Pool
 class ApplyResult(object):
 
     def __init__(self, cache, callback, accept_callback=None,
-            timeout_callback=None, error_callback=None):
+            timeout_callback=None, error_callback=None, soft_timeout=None,
+            timeout=None):
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
         self._cache = cache
@@ -772,6 +780,8 @@ class ApplyResult(object):
         self._accept_callback = accept_callback
         self._errback = error_callback
         self._timeout_callback = timeout_callback
+        self._timeout = timeout
+        self._soft_timeout = soft_timeout
 
         self._accepted = False
         self._worker_pid = None

+ 12 - 0
celery/task/base.py

@@ -215,6 +215,16 @@ class Task(object):
 
         Default task expiry time in seconds or a :class:`~datetime.datetime`.
 
+    .. attribute:: time_limit
+
+        Hard time limit.  Defaults to the :setting:`CELERY_TASK_TIME_LIMIT`
+        setting.
+
+    .. attribute:: soft_time_limit
+
+        Soft time limit.  Defaults to the
+        :setting:`CELERY_TASK_SOFT_TIME_LIMIT` setting.
+
     """
     __metaclass__ = TaskType
 
@@ -245,6 +255,8 @@ class Task(object):
     backend = default_backend
     track_started = conf.TRACK_STARTED
     acks_late = conf.ACKS_LATE
+    time_limit = None
+    soft_time_limit = None
 
     MaxRetriesExceededError = MaxRetriesExceededError
 

+ 10 - 8
celery/worker/job.py

@@ -378,7 +378,9 @@ class TaskRequest(object):
                                   accept_callback=self.on_accepted,
                                   timeout_callback=self.on_timeout,
                                   callbacks=[self.on_success],
-                                  errbacks=[self.on_failure])
+                                  errbacks=[self.on_failure],
+                                  soft_timeout=self.task.soft_time_limit,
+                                  timeout=self.task.time_limit)
         return result
 
     def on_accepted(self):
@@ -389,16 +391,16 @@ class TaskRequest(object):
         self.logger.debug("Task accepted: %s[%s]" % (
             self.task_name, self.task_id))
 
-    def on_timeout(self, soft):
+    def on_timeout(self, soft, timeout):
         state.task_ready(self)
         if soft:
-            self.logger.warning("Soft time limit exceeded for %s[%s]" % (
-                self.task_name, self.task_id))
-            exc = SoftTimeLimitExceeded()
+            self.logger.warning("Soft time limit (%s) exceeded for %s[%s]" % (
+                timeout, self.task_name, self.task_id))
+            exc = SoftTimeLimitExceeded(timeout)
         else:
-            self.logger.error("Hard time limit exceeded for %s[%s]" % (
-                self.task_name, self.task_id))
-            exc = TimeLimitExceeded()
+            self.logger.error("Hard time limit (%s) exceeded for %s[%s]" % (
+                timeout, self.task_name, self.task_id))
+            exc = TimeLimitExceeded(timeout)
 
         self.task.backend.mark_as_failure(self.task_id, exc)