|
@@ -295,7 +295,8 @@ class Consumer(object):
|
|
|
self.app.amqp.queues.select_remove(queue)
|
|
|
self.task_consumer.cancel_by_queue(queue)
|
|
|
|
|
|
- def on_task(self, task, task_reserved=task_reserved):
|
|
|
+ def on_task(self, task, task_reserved=task_reserved,
|
|
|
+ to_system_tz=timezone.to_system):
|
|
|
"""Handle received task.
|
|
|
|
|
|
If the task has an `eta` we enter it into the ETA schedule,
|
|
@@ -317,9 +318,11 @@ class Consumer(object):
|
|
|
expires=task.expires and task.expires.isoformat())
|
|
|
|
|
|
if task.eta:
|
|
|
- eta = timezone.to_system(task.eta) if task.utc else task.eta
|
|
|
try:
|
|
|
- eta = to_timestamp(eta)
|
|
|
+ if task.utc:
|
|
|
+ eta = to_timestamp(to_system_tz(task.eta))
|
|
|
+ else:
|
|
|
+ eta = to_timestamp(task.eta, timezone.local)
|
|
|
except OverflowError as exc:
|
|
|
error("Couldn't convert eta %s to timestamp: %r. Task: %r",
|
|
|
task.eta, exc, task.info(safe=True), exc_info=True)
|