Browse Source

Use MP_MAIN_MODULE when execv is used to properly rename __main__ tasks to App.main

Ask Solem 12 năm trước cách đây
mục cha
commit
58ca503749

+ 15 - 0
celery/app/task.py

@@ -13,6 +13,7 @@ from __future__ import absolute_import
 from __future__ import with_statement
 
 import logging
+import os
 import sys
 
 from kombu import Exchange
@@ -42,6 +43,13 @@ extract_exec_options = mattrgetter("queue", "routing_key",
                                    "compression", "expires", "bare")
 
 
+#: Billiard sets this when execv is enabled.
+#: We use it to find out the name of the original ``__main__``
+#: module, so that we can properly rewrite the name of the
+#: task to be that of ``App.main``.
+MP_MAIN_FILE = os.environ.get("MP_MAIN_FILE") or None
+
+
 class Context(object):
     # Default context
     logfile = None
@@ -128,6 +136,13 @@ class TaskType(type):
         # with the framework.  There should only be one class for each task
         # name, so we always return the registered version.
         tasks = app._tasks
+
+        # - If the task module is used as the __main__ script
+        # - we need to rewrite the module part of the task name
+        # - to match App.main.
+        if MP_MAIN_FILE and sys.modules[task_module].__file__ == MP_MAIN_FILE:
+            # - see comment about :envvar:`MP_MAIN_FILE` above.
+            task_module = "__main__"
         if autoname and task_module == "__main__" and app.main:
             attrs["name"] = '.'.join([app.main, name])
 

+ 0 - 3
celery/concurrency/processes/__init__.py

@@ -8,7 +8,6 @@ import signal as _signal
 from celery import platforms
 from celery import signals
 from celery.state import set_default_app
-from celery.app import app_or_default
 from celery.concurrency.base import BasePool
 from celery.task import trace
 from billiard.pool import Pool, RUN, CLOSE
@@ -34,8 +33,6 @@ WORKER_SIGIGNORE = frozenset(["SIGINT"])
 
 def process_initializer(app, hostname):
     """Initializes the process so it can be used to process tasks."""
-    app = app_or_default(app)
-    app.set_current()
     set_default_app(app)
     trace._tasks = app._tasks  # make sure this optimization is set.
     platforms.signals.reset(*WORKER_SIGRESET)

+ 1 - 1
requirements/default-py3k.txt

@@ -1,4 +1,4 @@
-billiard>=2.7.3.7
+billiard>=2.7.3.9
 python-dateutil>=2.0
 pytz
 kombu>=2.1.8

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
-billiard>=2.7.3.8
+billiard>=2.7.3.9
 python-dateutil>=1.5,<2.0
 kombu>=2.1.8,<3.0

+ 1 - 1
setup.cfg

@@ -16,7 +16,7 @@ upload-dir = docs/.build/html
 [bdist_rpm]
 requires = uuid
            importlib
-           billiard>=2.7.3.8
+           billiard>=2.7.3.9
            python-dateutil >= 1.5
            kombu >= 2.1.8
            ordereddict