Browse Source

Various eventlet related fixes (shutdown) and added a -P|--pool argument to celeryd to select pool implementation on the commandline

Ask Solem 14 years ago
parent
commit
b0345dc457

+ 5 - 5
celery/apps/worker.py

@@ -43,7 +43,7 @@ class Worker(object):
             max_tasks_per_child=None, queues=None, events=False, db=None,
             include=None, app=None, pidfile=None,
             redirect_stdouts=None, redirect_stdouts_level=None,
-            autoscale=None, scheduler_cls=None, **kwargs):
+            autoscale=None, scheduler_cls=None, pool=None, **kwargs):
         self.app = app = app_or_default(app)
         self.concurrency = (concurrency or
                             app.conf.CELERYD_CONCURRENCY or
@@ -68,6 +68,7 @@ class Worker(object):
                                  app.conf.CELERY_REDIRECT_STDOUTS)
         self.redirect_stdouts_level = (redirect_stdouts_level or
                                        app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
+        self.pool = (pool or app.conf.CELERYD_POOL)
         self.db = db
         self.use_queues = queues or []
         self.queues = None
@@ -207,7 +208,8 @@ class Worker(object):
                                 max_tasks_per_child=self.max_tasks_per_child,
                                 task_time_limit=self.task_time_limit,
                                 task_soft_time_limit=self.task_soft_time_limit,
-                                autoscale=self.autoscale)
+                                autoscale=self.autoscale,
+                                pool_cls=self.pool)
         self.install_platform_tweaks(worker)
         worker.start()
 
@@ -257,7 +259,6 @@ def install_worker_int_handler(worker):
             install_worker_int_again_handler(worker)
             worker.logger.warn("celeryd: Warm shutdown (%s)" % (
                 process_name))
-            worker.stop()
         raise SystemExit()
 
     platforms.install_signal_handler("SIGINT", _stop)
@@ -270,8 +271,7 @@ def install_worker_int_again_handler(worker):
         if process_name == "MainProcess":
             worker.logger.warn("celeryd: Cold shutdown (%s)" % (
                 process_name))
-            worker.terminate()
-        raise SystemExit()
+        raise SystemTerminate()
 
     platforms.install_signal_handler("SIGINT", _stop)
 

+ 9 - 0
celery/bin/celeryd.py

@@ -83,6 +83,11 @@ class WorkerCommand(Command):
 
     def run(self, *args, **kwargs):
         kwargs.pop("app", None)
+        # Pools like eventlet/gevent needs to patch libs as early
+        # as possible.
+        from celery import concurrency
+        kwargs["pool"] = concurrency.get_implementation(
+                    kwargs.get("pool") or self.app.conf.CELERYD_POOL)
         return self.app.Worker(**kwargs).run()
 
     def get_options(self):
@@ -92,6 +97,10 @@ class WorkerCommand(Command):
                 default=conf.CELERYD_CONCURRENCY,
                 action="store", dest="concurrency", type="int",
                 help="Number of child processes processing the queue."),
+            Option('-P', '--pool',
+                default=conf.CELERYD_POOL,
+                action="store", dest="pool", type="str",
+                help="Pool implementation: processes|eventlet|gevent"),
             Option('--purge', '--discard', default=False,
                 action="store_true", dest="discard",
                 help="Discard all waiting tasks before the server is"

+ 4 - 0
celery/exceptions.py

@@ -9,6 +9,10 @@ Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 
 
+class SystemTerminate(SystemExit):
+    pass
+
+
 class QueueNotFound(KeyError):
     """Task routed to a queue not in CELERY_QUEUES."""
     pass

+ 12 - 20
celery/worker/__init__.py

@@ -9,6 +9,7 @@ from celery import registry
 from celery import platforms
 from celery import signals
 from celery.app import app_or_default
+from celery.exceptions import SystemTerminate
 from celery.log import SilenceRepeated
 from celery.utils import noop, instantiate
 
@@ -192,23 +193,10 @@ class WorkController(object):
                                         callback=self.process_task,
                                         logger=self.logger)
 
-        class DummyTimer(object):
-
-            def start(self):
-                pass
-
-            def stop(self):
-                pass
-
-            def clear(self):
-                pass
-
-
-        #self.scheduler = instantiate(self.eta_scheduler_cls,
-        #                        precision=eta_scheduler_precision,
-        #                        on_error=self.on_timer_error,
-        #                        on_tick=self.on_timer_tick)
-        self.scheduler = DummyTimer()
+        self.scheduler = instantiate(self.eta_scheduler_cls,
+                                precision=eta_scheduler_precision,
+                                on_error=self.on_timer_error,
+                                on_tick=self.on_timer_tick)
 
         self.beat = None
         if self.embed_clockservice:
@@ -248,7 +236,7 @@ class WorkController(object):
             self.logger.debug("Starting thread %s..." % (
                                     component.__class__.__name__))
             self._running = i + 1
-            component.start()
+            self.pool.blocking(component.start)
 
     def process_task(self, wrapper):
         """Process task by sending it to the pool of workers."""
@@ -259,16 +247,20 @@ class WorkController(object):
             except Exception, exc:
                 self.logger.critical("Internal error %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))
+        except SystemTerminate:
+            self.terminate()
+            raise SystemExit()
         except (SystemExit, KeyboardInterrupt):
             self.stop()
+            raise SystemExit()
 
     def stop(self):
         """Graceful shutdown of the worker server."""
-        self._shutdown(warm=True)
+        self.pool.blocking(self._shutdown, warm=True)
 
     def terminate(self):
         """Not so graceful shutdown of the worker server."""
-        self._shutdown(warm=False)
+        self.pool.blocking(self._shutdown, warm=False)
 
     def _shutdown(self, warm=True):
         what = (warm and "stopping" or "terminating").capitalize()

+ 3 - 1
celery/worker/consumer.py

@@ -249,6 +249,8 @@ class Consumer(object):
         self.logger.debug("Consumer: Ready to accept tasks!")
 
         while 1:
+            if not self.connection:
+                break
             if self.qos.prev != self.qos.next:
                 self.qos.update()
             self.connection.drain_events()
@@ -436,7 +438,7 @@ class Consumer(object):
 
     def restart_heartbeat(self):
         self.heart = Heart(self.event_dispatcher)
-        #self.heart.start()
+        self.heart.start()
 
     def _mainloop(self):
         while 1:

+ 0 - 1
celery/worker/heartbeat.py

@@ -63,6 +63,5 @@ class Heart(threading.Thread):
             return
         self._state = "CLOSE"
         self._shutdown.set()
-        self._stopped.wait()            # blocks until this thread is done
         if self.isAlive():
             self.join(1e100)