Browse Source

Fixes pickling of configuration modules when using execv/fork emulation. Closes #1126

Ask Solem 12 years ago
parent
commit
60aa98fb57
2 changed files with 29 additions and 5 deletions
  1. 12 3
      celery/app/base.py
  2. 17 2
      celery/app/utils.py

+ 12 - 3
celery/app/base.py

@@ -18,6 +18,7 @@ from contextlib import contextmanager
 from copy import deepcopy
 from functools import wraps
 
+from billiard import forking as _forking
 from billiard.util import register_after_fork
 from kombu.clocks import LamportClock
 from kombu.utils import cached_property
@@ -63,7 +64,8 @@ class Celery(object):
     def __init__(self, main=None, loader=None, backend=None,
                  amqp=None, events=None, log=None, control=None,
                  set_as_current=True, accept_magic_kwargs=False,
-                 tasks=None, broker=None, include=None, **kwargs):
+                 tasks=None, broker=None, include=None, changes=None,
+                 **kwargs):
         self.clock = LamportClock()
         self.main = main
         self.amqp_cls = amqp or self.amqp_cls
@@ -88,7 +90,7 @@ class Celery(object):
 
         # these options are moved to the config to
         # simplify pickling of the app object.
-        self._preconf = {}
+        self._preconf = changes or {}
         if broker:
             self._preconf['BROKER_URL'] = broker
         if include:
@@ -408,7 +410,14 @@ class Celery(object):
         )
 
     def __reduce_args__(self):
-        return (self.main, self.conf.changes, self.loader_cls,
+        # _pickleable_changes will also try to include keys from configuration
+        # modules which is necessary when multiprocessing execv/fork emulation
+        # is enabled.  There may be a better way to do this, but attempts
+        # at forcing the subprocess to import the modules did not work out,
+        # apparently some sys.path problem.  More at Issue 1126.
+        conf = (self.conf.changes if _forking._forking_is_enabled
+                else self.conf._pickleable_changes())
+        return (self.main, conf, self.loader_cls,
                 self.backend_cls, self.amqp_cls, self.events_cls,
                 self.log_cls, self.control_cls, self.accept_magic_kwargs)
 

+ 17 - 2
celery/app/utils.py

@@ -10,9 +10,10 @@ from __future__ import absolute_import
 
 import os
 import platform as _platform
+import types
 
-from celery import datastructures
 from celery import platforms
+from celery.datastructures import ConfigurationView, DictAttribute
 from celery.utils.text import pretty
 from celery.utils.imports import qualname
 
@@ -30,7 +31,7 @@ settings -> transport:%(transport)s results:%(results)s
 """
 
 
-class Settings(datastructures.ConfigurationView):
+class Settings(ConfigurationView):
     """Celery settings object."""
 
     @property
@@ -62,6 +63,20 @@ class Settings(datastructures.ConfigurationView):
         # the last stash is the default settings, so just skip that
         return Settings({}, self._order[:-1])
 
+    def _pickleable_changes(self):
+        # attempt to include keys from configuration modules,
+        # to work with multiprocessing execv/fork emulation.
+        # see note at celery.app.base:Celery.__reduce_args__.
+        R = {}
+        for d in reversed(self._order[:-1]):
+            if isinstance(d, DictAttribute):
+                d = object.__getattribute__(d, 'obj')
+                if isinstance(d, types.ModuleType):
+                    d = dict((k, v) for k, v in vars(d).iteritems()
+                             if not k.startswith('__') and k.isupper())
+            R.update(d)
+        return R
+
     def find_option(self, name, namespace='celery'):
         """Search for option by name.