|
@@ -94,7 +94,8 @@ class Celery(object):
|
|
|
amqp=None, events=None, log=None, control=None,
|
|
|
set_as_current=True, accept_magic_kwargs=False,
|
|
|
tasks=None, broker=None, include=None, changes=None,
|
|
|
- config_source=None, fixups=None, task_cls=None, **kwargs):
|
|
|
+ config_source=None, fixups=None, task_cls=None,
|
|
|
+ autofinalize=True, **kwargs):
|
|
|
self.clock = LamportClock()
|
|
|
self.main = main
|
|
|
self.amqp_cls = amqp or self.amqp_cls
|
|
@@ -109,6 +110,7 @@ class Celery(object):
|
|
|
self.accept_magic_kwargs = accept_magic_kwargs
|
|
|
self.user_options = defaultdict(set)
|
|
|
self.steps = defaultdict(set)
|
|
|
+ self.autofinalize = autofinalize
|
|
|
|
|
|
self.configured = False
|
|
|
self._config_source = config_source
|
|
@@ -220,6 +222,8 @@ class Celery(object):
|
|
|
return inner_create_task_cls(**opts)
|
|
|
|
|
|
def _task_from_fun(self, fun, **options):
|
|
|
+ if not self.finalized and not self.autofinalize:
|
|
|
+ raise RuntimeError('Contract breach: app not finalized')
|
|
|
base = options.pop('base', None) or self.Task
|
|
|
bind = options.pop('bind', False)
|
|
|
|
|
@@ -233,9 +237,11 @@ class Celery(object):
|
|
|
task = self._tasks[T.name] # return global instance.
|
|
|
return task
|
|
|
|
|
|
- def finalize(self):
|
|
|
+ def finalize(self, auto=False):
|
|
|
with self._finalize_mutex:
|
|
|
if not self.finalized:
|
|
|
+ if auto and not self.autofinalize:
|
|
|
+ raise RuntimeError('Contract breach: app not finalized')
|
|
|
self.finalized = True
|
|
|
load_shared_tasks(self)
|
|
|
|
|
@@ -626,7 +632,7 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def tasks(self):
|
|
|
- self.finalize()
|
|
|
+ self.finalize(auto=True)
|
|
|
return self._tasks
|
|
|
|
|
|
@cached_property
|