Ver Fonte

Adds setting CELERY_ENABLE_UTC and renames tz message field to 'utc'

Ask Solem há 13 anos atrás
pai
commit
0a89eae8b3
4 ficheiros alterados com 20 adições e 11 exclusões
  1. 3 6
      celery/app/amqp.py
  2. 1 0
      celery/app/defaults.py
  3. 3 3
      celery/worker/job.py
  4. 13 2
      docs/internals/protocol.rst

+ 3 - 6
celery/app/amqp.py

@@ -21,11 +21,6 @@ from .. import routes as _routes
 from .. import signals
 from .. import signals
 from ..utils import cached_property, textindent, uuid
 from ..utils import cached_property, textindent, uuid
 
 
-# UTC timezone mark.
-# Defaults to local in 2.5, and UTC in 3.x.
-TZ_LOCAL = 0x0
-TZ_UTC = 0x1
-
 #: List of known options to a Kombu producers send method.
 #: List of known options to a Kombu producers send method.
 #: Used to extract the message related options out of any `dict`.
 #: Used to extract the message related options out of any `dict`.
 MSG_OPTIONS = ("mandatory", "priority", "immediate", "routing_key",
 MSG_OPTIONS = ("mandatory", "priority", "immediate", "routing_key",
@@ -159,6 +154,7 @@ class TaskPublisher(messaging.Publisher):
         self.retry = kwargs.pop("retry", self.retry)
         self.retry = kwargs.pop("retry", self.retry)
         self.retry_policy = kwargs.pop("retry_policy",
         self.retry_policy = kwargs.pop("retry_policy",
                                         self.retry_policy or {})
                                         self.retry_policy or {})
+        self.utc = kwargs.pop("enable_utc", False)
         super(TaskPublisher, self).__init__(*args, **kwargs)
         super(TaskPublisher, self).__init__(*args, **kwargs)
 
 
     def declare(self):
     def declare(self):
@@ -228,7 +224,7 @@ class TaskPublisher(messaging.Publisher):
                 "retries": retries or 0,
                 "retries": retries or 0,
                 "eta": eta,
                 "eta": eta,
                 "expires": expires,
                 "expires": expires,
-                "tz": TZ_LOCAL}
+                "utc": self.utc}
         if taskset_id:
         if taskset_id:
             body["taskset"] = taskset_id
             body["taskset"] = taskset_id
         if chord:
         if chord:
@@ -326,6 +322,7 @@ class AMQP(object):
                     "serializer": conf.CELERY_TASK_SERIALIZER,
                     "serializer": conf.CELERY_TASK_SERIALIZER,
                     "retry": conf.CELERY_TASK_PUBLISH_RETRY,
                     "retry": conf.CELERY_TASK_PUBLISH_RETRY,
                     "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
                     "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
+                    "enable_utc": conf.CELERY_ENABLE_UTC,
                     "app": self}
                     "app": self}
         return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
         return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
 
 

+ 1 - 0
celery/app/defaults.py

@@ -103,6 +103,7 @@ NAMESPACES = {
         "DEFAULT_EXCHANGE_TYPE": Option("direct"),
         "DEFAULT_EXCHANGE_TYPE": Option("direct"),
         "DEFAULT_DELIVERY_MODE": Option(2, type="string"),
         "DEFAULT_DELIVERY_MODE": Option(2, type="string"),
         "EAGER_PROPAGATES_EXCEPTIONS": Option(False, type="bool"),
         "EAGER_PROPAGATES_EXCEPTIONS": Option(False, type="bool"),
+        "ENABLE_UTC": Option(False, type="bool"),
         "EVENT_SERIALIZER": Option("json"),
         "EVENT_SERIALIZER": Option("json"),
         "IMPORTS": Option((), type="tuple"),
         "IMPORTS": Option((), type="tuple"),
         "IGNORE_RESULT": Option(False, type="bool"),
         "IGNORE_RESULT": Option(False, type="bool"),

+ 3 - 3
celery/worker/job.py

@@ -241,7 +241,7 @@ class TaskRequest(object):
     def __init__(self, task_name, task_id, args, kwargs,
     def __init__(self, task_name, task_id, args, kwargs,
             on_ack=noop, retries=0, delivery_info=None, hostname=None,
             on_ack=noop, retries=0, delivery_info=None, hostname=None,
             logger=None, eventer=None, eta=None, expires=None, app=None,
             logger=None, eventer=None, eta=None, expires=None, app=None,
-            taskset_id=None, chord=None, tz=0x0, **opts):
+            taskset_id=None, chord=None, utc=False, **opts):
         self.app = app_or_default(app)
         self.app = app_or_default(app)
         self.task_name = task_name
         self.task_name = task_name
         self.task_id = task_id
         self.task_id = task_id
@@ -266,7 +266,7 @@ class TaskRequest(object):
         # timezone means the message is timezone-aware, and the only timezone
         # timezone means the message is timezone-aware, and the only timezone
         # supported at this point is UTC.
         # supported at this point is UTC.
         self.tzlocal = timezone.tz_or_local(self.app.conf.CELERY_TIMEZONE)
         self.tzlocal = timezone.tz_or_local(self.app.conf.CELERY_TIMEZONE)
-        tz = tz and timezone.utc or self.tzlocal
+        tz = timezone.utc if utc else self.tzlocal
         if self.eta is not None:
         if self.eta is not None:
             self.eta = timezone.to_local(self.eta, self.tzlocal, tz)
             self.eta = timezone.to_local(self.eta, self.tzlocal, tz)
         if self.expires is not None:
         if self.expires is not None:
@@ -305,7 +305,7 @@ class TaskRequest(object):
                    expires=maybe_iso8601(body.get("expires")),
                    expires=maybe_iso8601(body.get("expires")),
                    on_ack=on_ack,
                    on_ack=on_ack,
                    delivery_info=delivery_info,
                    delivery_info=delivery_info,
-                   tz=body.get("tz", None),
+                   utc=body.get("utc", None),
                    **kw)
                    **kw)
 
 
     def get_instance_attrs(self, loglevel, logfile):
     def get_instance_attrs(self, loglevel, logfile):

+ 13 - 2
docs/internals/protocol.rst

@@ -44,8 +44,9 @@ Message format
     format. If not provided the message is not scheduled, but will be
     format. If not provided the message is not scheduled, but will be
     executed asap.
     executed asap.
 
 
-* expires (introduced after v2.0.2)
+* expires
     `string` (ISO 8601)
     `string` (ISO 8601)
+    .. versionadded:: 2.0.2
 
 
     Expiration date. This is the date and time in ISO 8601 format.
     Expiration date. This is the date and time in ISO 8601 format.
     If not provided the message will never expire. The message
     If not provided the message will never expire. The message
@@ -63,15 +64,25 @@ to process it.
 
 
 
 
 * taskset_id
 * taskset_id
+  `string`
 
 
   The taskset this task is part of.
   The taskset this task is part of.
 
 
 * chord
 * chord
+  `object`
+  .. versionadded:: 2.3
 
 
-  Siginifies that this task is one of the header parts of a chord.  The value
+  Signifies that this task is one of the header parts of a chord.  The value
   of this key is the body of the cord that should be executed when all of
   of this key is the body of the cord that should be executed when all of
   the tasks in the header has returned.
   the tasks in the header has returned.
 
 
+* utc
+  `bool`
+  .. versionadded:: 2.5
+
+  If true time uses the UTC timezone, if not the current local timezone
+  should be used.
+
 Example message
 Example message
 ===============
 ===============