|
@@ -23,7 +23,7 @@ from celery.utils.functional import noop
|
|
|
from celery.utils.imports import qualname
|
|
|
from celery.utils.log import get_logger
|
|
|
from celery.utils.text import dump_body
|
|
|
-from celery.utils.timeutils import humanize_seconds
|
|
|
+from celery.utils.timeutils import humanize_seconds, timezone
|
|
|
from celery.worker import state
|
|
|
from celery.worker.state import maybe_shutdown
|
|
|
from celery.worker.bootsteps import Namespace as _NS, StartStopComponent, CLOSE
|
|
@@ -374,16 +374,18 @@ class Consumer(object):
|
|
|
expires=task.expires and task.expires.isoformat())
|
|
|
|
|
|
if task.eta:
|
|
|
+ eta = timezone.to_system(task.eta) if task.utc else task.eta
|
|
|
try:
|
|
|
- eta = to_timestamp(task.eta)
|
|
|
+ eta = to_timestamp(eta)
|
|
|
except OverflowError as exc:
|
|
|
error("Couldn't convert eta %s to timestamp: %r. Task: %r",
|
|
|
task.eta, exc, task.info(safe=True), exc_info=True)
|
|
|
task.acknowledge()
|
|
|
else:
|
|
|
self.qos.increment_eventually()
|
|
|
- self.timer.apply_at(eta, self.apply_eta_task, (task, ),
|
|
|
- priority=6)
|
|
|
+ self.timer.apply_at(
|
|
|
+ eta, self.apply_eta_task, (task, ), priority=6,
|
|
|
+ )
|
|
|
else:
|
|
|
task_reserved(task)
|
|
|
self._quick_put(task)
|