Browse Source

Merge branch '3.0'

Ask Solem 12 years ago
parent
commit
303babc41e
2 changed files with 25 additions and 8 deletions
  1. 25 3
      celery/concurrency/gevent.py
  2. 0 5
      requirements/test.txt

+ 25 - 3
celery/concurrency/gevent.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import
 
 import os
+import sys
 
 PATCHED = [0]
 if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
@@ -22,14 +23,31 @@ if not os.environ.get('GEVENT_NOPATCH') and not PATCHED[0]:
         _signal = __import__('signal')
         _signal.signal = _gevent_signal
 
+try:
+    from gevent import Timeout
+except ImportError:
+    Timeout = None  # noqa
 
 from time import time
 
+from billiard.einfo import ExceptionInfo
+from billiard.exceptions import TimeLimitExceeded
 from celery.utils import timer2
 
 from .base import apply_target, BasePool
 
 
+def apply_timeout(target, args=(), kwargs={}, callback=None,
+                  accept_callback=None, pid=None, timeout=None,
+                  timeout_callback=None, **rest):
+    try:
+        with Timeout(timeout):
+            return apply_target(target, args, kwargs, callback,
+                                accept_callback, pid, **rest)
+    except Timeout:
+        return timeout_callback(False, timeout)
+
+
 class Schedule(timer2.Schedule):
 
     def __init__(self, *args, **kwargs):
@@ -100,6 +118,7 @@ class TaskPool(BasePool):
         from gevent.pool import Pool
         self.Pool = Pool
         self.spawn_n = spawn_raw
+        self.timeout = kwargs.get('timeout')
         super(TaskPool, self).__init__(*args, **kwargs)
 
     def on_start(self):
@@ -111,9 +130,12 @@ class TaskPool(BasePool):
             self._pool.join()
 
     def on_apply(self, target, args=None, kwargs=None, callback=None,
-            accept_callback=None, **_):
-        return self._quick_put(apply_target, target, args, kwargs,
-                               callback, accept_callback)
+            accept_callback=None, timeout=None, timeout_callback=None, **_):
+        timeout = self.timeout if timeout is None else timeout
+        return self._quick_put(apply_timeout if timeout else apply_target,
+                               target, args, kwargs, callback, accept_callback,
+                               timeout=timeout,
+                               timeout_callback=timeout_callback)
 
     def grow(self, n=1):
         self._pool._semaphore.counter += n

+ 0 - 5
requirements/test.txt

@@ -1,9 +1,4 @@
 unittest2>=0.4.0
 nose
 nose-cover3
-coverage>=3.0
 mock
-redis
-pymongo
-SQLAlchemy
-PyOpenSSL