Browse Source

Adds celery.utils.threads

Ask Solem 13 years ago
parent
commit
6640235750
3 changed files with 70 additions and 52 deletions
  1. 59 0
      celery/utils/threads.py
  2. 6 25
      celery/worker/autoscale.py
  3. 5 27
      celery/worker/mediator.py

+ 59 - 0
celery/utils/threads.py

@@ -0,0 +1,59 @@
+import os
+import sys
+import threading
+
+_Thread = threading.Thread
+_Event = threading._Event
+
+
+class Event(_Event):
+
+    if not hasattr(_Event, "is_set"):
+        is_set = _Event.isSet
+
+
+class Thread(_Thread):
+
+    if not hasattr(_Thread, "is_alive"):
+        is_alive = _Thread.isAlive
+
+    if not hasattr(_Thread, "daemon"):
+        daemon = property(_Thread.isDaemon, _Thread.setDaemon)
+
+    if not hasattr(_Thread, "name"):
+        name = property(_Thread.getName, _Thread.setName)
+
+
+class bgThread(Thread):
+
+    def __init__(self, name=None, **kwargs):
+        super(bgThread, self).__init__()
+        self._is_shutdown = Event()
+        self._is_stopped = Event()
+        self.daemon = True
+        self.name = name or self.__class__.__name__
+
+    def next(self):
+        raise NotImplementedError("subclass responsibility")
+    __next__ = next  # 2to3.
+
+    def on_crash(self, msg, *fmt, **kwargs):
+        sys.stderr.write(msg + "\n" % fmt)
+
+    def run(self):
+        shutdown = self._is_shutdown
+        while not shutdown.is_set():
+            try:
+                self.next()
+            except Exception, exc:
+                self.on_crash("%r crashed: %r", self.name, exc, exc_info=True)
+                # exiting by normal means does not work here, so force exit.
+                os._exit(1)
+        self._is_stopped.set()
+
+    def stop(self):
+        """Graceful shutdown."""
+        self._is_shutdown.set()
+        self._is_stopped.wait()
+        if self.is_alive:
+            self.join(1e100)

+ 6 - 25
celery/worker/autoscale.py

@@ -17,7 +17,6 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
-import os
 import sys
 import threading
 import traceback
@@ -25,13 +24,14 @@ import traceback
 from time import sleep, time
 
 from . import state
+from ..utils.threads import bgThread
 
 
-class Autoscaler(threading.Thread):
+class Autoscaler(bgThread):
 
     def __init__(self, pool, max_concurrency, min_concurrency=0,
             keepalive=30, logger=None):
-        threading.Thread.__init__(self)
+        super(Autoscaler, self).__init__()
         self.pool = pool
         self.mutex = threading.Lock()
         self.max_concurrency = max_concurrency
@@ -39,14 +39,10 @@ class Autoscaler(threading.Thread):
         self.keepalive = keepalive
         self.logger = logger
         self._last_action = None
-        self._is_shutdown = threading.Event()
-        self._is_stopped = threading.Event()
-        self.setDaemon(True)
-        self.setName(self.__class__.__name__)
 
         assert self.keepalive, "can't scale down too fast."
 
-    def scale(self):
+    def next(self):
         with self.mutex:
             current = min(self.qty, self.max_concurrency)
             if current > self.processes:
@@ -54,6 +50,8 @@ class Autoscaler(threading.Thread):
             elif current < self.processes:
                 self.scale_down(
                     (self.processes - current) - self.min_concurrency)
+        sleep(1.0)
+    scale = next  # XXX compat
 
     def update(self, max=None, min=None):
         with self.mutex:
@@ -109,23 +107,6 @@ class Autoscaler(threading.Thread):
             self._last_action = time()
             self._shrink(n)
 
-    def run(self):
-        while not self._is_shutdown.isSet():
-            try:
-                self.scale()
-                sleep(1.0)
-            except Exception, exc:
-                self.logger.error("Thread Autoscaler crashed: %r", exc,
-                                  exc_info=sys.exc_info())
-                os._exit(1)
-        self._is_stopped.set()
-
-    def stop(self):
-        self._is_shutdown.set()
-        self._is_stopped.wait()
-        if self.isAlive():
-            self.join(1e10)
-
     def info(self):
         return {"max": self.max_concurrency,
                 "min": self.min_concurrency,

+ 5 - 27
celery/worker/mediator.py

@@ -18,17 +18,15 @@
 """
 from __future__ import absolute_import
 
-import os
-import sys
-import threading
 import traceback
 
 from Queue import Empty
 
 from ..app import app_or_default
+from ..utils.threads import bgThread
 
 
-class Mediator(threading.Thread):
+class Mediator(bgThread):
 
     #: The task queue, a :class:`~Queue.Queue` instance.
     ready_queue = None
@@ -37,17 +35,13 @@ class Mediator(threading.Thread):
     callback = None
 
     def __init__(self, ready_queue, callback, logger=None, app=None):
-        threading.Thread.__init__(self)
         self.app = app_or_default(app)
         self.logger = logger or self.app.log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
-        self._is_shutdown = threading.Event()
-        self._is_stopped = threading.Event()
-        self.setDaemon(True)
-        self.setName(self.__class__.__name__)
+        super(Mediator, self).__init__()
 
-    def move(self):
+    def next(self):
         try:
             task = self.ready_queue.get(timeout=1.0)
         except Empty:
@@ -69,20 +63,4 @@ class Mediator(threading.Thread):
                               extra={"data": {"id": task.task_id,
                                               "name": task.task_name,
                                               "hostname": task.hostname}})
-
-    def run(self):
-        """Move tasks until :meth:`stop` is called."""
-        while not self._is_shutdown.isSet():
-            try:
-                self.move()
-            except Exception, exc:
-                self.logger.error("Mediator crash: %r", exc, exc_info=True)
-                # exiting by normal means does not work here, so force exit.
-                os._exit(1)
-        self._is_stopped.set()
-
-    def stop(self):
-        """Gracefully shutdown the thread."""
-        self._is_shutdown.set()
-        self._is_stopped.wait()
-        self.join(1e10)
+    move = next   # XXX compat