|
@@ -92,6 +92,7 @@ from celery.utils import text
|
|
|
from celery.utils import timer2
|
|
|
from celery.utils.functional import noop
|
|
|
from celery.utils.log import get_logger
|
|
|
+from celery.utils.timer2 import to_timestamp
|
|
|
from celery.utils.timeutils import humanize_seconds, timezone
|
|
|
|
|
|
from . import state
|
|
@@ -494,7 +495,8 @@ class Consumer(object):
|
|
|
|
|
|
sleep(min(poll_timeout, 0.1))
|
|
|
|
|
|
- def on_task(self, task, task_reserved=task_reserved):
|
|
|
+ def on_task(self, task, task_reserved=task_reserved,
|
|
|
+ to_system_tz=timezone.to_system):
|
|
|
"""Handle received task.
|
|
|
|
|
|
If the task has an `eta` we enter it into the ETA schedule,
|
|
@@ -516,9 +518,11 @@ class Consumer(object):
|
|
|
expires=task.expires and task.expires.isoformat())
|
|
|
|
|
|
if task.eta:
|
|
|
- eta = timezone.to_system(task.eta) if task.utc else task.eta
|
|
|
try:
|
|
|
- eta = timer2.to_timestamp(eta)
|
|
|
+ if task.utc:
|
|
|
+ eta = to_timestamp(to_system_tz(task.eta))
|
|
|
+ else:
|
|
|
+ eta = to_timestamp(task.eta, timezone.local)
|
|
|
except OverflowError, exc:
|
|
|
error("Couldn't convert eta %s to timestamp: %r. Task: %r",
|
|
|
task.eta, exc, task.info(safe=True), exc_info=True)
|