
Now runs on Jython using -P threads

Ask Solem, 14 years ago
commit e2f8a5cead

+ 22 - 8
celery/apps/worker.py

@@ -1,6 +1,9 @@
 import atexit
 import logging
-import multiprocessing
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
 import os
 import socket
 import sys
@@ -36,6 +39,11 @@ EXTRA_INFO_FMT = """
 %(tasks)s
 """
 
+def cpu_count():
+    if multiprocessing is not None:
+        return multiprocessing.cpu_count()
+    return 2
+
 
 class Worker(object):
     WorkController = WorkController
@@ -50,7 +58,7 @@ class Worker(object):
         self.app = app = app_or_default(app)
         self.concurrency = (concurrency or
                             app.conf.CELERYD_CONCURRENCY or
-                            multiprocessing.cpu_count())
+                            cpu_count())
         self.loglevel = loglevel or app.conf.CELERYD_LOG_LEVEL
         self.logfile = logfile or app.conf.CELERYD_LOG_FILE
 
@@ -263,8 +271,10 @@ class Worker(object):
 def install_worker_int_handler(worker):
 
     def _stop(signum, frame):
-        process_name = multiprocessing.current_process().name
-        if process_name == "MainProcess":
+        process_name = None
+        if multiprocessing:
+            process_name = multiprocessing.current_process().name
+        if not process_name or process_name == "MainProcess":
             worker.logger.warn(
                 "celeryd: Hitting Ctrl+C again will terminate "
                 "all running tasks!")
@@ -280,8 +290,10 @@ def install_worker_int_handler(worker):
 def install_worker_int_again_handler(worker):
 
     def _stop(signum, frame):
-        process_name = multiprocessing.current_process().name
-        if process_name == "MainProcess":
+        process_name = None
+        if multiprocessing:
+            process_name = multiprocessing.current_process().name
+        if not process_name or process_name == "MainProcess":
             worker.logger.warn("celeryd: Cold shutdown (%s)" % (
                 process_name))
             worker.terminate(in_sighandler=True)
@@ -293,8 +305,10 @@ def install_worker_int_again_handler(worker):
 def install_worker_term_handler(worker):
 
     def _stop(signum, frame):
-        process_name = multiprocessing.current_process().name
-        if process_name == "MainProcess":
+        process_name = None
+        if multiprocessing:
+            process_name = multiprocessing.current_process().name
+        if not process_name or process_name == "MainProcess":
             worker.logger.warn("celeryd: Warm shutdown (%s)" % (
                 process_name))
             worker.stop(in_sighandler=True)

+ 20 - 14
celery/beat.py

@@ -8,7 +8,10 @@ import shelve
 import sys
 import threading
 import traceback
-import multiprocessing
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
 
 from datetime import datetime
 
@@ -393,21 +396,24 @@ class _Threaded(threading.Thread):
         self.service.stop(wait=True)
 
 
-class _Process(multiprocessing.Process):
-    """Embedded task scheduler using multiprocessing."""
+if multiprocessing is not None:
+    class _Process(multiprocessing.Process):
+        """Embedded task scheduler using multiprocessing."""
 
-    def __init__(self, *args, **kwargs):
-        super(_Process, self).__init__()
-        self.service = Service(*args, **kwargs)
-        self.name = "Beat"
+        def __init__(self, *args, **kwargs):
+            super(_Process, self).__init__()
+            self.service = Service(*args, **kwargs)
+            self.name = "Beat"
 
-    def run(self):
-        platforms.reset_signal("SIGTERM")
-        self.service.start(embedded_process=True)
+        def run(self):
+            platforms.reset_signal("SIGTERM")
+            self.service.start(embedded_process=True)
 
-    def stop(self):
-        self.service.stop()
-        self.terminate()
+        def stop(self):
+            self.service.stop()
+            self.terminate()
+else:
+    _Process = None
 
 
 def EmbeddedService(*args, **kwargs):
@@ -417,7 +423,7 @@ def EmbeddedService(*args, **kwargs):
         Default is :const:`False`.
 
     """
-    if kwargs.pop("thread", False):
+    if kwargs.pop("thread", False) or _Process is None:
         # Need short max interval to be able to stop thread
         # in reasonable time.
         kwargs.setdefault("max_interval", 1)
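
With the guarded import above, EmbeddedService now falls back to the thread-based
scheduler whenever multiprocessing is unavailable (e.g. on Jython). A brief usage
sketch follows; calling EmbeddedService() with no Service arguments is an assumption,
and both returned classes expose the start()/stop() interface of threading.Thread /
multiprocessing.Process:

    # Hypothetical usage: embed the beat scheduler in-process.
    from celery.beat import EmbeddedService

    beat = EmbeddedService(thread=True)  # forced to a thread anyway when _Process is None
    beat.start()
    # ... run the worker ...
    beat.stop()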

+ 6 - 2
celery/bin/celeryd.py

@@ -72,7 +72,11 @@
 
 """
 import sys
-import multiprocessing
+
+try:
+    from multiprocessing import freeze_support
+except ImportError:
+    freeze_support = lambda: True
 
 from celery.bin.base import Command, Option
 
@@ -177,7 +181,7 @@ class WorkerCommand(Command):
 
 
 def main():
-    multiprocessing.freeze_support()
+    freeze_support()
     worker = WorkerCommand()
     worker.execute_from_commandline()
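
With freeze_support made optional, the celeryd entry point can be started on
interpreters that lack multiprocessing, such as Jython. A hypothetical usage
sketch follows; the jython command line and the -c 4 value are assumptions,
only the -P threads option is confirmed by the commit message:

    # Hypothetical sketch: starting a worker with the new thread pool.
    # From a shell this would look something like:
    #
    #     jython celeryd -P threads -c 4
    #
    # which is handled by the main() entry point changed in the diff above:
    from celery.bin.celeryd import main

    if __name__ == "__main__":
        main()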
 

+ 1 - 0
celery/concurrency/__init__.py

@@ -4,6 +4,7 @@ ALIASES = {
     "processes": "celery.concurrency.processes.TaskPool",
     "eventlet": "celery.concurrency.evlet.TaskPool",
     "gevent": "celery.concurrency.evg.TaskPool",
+    "threads": "celery.concurrency.threads.TaskPool",
 }
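
The new "threads" alias points to celery.concurrency.threads.TaskPool, a module
that is not part of this diff. Purely as an illustration (this is not the actual
celery module), a pool backed by plain threads avoids both fork() and
multiprocessing, which is what makes it usable on Jython:

    # Standalone sketch of a thread-backed pool; the class name and
    # interface are illustrative, not celery's actual TaskPool API.
    import threading
    import Queue  # Python 2 era stdlib, matching the code above

    class SketchThreadPool(object):

        def __init__(self, limit=2):
            self._queue = Queue.Queue()
            self._workers = [threading.Thread(target=self._work)
                             for _ in range(limit)]
            for thread in self._workers:
                thread.setDaemon(True)   # do not block interpreter exit
                thread.start()

        def _work(self):
            while True:
                fun, args, kwargs = self._queue.get()
                try:
                    fun(*args, **kwargs)
                except Exception:
                    pass  # a real pool would log or propagate the error
                finally:
                    self._queue.task_done()

        def apply_async(self, fun, args=(), kwargs=None):
            self._queue.put((fun, args, kwargs or {}))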
 
 

+ 16 - 10
celery/log.py

@@ -4,8 +4,11 @@ import threading
 import sys
 import traceback
 
-from multiprocessing import current_process
-from multiprocessing import util as mputil
+try:
+    from multiprocessing import current_process
+    from multiprocessing import util as mputil
+except ImportError:
+    current_process = mputil = None
 
 from celery import signals
 from celery import current_app
@@ -42,7 +45,8 @@ class ColorFormatter(logging.Formatter):
         # by foreign logger instances.
         # (processName is always supported by Python 2.7)
         if "processName" not in record.__dict__:
-            record.__dict__["processName"] = current_process()._name
+            process_name = current_process and current_process()._name or ""
+            record.__dict__["processName"] = process_name
         t = logging.Formatter.format(self, record)
         if isinstance(t, unicode):
             return t.encode("utf-8", "replace")
@@ -89,10 +93,11 @@ class Logging(object):
         if colorize is None:
             colorize = self.supports_color(logfile)
 
-        try:
-            mputil._logger = None
-        except AttributeError:
-            pass
+        if mputil:
+            try:
+                mputil._logger = None
+            except AttributeError:
+                pass
         ensure_process_aware_logger()
         receivers = signals.setup_logging.send(sender=None,
                                                loglevel=loglevel,
@@ -105,10 +110,11 @@ class Logging(object):
             if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                 root.handlers = []
 
-            mp = mputil.get_logger()
+            mp = mputil and mputil.get_logger() or None
             for logger in (root, mp):
-                self._setup_logger(logger, logfile, format, colorize, **kwargs)
-                logger.setLevel(loglevel)
+                if logger:
+                    self._setup_logger(logger, logfile, format, colorize, **kwargs)
+                    logger.setLevel(loglevel)
         Logging._setup = True
         return receivers
 

+ 4 - 1
celery/platforms.py

@@ -351,7 +351,10 @@ def set_mp_process_title(progname, info=None, hostname=None):
     Only works if :mod:`setproctitle` is installed.
 
     """
-    from multiprocessing.process import current_process
+    try:
+        from multiprocessing.process import current_process
+    except ImportError:
+        return
     if hostname:
         progname = "%s@%s" % (progname, hostname.split(".")[0])
     return set_process_title("%s:%s" % (progname, current_process().name),

+ 8 - 2
celery/utils/compat.py

@@ -319,7 +319,10 @@ except ImportError:
 ############## logging.LoggerAdapter ########################################
 import inspect
 import logging
-import multiprocessing
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
 import sys
 
 from logging import LogRecord
@@ -392,7 +395,10 @@ class _CompatLoggerAdapter(object):
                     raise KeyError(
                             "Attempt to override %r in LogRecord" % key)
                 rv.__dict__[key] = value
-        rv.processName = multiprocessing.current_process()._name
+        if multiprocessing is not None:
+            rv.processName = multiprocessing.current_process()._name
+        else:
+            rv.processName = ""
         return rv
 
     def _log(self, level, msg, args, exc_info=None, extra=None):

+ 9 - 2
celery/utils/patch.py

@@ -6,7 +6,11 @@ _process_aware = False
 def _patch_logger_class():
     """Make sure process name is recorded when loggers are used."""
 
-    from multiprocessing.process import current_process
+    try:
+        from multiprocessing.process import current_process
+    except ImportError:
+        current_process = None
+
     logging._acquireLock()
     try:
         OldLoggerClass = logging.getLoggerClass()
@@ -17,7 +21,10 @@ def _patch_logger_class():
 
                 def makeRecord(self, *args, **kwds):
                     record = OldLoggerClass.makeRecord(self, *args, **kwds)
-                    record.processName = current_process()._name
+                    if current_process:
+                        record.processName = current_process()._name
+                    else:
+                        record.processName = ""
                     return record
             logging.setLoggerClass(ProcessAwareLogger)
     finally:

+ 2 - 1
celery/worker/__init__.py

@@ -1,7 +1,8 @@
 import socket
 import logging
 import traceback
-from multiprocessing.util import Finalize
+
+from kombu.utils.finalize import Finalize
 
 from celery import beat
 from celery import concurrency as _concurrency

+ 1 - 1
celery/worker/heartbeat.py

@@ -62,4 +62,4 @@ class Heart(threading.Thread):
         self._state = "CLOSE"
         self._shutdown.set()
         if self.isAlive():
-            self.join(1e100)
+            self.join(1e10)