|
@@ -37,6 +37,7 @@ from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
|
|
|
from celery.five import items, values
|
|
|
from celery.loaders import get_loader_cls
|
|
|
from celery.local import PromiseProxy, maybe_evaluate
|
|
|
+from celery.utils import abstract
|
|
|
from celery.utils import gen_task_name
|
|
|
from celery.utils.dispatch import Signal
|
|
|
from celery.utils.functional import first, maybe_list, head_from_fun
|
|
@@ -537,8 +538,8 @@ class Celery(object):
|
|
|
# load lazy periodic tasks
|
|
|
pending_beat = self._pending_periodic_tasks
|
|
|
while pending_beat:
|
|
|
- pargs, pkwargs = pending_beat.popleft()
|
|
|
- self._add_periodic_task(*pargs, **pkwargs)
|
|
|
+ self._add_periodic_task(*pending_beat.popleft())
|
|
|
+
|
|
|
# Settings.__setitem__ method, set Settings.change
|
|
|
if self._preconf:
|
|
|
for key, value in items(self._preconf):
|
|
@@ -562,20 +563,22 @@ class Celery(object):
|
|
|
kwargs['app'] = self
|
|
|
return self.canvas.signature(*args, **kwargs)
|
|
|
|
|
|
- def add_periodic_task(self, *args, **kwargs):
|
|
|
- if not self.configured:
|
|
|
- return self._pending_periodic_tasks.append((args, kwargs))
|
|
|
- return self._add_periodic_task(*args, **kwargs)
|
|
|
-
|
|
|
- def _add_periodic_task(self, schedule, sig,
|
|
|
- args=(), kwargs={}, name=None, **opts):
|
|
|
- from .task import Task
|
|
|
-
|
|
|
- sig = (self.signature(sig.name, args, kwargs)
|
|
|
- if isinstance(sig, Task) else sig.clone(args, kwargs))
|
|
|
-
|
|
|
- name = name or ':'.join([sig.name, ','.join(map(str, sig.args))])
|
|
|
- self._conf.CELERYBEAT_SCHEDULE[name] = {
|
|
|
+ def add_periodic_task(self, schedule, sig,
|
|
|
+ args=(), kwargs=(), name=None, **opts):
|
|
|
+ key, entry = self._sig_to_periodic_task_entry(
|
|
|
+ schedule, sig, args, kwargs, name, **opts)
|
|
|
+ if self.configured:
|
|
|
+ self._add_periodic_task(key, entry)
|
|
|
+ else:
|
|
|
+ self._pending_periodic_tasks.append((key, entry))
|
|
|
+ return key
|
|
|
+
|
|
|
+ def _sig_to_periodic_task_entry(self, schedule, sig,
|
|
|
+ args=(), kwargs={}, name=None, **opts):
|
|
|
+ sig = (sig.clone(args, kwargs)
|
|
|
+ if isinstance(sig, abstract.CallableSignature)
|
|
|
+ else self.signature(sig.name, args, kwargs)
|
|
|
+ return name or repr(sig), {
|
|
|
'schedule': schedule,
|
|
|
'task': sig.name,
|
|
|
'args': sig.args,
|
|
@@ -583,6 +586,9 @@ class Celery(object):
|
|
|
'options': dict(sig.options, **opts),
|
|
|
}
|
|
|
|
|
|
+ def _add_periodic_task(self, key, entry):
|
|
|
+ self._conf.CELERYBEAT_SCHEDULE[key] = entry
|
|
|
+
|
|
|
def create_task_cls(self):
|
|
|
"""Creates a base task class using default configuration
|
|
|
taken from this app."""
|