Explorar o código

Use kombu.utils.blocking

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
01014309ed

+ 0 - 3
celery/concurrency/base.py

@@ -113,9 +113,6 @@ class BasePool(object):
                 traceback.format_exc(), ),
                 exc_info=sys.exc_info())
 
-    def blocking(self, fun, *args, **kwargs):
-        return fun(*args, **kwargs)
-
     def _get_info(self):
         return {}
 

+ 0 - 4
celery/concurrency/evg.py

@@ -5,7 +5,6 @@ from gevent import monkey
 if not os.environ.get("GEVENT_NOPATCH"):
     monkey.patch_all()
 
-from gevent import Greenlet
 from gevent.pool import Pool
 
 from celery.concurrency.base import apply_target, BasePool
@@ -27,6 +26,3 @@ class TaskPool(BasePool):
             accept_callback=None, **_):
         return self._pool.spawn(apply_target, target, args, kwargs,
                                 callback, accept_callback)
-
-    def blocking(self, fun, *args, **kwargs):
-        Greenlet.spawn(fun, *args, **kwargs).get()

+ 1 - 4
celery/concurrency/evlet.py

@@ -11,7 +11,7 @@ if not os.environ.get("EVENTLET_NOPATCH"):
     eventlet.debug.hub_prevent_multiple_readers(False)
 
 from eventlet import GreenPool
-from eventlet.greenthread import getcurrent, spawn, spawn_after_local
+from eventlet.greenthread import getcurrent, spawn_after_local
 from greenlet import GreenletExit
 
 from celery.concurrency import base
@@ -105,6 +105,3 @@ class TaskPool(base.BasePool):
             accept_callback=None, **_):
         self._pool.spawn(apply_target, target, args, kwargs,
                          callback, accept_callback)
-
-    def blocking(self, fun, *args, **kwargs):
-        return spawn(fun, *args, **kwargs).wait()

+ 4 - 3
celery/worker/__init__.py

@@ -2,6 +2,7 @@ import socket
 import logging
 import traceback
 
+from kombu.utils import blocking
 from kombu.utils.finalize import Finalize
 
 from celery import beat
@@ -251,7 +252,7 @@ class WorkController(object):
                 self.logger.debug("Starting thread %s..." % (
                                         component.__class__.__name__))
                 self._running = i + 1
-                self.pool.blocking(component.start)
+                blocking(component.start)
         except SystemTerminate:
             self.terminate()
             raise SystemExit()
@@ -278,13 +279,13 @@ class WorkController(object):
         """Graceful shutdown of the worker server."""
         if in_sighandler and not self.pool.signal_safe:
             return
-        self.pool.blocking(self._shutdown, warm=True)
+        blocking(self._shutdown, warm=True)
 
     def terminate(self, in_sighandler=False):
         """Not so graceful shutdown of the worker server."""
         if in_sighandler and not self.pool.signal_safe:
             return
-        self.pool.blocking(self._shutdown, warm=False)
+        blocking(self._shutdown, warm=False)
 
     def _shutdown(self, warm=True):
         what = (warm and "stopping" or "terminating").capitalize()