|
@@ -5,7 +5,7 @@ from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
|
|
|
from celery.log import setup_logger
|
|
|
from celery.registry import tasks
|
|
|
from celery.process import ProcessQueue
|
|
|
-from celery.models import PeriodicTaskMeta
|
|
|
+from celery.models import RetryTask, PeriodicTaskMeta
|
|
|
import multiprocessing
|
|
|
import simplejson
|
|
|
import traceback
|
|
@@ -45,7 +45,9 @@ class TaskWrapper(object):
|
|
|
|
|
|
def extend_kwargs_with_logging(self, loglevel, logfile):
|
|
|
task_func_kwargs = {"logfile": logfile,
|
|
|
- "loglevel": loglevel}
|
|
|
+ "loglevel": loglevel,
|
|
|
+ "task_id": self.task_id,
|
|
|
+ "task_name": self.task_name}
|
|
|
task_func_kwargs.update(self.kwargs)
|
|
|
return task_func_kwargs
|
|
|
|
|
@@ -141,6 +143,13 @@ class TaskDaemon(object):
|
|
|
for waiting_task in waiting_tasks]
|
|
|
return waiting_tasks
|
|
|
|
|
|
+ def schedule_retry_tasks(self):
|
|
|
+ """Reschedule all requeued tasks waiting for retry."""
|
|
|
+ retry_tasks = RetryTask.objects.get_waiting_tasks()
|
|
|
+ [retry_task.retry()
|
|
|
+ for retry_task in retry_tasks]
|
|
|
+ return retry_tasks
|
|
|
+
|
|
|
def run(self):
|
|
|
"""The worker server's main loop."""
|
|
|
results = ProcessQueue(self.concurrency, logger=self.logger,
|
|
@@ -149,6 +158,7 @@ class TaskDaemon(object):
|
|
|
ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
|
|
|
events = [
|
|
|
EventTimer(self.run_periodic_tasks, 1),
|
|
|
+ EventTimer(self.schedule_retry_tasks, 2),
|
|
|
EventTimer(self.connection_diagnostics, 3),
|
|
|
EventTimer(self.reset_connection, 60 * 5),
|
|
|
]
|