|
@@ -17,12 +17,14 @@ from contextlib import contextmanager
|
|
|
from copy import deepcopy
|
|
|
from operator import attrgetter
|
|
|
|
|
|
+from amqp import promise
|
|
|
from billiard.util import register_after_fork
|
|
|
from kombu.clocks import LamportClock
|
|
|
from kombu.common import oid_from
|
|
|
from kombu.utils import cached_property, uuid
|
|
|
|
|
|
from celery import platforms
|
|
|
+from celery import signals
|
|
|
from celery._state import (
|
|
|
_task_stack, _tls, get_current_app, set_default_app,
|
|
|
_register_app, get_current_worker_task,
|
|
@@ -257,13 +259,13 @@ class Celery(object):
|
|
|
del(self.conf)
|
|
|
return self.loader.config_from_object(obj, silent=silent)
|
|
|
|
|
|
- def config_from_envvar(self, variable_name, silent=False):
|
|
|
+ def config_from_envvar(self, variable_name, silent=False, force=False):
|
|
|
module_name = os.environ.get(variable_name)
|
|
|
if not module_name:
|
|
|
if silent:
|
|
|
return False
|
|
|
raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(module_name))
|
|
|
- return self.config_from_object(module_name, silent=silent)
|
|
|
+ return self.config_from_object(module_name, silent=silent, force=force)
|
|
|
|
|
|
def config_from_cmdline(self, argv, namespace='celery'):
|
|
|
self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
|
|
@@ -274,7 +276,16 @@ class Celery(object):
|
|
|
return setup_security(allowed_serializers, key, cert,
|
|
|
store, digest, serializer, app=self)
|
|
|
|
|
|
- def autodiscover_tasks(self, packages, related_name='tasks'):
|
|
|
+ def autodiscover_tasks(self, packages, related_name='tasks', force=False):
|
|
|
+ if force:
|
|
|
+ return self._autodiscover_tasks(packages, related_name)
|
|
|
+ signals.import_modules.connect(promise(
|
|
|
+ self._autodiscover_tasks, (packages, related_name),
|
|
|
+ ), weak=False, sender=self)
|
|
|
+
|
|
|
+ def _autodiscover_tasks(self, packages, related_name='tasks', **kwargs):
|
|
|
+ # argument may be lazy
|
|
|
+ packages = packages() if isinstance(packages, Callable) else packages
|
|
|
if self.conf.CELERY_FORCE_BILLIARD_LOGGING:
|
|
|
# we'll use billiard's processName instead of
|
|
|
# multiprocessing's one in all the loggers
|