Browse Source

Added support for task soft and hard timelimits. Requires billiard timelimits branch.

New settings added
------------------

    * CELERYD_TASK_TIME_LIMIT
        Hard time limit. The worker processing the task will be killed and
        replaced with a new one when this is exceeded.
    * CELERYD_SOFT_TASK_TIME_LIMIT
        Soft time limit. The celery.exceptions.SoftTimeLimitExceeded exception
        will be raised when this is exceeded. The task can catch this to
        e.g. clean up before the hard time limit comes.

New command line arguments to celeryd added
-------------------------------------------
--time-limit and --soft-time-limit.

What's left?
-----------

This won't work on platforms not supporting signals (and specifically
the SIGUSR1 signal) yet. So an alternative the ability to disable
the feature alltogether on nonconforming platforms must be implemented.

Also when the hard time limit is exceeded, the task result should
be a TimeLimitExceeded exception.
Ask Solem 15 years ago
parent
commit
115ce982aa
6 changed files with 51 additions and 7 deletions
  1. 15 1
      celery/bin/celeryd.py
  2. 2 0
      celery/conf.py
  3. 6 0
      celery/exceptions.py
  4. 8 2
      celery/worker/__init__.py
  5. 9 0
      celery/worker/job.py
  6. 11 4
      celery/worker/pool.py

+ 15 - 1
celery/bin/celeryd.py

@@ -110,6 +110,14 @@ OPTION_LIST = (
     optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
             action="store_true", dest="events",
             help="Send events so celery can be monitored by e.g. celerymon."),
+    optparse.make_option('--time-limit',
+            default=conf.CELERYD_TASK_TIME_LIMIT,
+            action="store", type="int", dest="task_time_limit",
+            help="Enables a hard time limit (in seconds) for task run times."),
+    optparse.make_option('--soft-time-limit',
+            default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
+            action="store", type="int", dest="task_soft_time_limit",
+            help="Enables a soft time limit (in seconds) for task run times.")
 )
 
 
@@ -119,6 +127,8 @@ class Worker(object):
             loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
             hostname=None, discard=False, run_clockservice=False,
             schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
+            task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
+            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             events=False, **kwargs):
         self.concurrency = concurrency or multiprocessing.cpu_count()
         self.loglevel = loglevel
@@ -128,6 +138,8 @@ class Worker(object):
         self.run_clockservice = run_clockservice
         self.schedule = schedule
         self.events = events
+        self.task_time_limit = task_time_limit
+        self.task_soft_time_limit = task_soft_time_limit
         if not isinstance(self.loglevel, int):
             self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
 
@@ -215,7 +227,9 @@ class Worker(object):
                                 ready_callback=self.on_listener_ready,
                                 embed_clockservice=self.run_clockservice,
                                 schedule_filename=self.schedule,
-                                send_events=self.events)
+                                send_events=self.events,
+                                task_time_limit=self.task_time_limit,
+                                task_soft_time_limit=self.task_soft_time_limit)
 
         # Install signal handler so SIGHUP restarts the worker.
         install_worker_restart_handler(worker)

+ 2 - 0
celery/conf.py

@@ -114,6 +114,8 @@ MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
 SEND_EVENTS = _get("CELERY_SEND_EVENTS")
 DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
 DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
+CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
+CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
 STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
 CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
                                      not settings.DEBUG,

+ 6 - 0
celery/exceptions.py

@@ -3,12 +3,18 @@
 Common Exceptions
 
 """
+from billiard.pool import SoftTimeLimitExceeded as _SoftTimeLimitExceeded
 
 UNREGISTERED_FMT = """
 Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 
 
+class SoftTimeLimitExceeded(_SoftTimeLimitExceeded):
+    """The soft time limit has been exceeded. This exception is raised
+    to give the task a chance to clean up."""
+
+
 class ImproperlyConfigured(Exception):
     """Celery is somehow improperly configured."""
 

+ 8 - 2
celery/worker/__init__.py

@@ -107,7 +107,9 @@ class WorkController(object):
             pool_cls=conf.CELERYD_POOL, listener_cls=conf.CELERYD_LISTENER,
             mediator_cls=conf.CELERYD_MEDIATOR,
             eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
-            schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
+            schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
+            task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
+            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -118,6 +120,8 @@ class WorkController(object):
         self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
         self.send_events = send_events
+        self.task_time_limit = task_time_limit
+        self.task_soft_time_limit = task_soft_time_limit
         self._finalize = Finalize(self, self.stop, exitpriority=20)
 
         # Queues
@@ -132,7 +136,9 @@ class WorkController(object):
         # Threads + Pool + Consumer
         self.pool = instantiate(pool_cls, self.concurrency,
                                 logger=self.logger,
-                                initializer=process_initializer)
+                                initializer=process_initializer,
+                                timeout=self.task_time_limit,
+                                soft_timeout=self.task_soft_time_limit)
         self.mediator = instantiate(mediator_cls, self.ready_queue,
                                     callback=self.process_task,
                                     logger=self.logger)

+ 9 - 0
celery/worker/job.py

@@ -322,6 +322,7 @@ class TaskWrapper(object):
         self.time_start = time.time()
         result = pool.apply_async(execute_and_trace, args=args,
                     accept_callback=self.on_accepted,
+                    timeout_callback=self.on_timeout,
                     callbacks=[self.on_success], errbacks=[self.on_failure])
         return result
 
@@ -332,6 +333,14 @@ class TaskWrapper(object):
         self.logger.debug("Task accepted: %s[%s]" % (
             self.task_name, self.task_id))
 
+    def on_timeout(self, soft):
+        if soft:
+            self.logger.warning("Soft time limit exceeded for %s[%s]" % (
+                self.task_name, self.task_id))
+        else:
+            self.logger.error("Hard time limit exceeded for %s[%s]" % (
+                self.task_name, self.task_id))
+
     def acknowledge(self):
         if not self.acknowledged:
             self.on_ack()

+ 11 - 4
celery/worker/pool.py

@@ -27,10 +27,13 @@ class TaskPool(object):
 
     """
 
-    def __init__(self, limit, logger=None, initializer=None):
+    def __init__(self, limit, logger=None, initializer=None,
+            timeout=None, soft_timeout=None):
         self.limit = limit
         self.logger = logger or log.get_default_logger()
         self.initializer = initializer
+        self.timeout = timeout
+        self.soft_timeout = soft_timeout
         self._pool = None
 
     def start(self):
@@ -40,7 +43,9 @@ class TaskPool(object):
 
         """
         self._pool = DynamicPool(processes=self.limit,
-                                 initializer=self.initializer)
+                                 initializer=self.initializer,
+                                 timeout=self.timeout,
+                                 soft_timeout=self.soft_timeout)
 
     def stop(self):
         """Terminate the pool."""
@@ -57,7 +62,8 @@ class TaskPool(object):
                     dead_count))
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, accept_callback=None, **compat):
+            errbacks=None, accept_callback=None, timeout_callback=None,
+            **compat):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -78,7 +84,8 @@ class TaskPool(object):
 
         return self._pool.apply_async(target, args, kwargs,
                                       callback=on_ready,
-                                      accept_callback=accept_callback)
+                                      accept_callback=accept_callback,
+                                      timeout_callback=timeout_callback)
 
     def on_ready(self, callbacks, errbacks, ret_value):
         """What to do when a worker task is ready and its return value has