|
@@ -130,7 +130,7 @@ class Scheduler(UserDict):
|
|
|
Entry = ScheduleEntry
|
|
|
|
|
|
def __init__(self, schedule=None, logger=None, max_interval=None,
|
|
|
- app=None, **kwargs):
|
|
|
+ app=None, Publisher=None, **kwargs):
|
|
|
UserDict.__init__(self)
|
|
|
if schedule is None:
|
|
|
schedule = {}
|
|
@@ -141,14 +141,15 @@ class Scheduler(UserDict):
|
|
|
name="celery.beat")
|
|
|
self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
|
|
|
self.setup_schedule()
|
|
|
+ self.Publisher = Publisher or self.app.amqp.TaskPublisher
|
|
|
|
|
|
- def maybe_due(self, entry, connection=None):
|
|
|
+ def maybe_due(self, entry, publisher=None):
|
|
|
is_due, next_time_to_run = entry.is_due()
|
|
|
|
|
|
if is_due:
|
|
|
self.logger.debug("Scheduler: Sending due task %s" % entry.task)
|
|
|
try:
|
|
|
- result = self.apply_async(entry, connection=connection)
|
|
|
+ result = self.apply_async(entry, publisher=publisher)
|
|
|
except SchedulingError, exc:
|
|
|
self.logger.error("Scheduler: %s" % exc)
|
|
|
else:
|
|
@@ -164,15 +165,17 @@ class Scheduler(UserDict):
|
|
|
"""
|
|
|
remaining_times = []
|
|
|
connection = self.app.broker_connection()
|
|
|
+ publisher = self.Publisher(connection=connection)
|
|
|
try:
|
|
|
try:
|
|
|
for entry in self.schedule.itervalues():
|
|
|
- next_time_to_run = self.maybe_due(entry, connection)
|
|
|
+ next_time_to_run = self.maybe_due(entry, publisher)
|
|
|
if next_time_to_run:
|
|
|
remaining_times.append(next_time_to_run)
|
|
|
except RuntimeError:
|
|
|
pass
|
|
|
finally:
|
|
|
+ publisher.close()
|
|
|
connection.close()
|
|
|
|
|
|
return min(remaining_times + [self.max_interval])
|
|
@@ -181,15 +184,26 @@ class Scheduler(UserDict):
|
|
|
new_entry = self.schedule[entry.name] = entry.next()
|
|
|
return new_entry
|
|
|
|
|
|
- def apply_async(self, entry, connection=None, **kwargs):
|
|
|
+ def apply_async(self, entry, publisher=None, **kwargs):
|
|
|
# Update timestamps and run counts before we actually execute,
|
|
|
# so we have that done if an exception is raised (doesn't schedule
|
|
|
# forever.)
|
|
|
entry = self.reserve(entry)
|
|
|
|
|
|
try:
|
|
|
- result = self.send_task(entry.task, entry.args, entry.kwargs,
|
|
|
- connection=connection, **entry.options)
|
|
|
+ task = registry.tasks[entry.task]
|
|
|
+ except KeyError:
|
|
|
+ task = None
|
|
|
+
|
|
|
+ try:
|
|
|
+ if task:
|
|
|
+ result = task.apply_async(entry.args, entry.kwargs,
|
|
|
+ publisher=publisher,
|
|
|
+ **entry.options)
|
|
|
+ else:
|
|
|
+ result = self.send_task(entry.task, entry.args, entry.kwargs,
|
|
|
+ publisher=publisher,
|
|
|
+ **entry.options)
|
|
|
except Exception, exc:
|
|
|
raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
|
|
|
entry.name, exc))
|