Jelajahi Sumber

Makes sure tasks always use the right app when using force_execv. Closes #1072

Ask Solem 12 tahun lalu
induk
melakukan
89b760cd83

+ 11 - 0
celery/app/base.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
+import os
 import threading
 import threading
 import warnings
 import warnings
 
 
@@ -35,6 +36,8 @@ from .defaults import DEFAULTS, find_deprecated_settings
 from .registry import TaskRegistry
 from .registry import TaskRegistry
 from .utils import AppPickler, Settings, bugreport, _unpickle_app
 from .utils import AppPickler, Settings, bugreport, _unpickle_app
 
 
+_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
+
 
 
 def _unpickle_appattr(reverse_name, args):
 def _unpickle_appattr(reverse_name, args):
     """Given an attribute name and a list of args, gets
     """Given an attribute name and a list of args, gets
@@ -122,6 +125,14 @@ class Celery(object):
 
 
     def task(self, *args, **opts):
     def task(self, *args, **opts):
         """Creates new task class from any callable."""
         """Creates new task class from any callable."""
+        if _EXECV and not opts.get('_force_evaluate'):
+            # When using execv the task in the original module will point to a
+            # different app, so doing things like 'add.request' will point to
+            # a differnt task instance.  This makes sure it will always use
+            # the task instance from the current app.
+            # Really need a better solution for this :(
+            from . import shared_task as proxies_to_curapp
+            return proxies_to_curapp(*args, _force_evaluate=True, **opts)
 
 
         def inner_create_task_cls(shared=True, filter=None, **opts):
         def inner_create_task_cls(shared=True, filter=None, **opts):
 
 

+ 5 - 5
celery/app/builtins.py

@@ -53,7 +53,7 @@ def add_backend_cleanup_task(app):
 
 
     """
     """
 
 
-    @app.task(name='celery.backend_cleanup')
+    @app.task(name='celery.backend_cleanup', _force_evaluate=True)
     def backend_cleanup():
     def backend_cleanup():
         app.backend.cleanup()
         app.backend.cleanup()
     return backend_cleanup
     return backend_cleanup
@@ -71,7 +71,7 @@ def add_unlock_chord_task(app):
     from celery import result as _res
     from celery import result as _res
 
 
     @app.task(name='celery.chord_unlock', max_retries=None,
     @app.task(name='celery.chord_unlock', max_retries=None,
-              default_retry_delay=1, ignore_result=True)
+              default_retry_delay=1, ignore_result=True, _force_evaluate=True)
     def unlock_chord(group_id, callback, interval=None, propagate=False,
     def unlock_chord(group_id, callback, interval=None, propagate=False,
             max_retries=None, result=None):
             max_retries=None, result=None):
         if interval is None:
         if interval is None:
@@ -89,7 +89,7 @@ def add_unlock_chord_task(app):
 def add_map_task(app):
 def add_map_task(app):
     from celery.canvas import subtask
     from celery.canvas import subtask
 
 
-    @app.task(name='celery.map')
+    @app.task(name='celery.map', _force_evaluate=True)
     def xmap(task, it):
     def xmap(task, it):
         task = subtask(task).type
         task = subtask(task).type
         return list(map(task, it))
         return list(map(task, it))
@@ -100,7 +100,7 @@ def add_map_task(app):
 def add_starmap_task(app):
 def add_starmap_task(app):
     from celery.canvas import subtask
     from celery.canvas import subtask
 
 
-    @app.task(name='celery.starmap')
+    @app.task(name='celery.starmap', _force_evaluate=True)
     def xstarmap(task, it):
     def xstarmap(task, it):
         task = subtask(task).type
         task = subtask(task).type
         return list(starmap(task, it))
         return list(starmap(task, it))
@@ -111,7 +111,7 @@ def add_starmap_task(app):
 def add_chunk_task(app):
 def add_chunk_task(app):
     from celery.canvas import chunks as _chunks
     from celery.canvas import chunks as _chunks
 
 
-    @app.task(name='celery.chunks')
+    @app.task(name='celery.chunks', _force_evaluate=True)
     def chunks(task, it, n):
     def chunks(task, it, n):
         return _chunks.apply_chunks(task, it, n)
         return _chunks.apply_chunks(task, it, n)
     return chunks
     return chunks

+ 8 - 5
celery/concurrency/processes/__init__.py

@@ -35,9 +35,6 @@ WORKER_SIGIGNORE = frozenset(['SIGINT'])
 
 
 def process_initializer(app, hostname):
 def process_initializer(app, hostname):
     """Initializes the process so it can be used to process tasks."""
     """Initializes the process so it can be used to process tasks."""
-    app.set_current()
-    set_default_app(app)
-    trace._tasks = app._tasks  # make sure this optimization is set.
     platforms.signals.reset(*WORKER_SIGRESET)
     platforms.signals.reset(*WORKER_SIGRESET)
     platforms.signals.ignore(*WORKER_SIGIGNORE)
     platforms.signals.ignore(*WORKER_SIGIGNORE)
     platforms.set_mp_process_title('celeryd', hostname=hostname)
     platforms.set_mp_process_title('celeryd', hostname=hostname)
@@ -50,8 +47,14 @@ def process_initializer(app, hostname):
                   str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
                   str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
     app.loader.init_worker()
     app.loader.init_worker()
     app.loader.init_worker_process()
     app.loader.init_worker_process()
-    app.finalize()
-
+    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
+        # pool did execv after fork
+        trace.setup_worker_optimizations(app)
+    else:
+        app.set_current()
+        set_default_app(app)
+        app.finalize()
+        trace._tasks = app._tasks  # enables fast_trace_task optimization.
     from celery.task.trace import build_tracer
     from celery.task.trace import build_tracer
     for name, task in app.tasks.iteritems():
     for name, task in app.tasks.iteritems():
         task.__trace__ = build_tracer(name, task, app.loader, hostname)
         task.__trace__ = build_tracer(name, task, app.loader, hostname)

+ 1 - 0
celery/task/trace.py

@@ -345,6 +345,7 @@ def setup_worker_optimizations(app):
     # we set this to always return our app.  This is a hack,
     # we set this to always return our app.  This is a hack,
     # and means that only a single app can be used for workers
     # and means that only a single app can be used for workers
     # running in the same process.
     # running in the same process.
+    app.set_current()
     set_default_app(app)
     set_default_app(app)
 
 
     # evaluate all task classes by finalizing the app.
     # evaluate all task classes by finalizing the app.