瀏覽代碼

Use datetime.isoformat() to send eta, and dateutil.parse.parse to receive eta.

Ask Solem 15 年之前
父節點
當前提交
b2a3d95cd8
共有 2 個文件被更改,包括 5 次插入1 次删除
  1. 3 1
      celery/messaging.py
  2. 2 0
      celery/worker/__init__.py

+ 3 - 1
celery/messaging.py

@@ -46,6 +46,8 @@ class TaskPublisher(Publisher):
         """INTERNAL"""
         """INTERNAL"""
 
 
         task_id = task_id or gen_unique_id()
         task_id = task_id or gen_unique_id()
+        eta = kwargs.get("eta")
+        eta = eta and eta.isoformat()
 
 
         message_data = {
         message_data = {
             "task": task_name,
             "task": task_name,
@@ -53,7 +55,7 @@ class TaskPublisher(Publisher):
             "args": task_args or [],
             "args": task_args or [],
             "kwargs": task_kwargs or {},
             "kwargs": task_kwargs or {},
             "retries": kwargs.get("retries", 0),
             "retries": kwargs.get("retries", 0),
-            "eta": kwargs.get("eta"),
+            "eta": eta,
         }
         }
 
 
         if part_of_set:
         if part_of_set:

+ 2 - 0
celery/worker/__init__.py

@@ -8,6 +8,7 @@ import logging
 import socket
 import socket
 from Queue import Queue
 from Queue import Queue
 
 
+import dateutil
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 
 
 from celery import conf
 from celery import conf
@@ -106,6 +107,7 @@ class CarrotListener(object):
 
 
         eta = message_data.get("eta")
         eta = message_data.get("eta")
         if eta:
         if eta:
+            eta = dateutil.parser.parse(eta)
             self.prefetch_count.increment()
             self.prefetch_count.increment()
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
             self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
                     task.task_name, task.task_id, eta))
                     task.task_name, task.task_id, eta))