瀏覽代碼

Now depends on billiard >= 2.7.3.3

Ask Solem 13 年之前
父節點
當前提交
3334afb282

+ 2 - 2
celery/app/__init__.py

@@ -54,13 +54,13 @@ def _app_or_default(app=None):
 
 def _app_or_default_trace(app=None):  # pragma: no cover
     from traceback import print_stack
-    from celery.utils.mp import get_process_name
+    from billiard import current_process
     if app is None:
         if getattr(state._tls, "current_app", None):
             print("-- RETURNING TO CURRENT APP --")  # noqa+
             print_stack()
             return state._tls.current_app
-        if get_process_name() == "MainProcess":
+        if current_process()._name == "MainProcess":
             raise Exception("DEFAULT APP")
         print("-- RETURNING TO DEFAULT APP --")      # noqa+
         print_stack()

+ 10 - 6
celery/apps/worker.py

@@ -8,6 +8,8 @@ import socket
 import sys
 import warnings
 
+from billiard import cpu_count, current_process
+
 from celery import __version__, platforms, signals
 from celery.app import app_or_default
 from celery.app.abstract import configurated, from_config
@@ -15,7 +17,6 @@ from celery.exceptions import ImproperlyConfigured, SystemTerminate
 from celery.utils import cry, isatty
 from celery.utils.imports import qualname
 from celery.utils.log import LOG_LEVELS, get_logger, mlevel
-from celery.utils.mp import cpu_count, get_process_name
 from celery.utils.text import pluralize
 from celery.worker import WorkController
 
@@ -79,7 +80,10 @@ class Worker(configurated):
 
         self.setup_defaults(kwargs, namespace="celeryd")
         if not self.concurrency:
-            self.concurrency = cpu_count()
+            try:
+                self.concurrency = cpu_count()
+            except NotImplementedError:
+                self.concurrency = 2
         self.discard = discard
         self.embed_clockservice = embed_clockservice
         if self.app.IS_WINDOWS and self.embed_clockservice:
@@ -255,7 +259,7 @@ class Worker(configurated):
 def install_worker_int_handler(worker):
 
     def _stop(signum, frame):
-        process_name = get_process_name()
+        process_name = current_process()._name
         if not process_name or process_name == "MainProcess":
             print("celeryd: Hitting Ctrl+C again will terminate "
                   "all running tasks!")
@@ -270,7 +274,7 @@ def install_worker_int_handler(worker):
 def install_worker_int_again_handler(worker):
 
     def _stop(signum, frame):
-        process_name = get_process_name()
+        process_name = current_process()._name
         if not process_name or process_name == "MainProcess":
             print("celeryd: Cold shutdown (%s)" % (process_name, ))
             worker.terminate(in_sighandler=True)
@@ -282,7 +286,7 @@ def install_worker_int_again_handler(worker):
 def install_worker_term_handler(worker):
 
     def _stop(signum, frame):
-        process_name = get_process_name()
+        process_name = current_process()._name
         if not process_name or process_name == "MainProcess":
             print("celeryd: Warm shutdown (%s)" % (process_name, ))
             worker.stop(in_sighandler=True)
@@ -294,7 +298,7 @@ def install_worker_term_handler(worker):
 def install_worker_term_hard_handler(worker):
 
     def _stop(signum, frame):
-        process_name = get_process_name()
+        process_name = current_process()._name
         if not process_name or process_name == "MainProcess":
             print("celeryd: Cold shutdown (%s)" % (process_name, ))
             worker.terminate(in_sighandler=True)

+ 7 - 2
celery/beat.py

@@ -19,6 +19,7 @@ import sys
 import threading
 import traceback
 
+from billiard import Process
 from kombu.utils import reprcall
 from kombu.utils.functional import maybe_promise
 
@@ -32,7 +33,6 @@ from .utils import cached_property
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
 from .utils.log import get_logger
-from .utils.mp import Process
 
 logger = get_logger(__name__)
 debug, info, error = logger.debug, logger.info, logger.error
@@ -446,8 +446,13 @@ class _Threaded(threading.Thread):
         self.service.stop(wait=True)
 
 
-if Process is not None:
+supports_fork = True
+try:
+    import _multiprocessing
+except ImportError:
+    supports_fork = False
 
+if supports_fork:
     class _Process(Process):
         """Embedded task scheduler using multiprocessing."""
 

+ 2 - 1
celery/bin/celeryd.py

@@ -75,7 +75,8 @@ from __future__ import absolute_import
 
 import sys
 
-from celery.utils.mp import freeze_support
+from billiard import freeze_support
+
 from celery.bin.base import Command, Option
 
 

+ 1 - 1
celery/concurrency/processes/__init__.py

@@ -10,7 +10,7 @@ from celery import platforms
 from celery import signals
 from celery.app import app_or_default
 from celery.concurrency.base import BasePool
-from celery.utils.mp import Pool, RUN
+from billiard.pool import Pool, RUN
 
 if platform.system() == "Windows":  # pragma: no cover
     # On Windows os.kill calls TerminateProcess which cannot be

+ 5 - 5
celery/contrib/rdb.py

@@ -46,6 +46,8 @@ import sys
 
 from pdb import Pdb
 
+from billiard import current_process
+
 default_port = 6899
 
 CELERY_RDB_HOST = os.environ.get("CELERY_RDB_HOST") or "127.0.0.1"
@@ -67,12 +69,10 @@ class Rdb(Pdb):
         self.active = True
 
         try:
-            from celery.utils.mp import current_process
-            if current_process:
-                _, port_skew = current_process().name.split('-')
-        except (ImportError, ValueError):
+            _, port_skew = current_process().name.split('-')
+            port_skew = int(port_skew)
+        except ValueError:
             pass
-        port_skew = int(port_skew)
 
         self._prev_handles = sys.stdin, sys.stdout
         this_port = None

+ 3 - 6
celery/platforms.py

@@ -22,8 +22,8 @@ import sys
 
 from .local import try_import
 
+from billiard import current_process
 from kombu.utils.limits import TokenBucket
-from celery.utils.mp import current_process
 
 _setproctitle = try_import("setproctitle")
 resource = try_import("resource")
@@ -600,11 +600,8 @@ else:
         if not rate_limit or _setps_bucket.can_consume(1):
             if hostname:
                 progname = "%s@%s" % (progname, hostname.split(".")[0])
-            if current_process is not None:
-                return set_process_title(
-                    "%s:%s" % (progname, current_process().name), info=info)
-            else:
-                return set_process_title(progname, info=info)
+            return set_process_title(
+                "%s:%s" % (progname, current_process().name), info=info)
 
 
 def shellsplit(s, posix=True):

+ 5 - 2
celery/tests/test_app/test_beat.py

@@ -305,10 +305,13 @@ class test_Service(Case):
 class test_EmbeddedService(Case):
 
     def test_start_stop_process(self):
-        from celery.utils.mp import Process
-        if not Process:
+        try:
+            import _multiprocessing
+        except ImportError:
             raise SkipTest("multiprocessing not available")
 
+        from billiard.process import Process
+
         s = beat.EmbeddedService()
         self.assertIsInstance(s, Process)
         self.assertIsInstance(s.service, beat.Service)

+ 11 - 4
celery/tests/test_bin/test_celeryd.py

@@ -10,6 +10,8 @@ from functools import wraps
 from mock import patch
 from nose import SkipTest
 
+from billiard import current_process
+
 from celery import Celery
 from celery import platforms
 from celery import signals
@@ -20,7 +22,6 @@ from celery.exceptions import ImproperlyConfigured, SystemTerminate
 
 from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
                                 reset_modules, skip_unless_module)
-from celery.utils.mp import current_process
 
 
 from celery.utils.log import ensure_process_aware_logger
@@ -427,7 +428,9 @@ class test_signal_handlers(AppCase):
 
     @disable_stdouts
     def test_worker_int_handler_only_stop_MainProcess(self):
-        if current_process is None:
+        try:
+            import _multiprocessing
+        except ImportError:
             raise SkipTest("only relevant for multiprocessing")
         process = current_process()
         name, process.name = process.name, "OtherProcess"
@@ -448,7 +451,9 @@ class test_signal_handlers(AppCase):
 
     @disable_stdouts
     def test_worker_int_again_handler_only_stop_MainProcess(self):
-        if current_process is None:
+        try:
+            import _multiprocessing
+        except ImportError:
             raise SkipTest("only relevant for multiprocessing")
         process = current_process()
         name, process.name = process.name, "OtherProcess"
@@ -484,7 +489,9 @@ class test_signal_handlers(AppCase):
 
     @disable_stdouts
     def test_worker_term_handler_only_stop_MainProcess(self):
-        if current_process is None:
+        try:
+            import _multiprocessing
+        except ImportError:
             raise SkipTest("only relevant for multiprocessing")
         process = current_process()
         name, process.name = process.name, "OtherProcess"

+ 2 - 1
celery/utils/__init__.py

@@ -22,12 +22,13 @@ from functools import partial, wraps
 from inspect import getargspec
 from pprint import pprint
 
+from billiard.util import register_after_fork
+
 from celery.exceptions import CPendingDeprecationWarning, CDeprecationWarning
 from .compat import StringIO
 
 from .imports import symbol_by_name, qualname
 from .functional import noop
-from .mp import register_after_fork
 
 PENDING_DEPRECATION_FMT = """
     %(description)s is scheduled for deprecation in \

+ 1 - 1
celery/utils/log.py

@@ -6,10 +6,10 @@ import sys
 import threading
 import traceback
 
+from billiard import current_process, util as mputil
 from kombu.log import get_logger as _get_logger, LOG_LEVELS
 
 from .encoding import safe_str, str_t
-from .mp import current_process, util as mputil
 from .term import colored
 
 _process_aware = False

+ 0 - 47
celery/utils/mp.py

@@ -1,47 +0,0 @@
-try:
-    import billiard
-    from billiard import util
-    from billiard import pool
-    current_process = billiard.current_process
-    register_after_fork = util.register_after_fork
-    freeze_support = billiard.freeze_support
-    Process = billiard.Process
-    cpu_count = billiard.cpu_count
-    Pool = pool.Pool
-    RUN = pool.RUN
-except ImportError:
-    try:
-        import multiprocessing
-        from multiprocessing import util
-        from multiprocessing import pool
-        current_process = multiprocessing.current_process
-        register_after_fork = util.register_after_fork
-        freeze_support = multiprocessing.freeze_support
-        Process = multiprocessing.Process
-        cpu_count = multiprocessing.cpu_count
-        Pool = pool.Pool
-        RUN = pool.RUN
-    except ImportError:
-        current_process = None
-        util = None
-        register_after_fork = lambda *a, **kw: None
-        freeze_support = lambda: True
-        Process = None
-        cpu_count = lambda: 2
-        Pool = None
-        RUN = 1
-
-
-def get_process_name():
-    if current_process is not None:
-        return current_process().name
-
-def forking_enable(enabled):
-    try:
-        from billiard import forking_enable
-    except ImportError:
-        try:
-            from multiprocessing import forking_enable
-        except ImportError:
-            return
-    forking_enable(enabled)

+ 1 - 1
celery/utils/patch.py

@@ -18,7 +18,7 @@ _process_aware = False
 
 def _patch_logger_class():
     """Make sure process name is recorded when loggers are used."""
-    from .mp import current_process
+    from billiard import current_process
     logging._acquireLock()
     try:
         OldLoggerClass = logging.getLoggerClass()

+ 1 - 1
celery/worker/__init__.py

@@ -21,6 +21,7 @@ import sys
 import threading
 import traceback
 
+from billiard import forking_enable
 from kombu.utils.finalize import Finalize
 
 from celery import concurrency as _concurrency
@@ -28,7 +29,6 @@ from celery.app import app_or_default, set_default_app
 from celery.app.abstract import configurated, from_config
 from celery.exceptions import SystemTerminate
 from celery.utils.functional import noop
-from celery.utils.mp import forking_enable
 from celery.utils.imports import qualname, reload_from_cwd
 from celery.utils.log import get_logger
 

+ 218 - 33
docs/reference/celery.rst

@@ -8,7 +8,22 @@
     Application
     -----------
 
-    .. autoclass:: Celery
+    .. class:: Celery(main=None, broker="amqp://guest:guest@localhost:5672//",
+                      loader="app", backend=None)
+
+        :param main: Name of the main module if running as `__main__`.
+        :keyword broker: URL of the default broker used.
+        :keyword loader: The loader class, or the name of the loader class to use.
+                         Default is :class:`celery.loaders.app.AppLoader`.
+        :keyword backend: The result store backend class, or the name of the
+                         backend class to use. Default is the value of the
+                        :setting:`CELERY_RESULT_BACKEND` setting.
+        :keyword amqp: AMQP object or class name.
+        :keyword events: Events object or class name.
+        :keyword log: Log object or class name.
+        :keyword control: Control object or class name.
+        :keyword set_as_current:  Make this the global current app.
+        :keyword tasks: A task registry or the name of a registry class.
 
         .. attribute:: main
 
@@ -17,48 +32,218 @@
             If set this will be used instead of `__main__` when automatically
             generating task names.
 
-        .. autoattribute:: conf
-        .. autoattribute:: amqp
-        .. autoattribute:: backend
-        .. autoattribute:: loader
-        .. autoattribute:: control
-        .. autoattribute:: events
-        .. autoattribute:: log
-        .. autoattribute:: tasks
-        .. autoattribute:: pool
-        .. autoattribute:: Task
+        .. attribute:: conf
+
+            Current configuration.
+
+        .. attribute:: amqp
+
+            AMQP related functionality: :class:`~@amqp`.
+
+        .. attribute:: backend
+
+            Current backend instance.
+
+        .. attribute:: loader
+
+            Current loader instance.
+        .. attribute:: control
+
+            Remote control: :class:`~@control`.
+
+        .. attribute:: events
+
+            Consuming and sending events: :class:`~@events`.
+
+        .. attribute:: log
+
+            Logging: :class:`~@log`.
+
+        .. attribute:: tasks
+
+            Task registry.
+
+        .. attribute:: pool
+
+            Broker connection pool: :class:`~@pool`.
+
+        .. attribute:: Task
+
+            Base task class for this app.
+
+
+        .. method:: bugreport
+
+            Returns a string with information useful for the Celery core
+            developers when reporting a bug.
+
+        .. method:: config_from_object(obj, silent=False)
+
+            Reads configuration from object, where object is either
+            an object or the name of a module to import.
+
+            :keyword silent: If true then import errors will be ignored.
+
+            .. code-block:: python
+
+                >>> celery.config_from_object("myapp.celeryconfig")
+
+                >>> from myapp import celeryconfig
+                >>> celery.config_from_object(celeryconfig)
+
+        .. method:: config_from_envvar(variable_name, silent=False)
+
+            Read configuration from environment variable.
+
+            The value of the environment variable must be the name
+            of a module to import.
+
+            .. code-block:: python
+
+                >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
+                >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
+
+        .. method:: config_from_cmdline(argv, namespace="celery")
+
+            Parses argv for configuration strings.
+
+            Configuration strings must be located after a '--' sequence,
+            e.g.::
+
+                program arg1 arg2 -- celeryd.concurrency=10
+
+            :keyword namespace: Default namespace if omitted.
+
+        .. method:: start(argv=None)
+            Run :program:`celery` using `argv`.
+
+            Uses :data:`sys.argv` if `argv` is not specified.
+
+        .. method:: task(fun, **options)
+            Decorator to create a task class out of any callable.
+
+            **Examples:**
+
+            .. code-block:: python
+
+                @task
+                def refresh_feed(url):
+                    return ...
+
+            with setting extra options:
+
+            .. code-block:: python
+
+                @task(exchange="feeds")
+                def refresh_feed(url):
+                    return ...
+
+            .. admonition:: App Binding
+
+                For custom apps the task decorator returns proxy
+                objects, so that the act of creating the task is not performed
+                until the task is used or the task registry is accessed.
+
+                If you are depending on binding to be deferred, then you must
+                not access any attributes on the returned object until the
+                application is fully set up (finalized).
+
+
+        .. method:: send_task(name, args=(), kwargs={}, countdown=None,
+                eta=None, task_id=None, publisher=None, connection=None,
+                result_cls=AsyncResult, expires=None, queues=None, **options)
+
+            Send task by **name**.
+
+            :param name: Name of task to execute (e.g. `"tasks.add"`).
+            :keyword result_cls: Specify custom result class. Default is
+                using :meth:`AsyncResult`.
+
+            Otherwise supports the same arguments as :meth:`~@Task.apply_async`.
+
+        .. attribute:: AsyncResult
+
+            Create new result instance. See :class:`~celery.result.AsyncResult`.
+
+        .. attribute:: TaskSetResult
+
+            Create new taskset result instance.
+            See :class:`~celery.result.TaskSetResult`.
+
+        .. method:: worker_main(argv=None)
+
+            Run :program:`celeryd` using `argv`.
+
+            Uses :data:`sys.argv` if `argv` is not specified."""
+
+        .. attribute:: Worker
+
+            Worker application. See :class:`~@Worker`.
+
+        .. attribute:: WorkController
+
+            Embeddable worker. See :class:`~@WorkController`.
+
+        .. attribute:: Beat
+
+            Celerybeat scheduler application.
+            See :class:`~@Beat`.
+
+        .. method:: broker_connection(url="amqp://guest:guest@localhost:5672//",
+               ssl=False, transport_options={})
+
+            Establish a connection to the message broker.
+
+            :param url: Either the URL or the hostname of the broker to use.
+
+            :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
+            :keyword userid: defaults to the :setting:`BROKER_USER` setting.
+            :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
+            :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
+            :keyword port: defaults to the :setting:`BROKER_PORT` setting.
+            :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
+            :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
+            :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
+                setting.
+
+            :returns :class:`kombu.connection.BrokerConnection`:
+
+        """
+        .. method:: default_connection(connection=None)
+
+            For use within a with-statement to get a connection from the pool
+            if one is not already provided.
+
+            :keyword connection: If not provided, then a connection will be
+                                 acquired from the connection pool.
+
+
+        .. method:: mail_admins(subject, body, fail_silently=False)
+
+            Sends an email to the admins in the :setting:`ADMINS` setting.
 
-        .. automethod:: bugreport
+        .. method:: select_queues(queues=[])
 
-        .. automethod:: config_from_object
-        .. automethod:: config_from_envvar
-        .. automethod:: config_from_cmdline
+            Select a subset of queues, where queues must be a list of queue
+            names to keep.
 
-        .. automethod:: start
+        .. method:: now()
 
-        .. automethod:: task
-        .. automethod:: send_task
-        .. autoattribute:: AsyncResult
-        .. autoattribute:: TaskSetResult
+            Returns the current time and date as a :class:`~datetime.datetime`
+            object.
 
-        .. automethod:: worker_main
-        .. autoattribute:: Worker
-        .. autoattribute:: WorkController
-        .. autoattribute:: Beat
+        .. method:: set_current()
 
-        .. automethod:: broker_connection
-        .. automethod:: default_connection
+            Makes this the current app for this thread.
 
-        .. automethod:: mail_admins
+        .. method:: finalize()
 
-        .. automethod:: prepare_config
-        .. automethod:: select_queues
-        .. automethod:: now
+            Finalizes the app by loading built-in tasks,
+            and evaluating pending task decorators
 
-        .. automethod:: set_current
-        .. automethod:: finalize
+        .. attribute:: Pickler
 
-        .. autoattribute:: Pickler
+            Helper class used to pickle this application.
 
     Grouping Tasks
     --------------

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
-billiard>=2.7.3.2
+billiard>=2.7.3.3
 python-dateutil>=1.5,<2.0
 kombu>=2.1.6,<3.0

+ 1 - 1
setup.cfg

@@ -42,7 +42,7 @@ upload-dir = docs/.build/html
 [bdist_rpm]
 requires = uuid
            importlib
-           billiard>=2.7.3.2
+           billiard>=2.7.3.3
            python-dateutil >= 1.5
            kombu >= 2.1.6
            ordereddict