瀏覽代碼

Merge branch 'timelimits'

Ask Solem 15 年之前
父節點
當前提交
d5658dd835
共有 6 個文件被更改,包括 51 次插入7 次删除
  1. 15 1
      celery/bin/celeryd.py
  2. 2 0
      celery/conf.py
  3. 6 0
      celery/exceptions.py
  4. 8 2
      celery/worker/__init__.py
  5. 9 0
      celery/worker/job.py
  6. 11 4
      celery/worker/pool.py

+ 15 - 1
celery/bin/celeryd.py

@@ -110,6 +110,14 @@ OPTION_LIST = (
     optparse.make_option('-E', '--events', default=conf.SEND_EVENTS,
             action="store_true", dest="events",
             help="Send events so celery can be monitored by e.g. celerymon."),
+    optparse.make_option('--time-limit',
+            default=conf.CELERYD_TASK_TIME_LIMIT,
+            action="store", type="int", dest="task_time_limit",
+            help="Enables a hard time limit (in seconds) for task run times."),
+    optparse.make_option('--soft-time-limit',
+            default=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
+            action="store", type="int", dest="task_soft_time_limit",
+            help="Enables a soft time limit (in seconds) for task run times.")
 )
 
 
@@ -119,6 +127,8 @@ class Worker(object):
             loglevel=conf.CELERYD_LOG_LEVEL, logfile=conf.CELERYD_LOG_FILE,
             hostname=None, discard=False, run_clockservice=False,
             schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
+            task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
+            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT,
             events=False, **kwargs):
         self.concurrency = concurrency or multiprocessing.cpu_count()
         self.loglevel = loglevel
@@ -128,6 +138,8 @@ class Worker(object):
         self.run_clockservice = run_clockservice
         self.schedule = schedule
         self.events = events
+        self.task_time_limit = task_time_limit
+        self.task_soft_time_limit = task_soft_time_limit
         if not isinstance(self.loglevel, int):
             self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
 
@@ -215,7 +227,9 @@ class Worker(object):
                                 ready_callback=self.on_listener_ready,
                                 embed_clockservice=self.run_clockservice,
                                 schedule_filename=self.schedule,
-                                send_events=self.events)
+                                send_events=self.events,
+                                task_time_limit=self.task_time_limit,
+                                task_soft_time_limit=self.task_soft_time_limit)
 
         # Install signal handler so SIGHUP restarts the worker.
         install_worker_restart_handler(worker)

+ 2 - 0
celery/conf.py

@@ -118,6 +118,8 @@ MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
 SEND_EVENTS = _get("CELERY_SEND_EVENTS")
 DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
 DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
+CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
+CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
 STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
 CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
                                      not settings.DEBUG,

+ 6 - 0
celery/exceptions.py

@@ -3,12 +3,18 @@
 Common Exceptions
 
 """
+from billiard.pool import SoftTimeLimitExceeded as _SoftTimeLimitExceeded
 
 UNREGISTERED_FMT = """
 Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 
 
+class SoftTimeLimitExceeded(_SoftTimeLimitExceeded):
+    """The soft time limit has been exceeded. This exception is raised
+    to give the task a chance to clean up."""
+
+
 class ImproperlyConfigured(Exception):
     """Celery is somehow improperly configured."""
 

+ 8 - 2
celery/worker/__init__.py

@@ -107,7 +107,9 @@ class WorkController(object):
             pool_cls=conf.CELERYD_POOL, listener_cls=conf.CELERYD_LISTENER,
             mediator_cls=conf.CELERYD_MEDIATOR,
             eta_scheduler_cls=conf.CELERYD_ETA_SCHEDULER,
-            schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
+            schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
+            task_time_limit=conf.CELERYD_TASK_TIME_LIMIT,
+            task_soft_time_limit=conf.CELERYD_TASK_SOFT_TIME_LIMIT):
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -118,6 +120,8 @@ class WorkController(object):
         self.embed_clockservice = embed_clockservice
         self.ready_callback = ready_callback
         self.send_events = send_events
+        self.task_time_limit = task_time_limit
+        self.task_soft_time_limit = task_soft_time_limit
         self._finalize = Finalize(self, self.stop, exitpriority=20)
 
         # Queues
@@ -132,7 +136,9 @@ class WorkController(object):
         # Threads + Pool + Consumer
         self.pool = instantiate(pool_cls, self.concurrency,
                                 logger=self.logger,
-                                initializer=process_initializer)
+                                initializer=process_initializer,
+                                timeout=self.task_time_limit,
+                                soft_timeout=self.task_soft_time_limit)
         self.mediator = instantiate(mediator_cls, self.ready_queue,
                                     callback=self.process_task,
                                     logger=self.logger)

+ 9 - 0
celery/worker/job.py

@@ -322,6 +322,7 @@ class TaskWrapper(object):
         self.time_start = time.time()
         result = pool.apply_async(execute_and_trace, args=args,
                     accept_callback=self.on_accepted,
+                    timeout_callback=self.on_timeout,
                     callbacks=[self.on_success], errbacks=[self.on_failure])
         return result
 
@@ -332,6 +333,14 @@ class TaskWrapper(object):
         self.logger.debug("Task accepted: %s[%s]" % (
             self.task_name, self.task_id))
 
+    def on_timeout(self, soft):
+        if soft:
+            self.logger.warning("Soft time limit exceeded for %s[%s]" % (
+                self.task_name, self.task_id))
+        else:
+            self.logger.error("Hard time limit exceeded for %s[%s]" % (
+                self.task_name, self.task_id))
+
     def acknowledge(self):
         if not self.acknowledged:
             self.on_ack()

+ 11 - 4
celery/worker/pool.py

@@ -27,10 +27,13 @@ class TaskPool(object):
 
     """
 
-    def __init__(self, limit, logger=None, initializer=None):
+    def __init__(self, limit, logger=None, initializer=None,
+            timeout=None, soft_timeout=None):
         self.limit = limit
         self.logger = logger or log.get_default_logger()
         self.initializer = initializer
+        self.timeout = timeout
+        self.soft_timeout = soft_timeout
         self._pool = None
 
     def start(self):
@@ -40,7 +43,9 @@ class TaskPool(object):
 
         """
         self._pool = DynamicPool(processes=self.limit,
-                                 initializer=self.initializer)
+                                 initializer=self.initializer,
+                                 timeout=self.timeout,
+                                 soft_timeout=self.soft_timeout)
 
     def stop(self):
         """Terminate the pool."""
@@ -57,7 +62,8 @@ class TaskPool(object):
                     dead_count))
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, accept_callback=None, **compat):
+            errbacks=None, accept_callback=None, timeout_callback=None,
+            **compat):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -78,7 +84,8 @@ class TaskPool(object):
 
         return self._pool.apply_async(target, args, kwargs,
                                       callback=on_ready,
-                                      accept_callback=accept_callback)
+                                      accept_callback=accept_callback,
+                                      timeout_callback=timeout_callback)
 
     def on_ready(self, callbacks, errbacks, ret_value):
         """What to do when a worker task is ready and its return value has