|
@@ -99,17 +99,17 @@ class WorkController(object):
|
|
|
self.setup_defaults(**kwargs)
|
|
|
self.on_after_init(**kwargs)
|
|
|
|
|
|
+ self.setup_instance(**self.prepare_args(**kwargs))
|
|
|
self._finalize = [
|
|
|
Finalize(self, self._send_worker_shutdown, exitpriority=10),
|
|
|
]
|
|
|
- self.setup_instance(**self.prepare_args(**kwargs))
|
|
|
|
|
|
def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
|
|
|
include=None, use_eventloop=None, exclude_queues=None,
|
|
|
**kwargs):
|
|
|
self.pidfile = pidfile
|
|
|
self.setup_queues(queues, exclude_queues)
|
|
|
- self.setup_includes(include)
|
|
|
+ self.setup_includes(str_to_list(include))
|
|
|
|
|
|
# Set default concurrency
|
|
|
if not self.concurrency:
|
|
@@ -187,14 +187,14 @@ class WorkController(object):
|
|
|
def setup_includes(self, includes):
|
|
|
# Update celery_include to have all known task modules, so that we
|
|
|
# ensure all task modules are imported in case an execv happens.
|
|
|
- inc = self.app.conf.CELERY_INCLUDE
|
|
|
+ prev = tuple(self.app.conf.CELERY_INCLUDE)
|
|
|
if includes:
|
|
|
- includes = str_to_list(includes)
|
|
|
- inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
|
|
|
+ prev += tuple(includes)
|
|
|
+ [self.app.loader.import_task_module(m) for m in includes]
|
|
|
self.include = includes
|
|
|
task_modules = set(task.__class__.__module__
|
|
|
for task in values(self.app.tasks))
|
|
|
- self.app.conf.CELERY_INCLUDE = tuple(set(inc) | task_modules)
|
|
|
+ self.app.conf.CELERY_INCLUDE = tuple(set(prev) | task_modules)
|
|
|
|
|
|
def prepare_args(self, **kwargs):
|
|
|
return kwargs
|