|
@@ -174,13 +174,13 @@ class Scheduler(object):
|
|
|
logger = logger # compat
|
|
|
|
|
|
def __init__(self, app, schedule=None, max_interval=None,
|
|
|
- Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
|
|
|
+ Producer=None, lazy=False, sync_every_tasks=None, **kwargs):
|
|
|
self.app = app
|
|
|
self.data = maybe_evaluate({} if schedule is None else schedule)
|
|
|
self.max_interval = (max_interval
|
|
|
or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
|
|
|
or self.max_interval)
|
|
|
- self.Publisher = Publisher or app.amqp.TaskProducer
|
|
|
+ self.Producer = Producer or app.amqp.TaskProducer
|
|
|
self._heap = None
|
|
|
self.sync_every_tasks = (
|
|
|
app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
|
|
@@ -199,11 +199,11 @@ class Scheduler(object):
|
|
|
'options': {'expires': 12 * 3600}}
|
|
|
self.update_from_dict(entries)
|
|
|
|
|
|
- def maybe_due(self, entry, publisher=None):
|
|
|
+ def maybe_due(self, entry, producer=None):
|
|
|
is_due, next_time_to_run = entry.is_due()
|
|
|
|
|
|
if is_due:
|
|
|
- self.apply_entry(entry, producer=publisher, advance=True)
|
|
|
+ self.apply_entry(entry, producer=producer, advance=True)
|
|
|
return next_time_to_run
|
|
|
|
|
|
def apply_entry(self, entry, producer=None):
|
|
@@ -234,7 +234,7 @@ class Scheduler(object):
|
|
|
verify = heappop(H)
|
|
|
if verify is event:
|
|
|
next_entry = self.reserve(entry)
|
|
|
- self.apply_entry(entry, producer=self.publisher)
|
|
|
+ self.apply_entry(entry, producer=self.producer)
|
|
|
heappush(H, event_t(next_time_to_run, event[1], next_entry))
|
|
|
return 0
|
|
|
else:
|
|
@@ -355,8 +355,8 @@ class Scheduler(object):
|
|
|
return self.app.connection()
|
|
|
|
|
|
@cached_property
|
|
|
- def publisher(self):
|
|
|
- return self.Publisher(self._ensure_connected())
|
|
|
+ def producer(self):
|
|
|
+ return self.Producer(self._ensure_connected())
|
|
|
|
|
|
@property
|
|
|
def info(self):
|