|
@@ -27,7 +27,7 @@ from kombu.clocks import LamportClock
|
|
|
from kombu.utils import cached_property
|
|
|
|
|
|
from celery import platforms
|
|
|
-from celery.exceptions import AlwaysEagerIgnored
|
|
|
+from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
|
|
|
from celery.loaders import get_loader_cls
|
|
|
from celery.local import PromiseProxy, maybe_evaluate
|
|
|
from celery._state import _task_stack, _tls, get_current_app, _register_app
|
|
@@ -68,6 +68,7 @@ class Celery(object):
|
|
|
amqp=None, events=None, log=None, control=None,
|
|
|
set_as_current=True, accept_magic_kwargs=False,
|
|
|
tasks=None, broker=None, include=None, changes=None,
|
|
|
+ config_source=None,
|
|
|
**kwargs):
|
|
|
self.clock = LamportClock()
|
|
|
self.main = main
|
|
@@ -80,6 +81,7 @@ class Celery(object):
|
|
|
self.set_as_current = set_as_current
|
|
|
self.registry_cls = symbol_by_name(self.registry_cls)
|
|
|
self.accept_magic_kwargs = accept_magic_kwargs
|
|
|
+ self._config_source = config_source
|
|
|
|
|
|
self.configured = False
|
|
|
self._pending_defaults = deque()
|
|
@@ -101,6 +103,14 @@ class Celery(object):
|
|
|
|
|
|
if self.set_as_current:
|
|
|
self.set_current()
|
|
|
+
|
|
|
+ # See Issue #1126
|
|
|
+ # this is used when pickling the app object so that configuration
|
|
|
+ # is reread without having to pickle the contents
|
|
|
+ # (which is often unpickleable anyway)
|
|
|
+ if self._config_source:
|
|
|
+ self.config_from_object(self._config_source)
|
|
|
+
|
|
|
self.on_init()
|
|
|
_register_app(self)
|
|
|
|
|
@@ -203,11 +213,16 @@ class Celery(object):
|
|
|
|
|
|
def config_from_object(self, obj, silent=False):
|
|
|
del(self.conf)
|
|
|
+ self._config_source = obj
|
|
|
return self.loader.config_from_object(obj, silent=silent)
|
|
|
|
|
|
def config_from_envvar(self, variable_name, silent=False):
|
|
|
- del(self.conf)
|
|
|
- return self.loader.config_from_envvar(variable_name, silent=silent)
|
|
|
+ module_name = os.environ.get(variable_name)
|
|
|
+ if not module_name:
|
|
|
+ if silent:
|
|
|
+ return False
|
|
|
+ raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
|
|
|
+ return self.config_from_object(module_name, silent=silent)
|
|
|
|
|
|
def config_from_cmdline(self, argv, namespace='celery'):
|
|
|
self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
|
|
@@ -413,20 +428,10 @@ class Celery(object):
|
|
|
)
|
|
|
|
|
|
def __reduce_args__(self):
|
|
|
- # _pickleable_changes will also try to include keys from configuration
|
|
|
- # modules which is necessary when multiprocessing execv/fork emulation
|
|
|
- # is enabled. There may be a better way to do this, but attempts
|
|
|
- # at forcing the subprocess to import the modules did not work out,
|
|
|
- # apparently some sys.path problem. More at Issue 1126.
|
|
|
- if self.IS_WINDOWS:
|
|
|
- conf = {}
|
|
|
- else:
|
|
|
- conf = (self.conf.changes
|
|
|
- if _forking and _forking._forking_is_enabled
|
|
|
- else self.conf._pickleable_changes())
|
|
|
- return (self.main, conf, self.loader_cls,
|
|
|
+ return (self.main, self.conf.changes, self.loader_cls,
|
|
|
self.backend_cls, self.amqp_cls, self.events_cls,
|
|
|
- self.log_cls, self.control_cls, self.accept_magic_kwargs)
|
|
|
+ self.log_cls, self.control_cls, self.accept_magic_kwargs,
|
|
|
+ self._config_source)
|
|
|
|
|
|
@cached_property
|
|
|
def Worker(self):
|