ソースを参照

ETA/expires and beat is now UTC, and timezone aware using the CELERY_TIMEZONE setting.

Closes #113.  Will be part of 3.0.
Ask Solem 14 年 前
コミット
619367b04d

+ 7 - 4
celery/app/amqp.py

@@ -19,6 +19,9 @@ from .. import routes as _routes
 from .. import signals
 from ..utils import cached_property, textindent, uuid
 
+# UTC timezone mark.
+TZ_UTC = 0x1
+
 #: List of known options to a Kombu producers send method.
 #: Used to extract the message related options out of any `dict`.
 MSG_OPTIONS = ("mandatory", "priority", "immediate", "routing_key",
@@ -206,10 +209,10 @@ class TaskPublisher(messaging.Publisher):
         if not isinstance(task_kwargs, dict):
             raise ValueError("task kwargs must be a dictionary")
         if countdown:                           # Convert countdown to ETA.
-            now = now or datetime.now()
+            now = now or datetime.utcnow()
             eta = now + timedelta(seconds=countdown)
         if isinstance(expires, int):
-            now = now or datetime.now()
+            now = now or datetime.utcnow()
             expires = now + timedelta(seconds=expires)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
@@ -220,8 +223,8 @@ class TaskPublisher(messaging.Publisher):
                 "kwargs": task_kwargs or {},
                 "retries": retries or 0,
                 "eta": eta,
-                "expires": expires}
-
+                "expires": expires,
+                "tz": TZ_UTC}
         if taskset_id:
             body["taskset"] = taskset_id
         if chord:

+ 2 - 1
celery/app/defaults.py

@@ -116,6 +116,7 @@ NAMESPACES = {
                 "interval_step": 0.2}, type="dict"),
         "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
         "TASK_SERIALIZER": Option("pickle"),
+        "TIMEZONE": Option(None, type="string"),
         "TRACK_STARTED": Option(False, type="bool"),
         "REDIRECT_STDOUTS": Option(True, type="bool"),
         "REDIRECT_STDOUTS_LEVEL": Option("WARNING"),
@@ -124,7 +125,7 @@ NAMESPACES = {
     "CELERYD": {
         "AUTOSCALER": Option("celery.worker.autoscale.Autoscaler"),
         "CONCURRENCY": Option(0, type="int"),
-        "ETA_SCHEDULER": Option(None, type="str"),
+        "ETA_SCHEDULER": Option(None, type="string"),
         "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),
         "HIJACK_ROOT_LOGGER": Option(True, type="bool"),
         "CONSUMER": Option("celery.worker.consumer.Consumer"),

+ 2 - 2
celery/backends/database.py

@@ -128,9 +128,9 @@ class DatabaseBackend(BaseDictBackend):
         expires = self.expires
         try:
             session.query(Task).filter(
-                    Task.date_done < (datetime.now() - expires)).delete()
+                    Task.date_done < (datetime.utcnow() - expires)).delete()
             session.query(TaskSet).filter(
-                    TaskSet.date_done < (datetime.now() - expires)).delete()
+                    TaskSet.date_done < (datetime.utcnow() - expires)).delete()
             session.commit()
         finally:
             session.close()

+ 2 - 2
celery/backends/mongodb.py

@@ -100,7 +100,7 @@ class MongoBackend(BaseDictBackend):
         meta = {"_id": task_id,
                 "status": status,
                 "result": Binary(self.encode(result)),
-                "date_done": datetime.now(),
+                "date_done": datetime.utcnow(),
                 "traceback": Binary(self.encode(traceback))}
 
         db = self._get_database()
@@ -134,7 +134,7 @@ class MongoBackend(BaseDictBackend):
         taskmeta_collection = db[self.mongodb_taskmeta_collection]
         taskmeta_collection.remove({
                 "date_done": {
-                    "$lt": datetime.now() - self.expires,
+                    "$lt": datetime.utcnow() - self.expires,
                  }
         })
 

+ 3 - 3
celery/beat.py

@@ -77,14 +77,14 @@ class ScheduleEntry(object):
         self.total_run_count = total_run_count or 0
 
     def _default_now(self):
-        return datetime.now()
+        return datetime.utcnow()
 
     def next(self, last_run_at=None):
         """Returns a new instance of the same class, but with
         its date and count fields updated."""
         return self.__class__(**dict(self,
-                                     last_run_at=last_run_at or datetime.now(),
-                                     total_run_count=self.total_run_count + 1))
+                                last_run_at=last_run_at or datetime.utcnow(),
+                                total_run_count=self.total_run_count + 1))
     __next__ = next  # for 2to3
 
     def update(self, other):

+ 3 - 3
celery/db/models.py

@@ -26,8 +26,8 @@ class Task(ResultModelBase):
     task_id = sa.Column(sa.String(255), unique=True)
     status = sa.Column(sa.String(50), default=states.PENDING)
     result = sa.Column(PickleType, nullable=True)
-    date_done = sa.Column(sa.DateTime, default=datetime.now,
-                       onupdate=datetime.now, nullable=True)
+    date_done = sa.Column(sa.DateTime, default=datetime.utcnow,
+                       onupdate=datetime.utcnow, nullable=True)
     traceback = sa.Column(sa.Text, nullable=True)
 
     def __init__(self, task_id):
@@ -53,7 +53,7 @@ class TaskSet(ResultModelBase):
                 autoincrement=True, primary_key=True)
     taskset_id = sa.Column(sa.String(255), unique=True)
     result = sa.Column(sa.PickleType, nullable=True)
-    date_done = sa.Column(sa.DateTime, default=datetime.now,
+    date_done = sa.Column(sa.DateTime, default=datetime.utcnow,
                        nullable=True)
 
     def __init__(self, taskset_id, result):

+ 1 - 1
celery/events/cursesmon.py

@@ -300,7 +300,7 @@ class CursesMonitor(object):
         attr = curses.A_NORMAL
         if task.uuid == self.selected_task:
             attr = curses.A_STANDOUT
-        timestamp = datetime.fromtimestamp(
+        timestamp = datetime.utcfromtimestamp(
                         task.timestamp or time.time())
         timef = timestamp.strftime("%H:%M:%S")
         line = self.format_row(task.uuid, task.name,

+ 1 - 1
celery/events/dumper.py

@@ -25,7 +25,7 @@ def humanize_type(type):
 class Dumper(object):
 
     def on_event(self, event):
-        timestamp = datetime.fromtimestamp(event.pop("timestamp"))
+        timestamp = datetime.utcfromtimestamp(event.pop("timestamp"))
         type = event.pop("type").lower()
         hostname = event.pop("hostname")
         if type.startswith("task-"):

+ 1 - 1
celery/schedules.py

@@ -236,7 +236,7 @@ class crontab(schedule):
         return result
 
     def __init__(self, minute='*', hour='*', day_of_week='*',
-            nowfun=datetime.now):
+            nowfun=datetime.utcnow):
         self._orig_minute = minute
         self._orig_hour = hour
         self._orig_day_of_week = day_of_week

+ 6 - 6
celery/tests/test_task/test_task.py

@@ -301,8 +301,8 @@ class TestCeleryTasks(unittest.TestCase):
 
         # With eta.
         presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
-                                  eta=datetime.now() + timedelta(days=1),
-                                  expires=datetime.now() + timedelta(days=2))
+                                eta=datetime.utcnow() + timedelta(days=1),
+                                expires=datetime.utcnow() + timedelta(days=2))
         self.assertNextTaskDataEqual(consumer, presult2, t1.name,
                 name="George Costanza", test_eta=True, test_expires=True)
 
@@ -512,11 +512,11 @@ class TestPeriodicTask(unittest.TestCase):
 
     def test_remaining_estimate(self):
         self.assertIsInstance(
-            MyPeriodic().remaining_estimate(datetime.now()),
+            MyPeriodic().remaining_estimate(datetime.utcnow()),
             timedelta)
 
     def test_is_due_not_due(self):
-        due, remaining = MyPeriodic().is_due(datetime.now())
+        due, remaining = MyPeriodic().is_due(datetime.utcnow())
         self.assertFalse(due)
         # This assertion may fail if executed in the
         # first minute of an hour, thus 59 instead of 60
@@ -524,7 +524,7 @@ class TestPeriodicTask(unittest.TestCase):
 
     def test_is_due(self):
         p = MyPeriodic()
-        due, remaining = p.is_due(datetime.now() - p.run_every.run_every)
+        due, remaining = p.is_due(datetime.utcnow() - p.run_every.run_every)
         self.assertTrue(due)
         self.assertEqual(remaining,
                          p.timedelta_seconds(p.run_every.run_every))
@@ -695,7 +695,7 @@ class test_crontab_remaining_estimate(unittest.TestCase):
 class test_crontab_is_due(unittest.TestCase):
 
     def setUp(self):
-        self.now = datetime.now()
+        self.now = datetime.utcnow()
         self.next_minute = 60 - self.now.second - 1e-6 * self.now.microsecond
 
     def test_default_crontab_spec(self):

+ 6 - 6
celery/tests/test_worker/test_worker_job.py

@@ -236,16 +236,16 @@ class test_TaskRequest(unittest.TestCase):
         tw.terminate(pool, signal="KILL")
 
     def test_revoked_expires_expired(self):
-        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-        tw.expires = datetime.now() - timedelta(days=1)
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
+                         expires=datetime.utcnow() - timedelta(days=1))
         tw.revoked()
         self.assertIn(tw.task_id, revoked)
         self.assertEqual(mytask.backend.get_status(tw.task_id),
                          states.REVOKED)
 
     def test_revoked_expires_not_expired(self):
-        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-        tw.expires = datetime.now() + timedelta(days=1)
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
+                         expires=datetime.utcnow() + timedelta(days=1))
         tw.revoked()
         self.assertNotIn(tw.task_id, revoked)
         self.assertNotEqual(mytask.backend.get_status(tw.task_id),
@@ -253,9 +253,9 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_revoked_expires_ignore_result(self):
         mytask.ignore_result = True
-        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
+                         expires=datetime.utcnow() - timedelta(days=1))
         try:
-            tw.expires = datetime.now() - timedelta(days=1)
             tw.revoked()
             self.assertIn(tw.task_id, revoked)
             self.assertNotEqual(mytask.backend.get_status(tw.task_id),

+ 4 - 0
celery/utils/timer2.py

@@ -87,6 +87,10 @@ class Schedule(object):
             if not self.handle_error(sys.exc_info()):
                 raise
 
+        if eta is None:
+            # schedule now.
+            eta = time()
+
         heapq.heappush(self._queue, (eta, priority, entry))
         return entry
 

+ 56 - 4
celery/utils/timeutils.py

@@ -1,8 +1,17 @@
 import math
 
+from kombu.utils import cached_property
+
 from datetime import datetime, timedelta
+from dateutil import tz
 from dateutil.parser import parse as parse_iso8601
 
+try:
+    import pytz
+except ImportError:
+    pytz = None
+
+
 DAYNAMES = "sun", "mon", "tue", "wed", "thu", "fri", "sat"
 WEEKDAYS = dict((name, dow) for name, dow in zip(DAYNAMES, range(7)))
 
@@ -10,7 +19,8 @@ RATE_MODIFIER_MAP = {"s": lambda n: n,
                      "m": lambda n: n / 60.0,
                      "h": lambda n: n / 60.0 / 60.0}
 
-HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, "total_seconds")
+
+HAS_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, "total_seconds")
 
 TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
               ("hour", 60 * 60, lambda n: int(math.ceil(n))),
@@ -18,6 +28,48 @@ TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
               ("second", 1, lambda n: "%.2f" % n))
 
 
+class UnknownTimezone(Exception):
+    """No specification exists for the timezone specified.  Consider
+    installing the pytz library to get access to more timezones."""
+
+
+def _is_naive(dt):
+    return bool(dt.tzinfo)
+
+
+class _Zone(object):
+
+    def tz_or_local(self, tzinfo=None):
+        if tzinfo is None:
+            return self.local
+        return self.get_timezone(tzinfo)
+
+    def to_local(self, dt, local=None, orig=None):
+        return dt.replace(tzinfo=orig or self.utc).astimezone(
+                    self.tz_or_local(local))
+
+    def get_timezone(self, zone):
+        if isinstance(zone, basestring):
+            if pytz:
+                return pytz.timezone(zone)
+            zone = tz.gettz(zone)
+            if zone is None:
+                raise UnknownTimezone(UnknownTimezone.__doc__)
+            return zone
+        return zone
+
+    @cached_property
+    def local(self):
+        return tz.tzlocal()
+
+    @cached_property
+    def utc(self):
+        return self.get_timezone("UTC")
+
+timezone = _Zone()
+
+
+
 def maybe_timedelta(delta):
     """Coerces integer to timedelta if `delta` is an integer."""
     if isinstance(delta, (int, float)):
@@ -31,7 +83,7 @@ def timedelta_seconds(delta):  # pragma: no cover
     Doesn't account for negative values.
 
     """
-    if HAVE_TIMEDELTA_TOTAL_SECONDS:
+    if HAS_TIMEDELTA_TOTAL_SECONDS:
         # Should return 0 for negative seconds
         return max(delta.total_seconds(), 0)
     if delta.days < 0:
@@ -72,10 +124,10 @@ def remaining(start, ends_in, now=None, relative=True):
         calculated using :func:`delta_resolution` (i.e. rounded to the
         resolution of `ends_in`).
     :keyword now: Function returning the current time and date,
-        defaults to :func:`datetime.now`.
+        defaults to :func:`datetime.utcnow`.
 
     """
-    now = now or datetime.now()
+    now = now or datetime.utcnow()
 
     end_date = start + ends_in
     if not relative:

+ 1 - 1
celery/worker/control/builtins.py

@@ -126,7 +126,7 @@ def dump_schedule(panel, safe=False, **kwargs):
         return []
 
     formatitem = lambda (i, item): "%s. %s pri%s %r" % (i,
-            datetime.fromtimestamp(item["eta"]),
+            datetime.utcfromtimestamp(item["eta"]),
             item["priority"],
             item["item"])
     info = map(formatitem, enumerate(schedule.info()))

+ 16 - 4
celery/worker/job.py

@@ -18,7 +18,7 @@ from ..execute.trace import TaskTrace
 from ..utils import (noop, kwdict, fun_takes_kwargs,
                      get_symbol_by_name, truncate_text)
 from ..utils.encoding import safe_repr, safe_str, default_encoding
-from ..utils.timeutils import maybe_iso8601
+from ..utils.timeutils import maybe_iso8601, timezone
 
 from . import state
 
@@ -253,7 +253,7 @@ class TaskRequest(object):
             on_ack=noop, retries=0, delivery_info=None, hostname=None,
             email_subject=None, email_body=None, logger=None,
             eventer=None, eta=None, expires=None, app=None,
-            taskset_id=None, chord=None, **opts):
+            taskset_id=None, chord=None, tz=0x1, **opts):
         self.app = app_or_default(app)
         self.task_name = task_name
         self.task_id = task_id
@@ -277,6 +277,15 @@ class TaskRequest(object):
         if self.task.ignore_result:
             self._store_errors = self.task.store_errors_even_if_ignored
 
+        # timezone means the message is timezone-aware, and the only timezone
+        # supported at this point is UTC.
+        self.tzlocal = timezone.tz_or_local(self.app.conf.CELERY_TIMEZONE)
+        tz = tz and timezone.utc or self.tzlocal
+        if self.eta is not None:
+            self.eta = timezone.to_local(self.eta, self.tzlocal, tz)
+        if self.expires is not None:
+            self.expires = timezone.to_local(self.expires, self.tzlocal, tz)
+
     @classmethod
     def from_message(cls, message, body, on_ack=noop, **kw):
         """Create request from a task message.
@@ -302,7 +311,10 @@ class TaskRequest(object):
                    retries=body.get("retries", 0),
                    eta=maybe_iso8601(body.get("eta")),
                    expires=maybe_iso8601(body.get("expires")),
-                   on_ack=on_ack, delivery_info=delivery_info, **kw)
+                   on_ack=on_ack,
+                   delivery_info=delivery_info,
+                   tz=body.get("tz", None),
+                   **kw)
 
     def get_instance_attrs(self, loglevel, logfile):
         return {"logfile": logfile, "loglevel": loglevel,
@@ -393,7 +405,7 @@ class TaskRequest(object):
 
     def maybe_expire(self):
         """If expired, mark the task as revoked."""
-        if self.expires and datetime.now() > self.expires:
+        if self.expires and datetime.now(self.tzlocal) > self.expires:
             state.revoked.add(self.task_id)
             if self._store_errors:
                 self.task.backend.mark_as_revoked(self.task_id)