Browse Source

Use time.monotonic when available

Ask Solem 11 years ago
parent
commit
9fe795e137

+ 3 - 4
celery/backends/amqp.py

@@ -11,7 +11,6 @@
 from __future__ import absolute_import
 
 import socket
-import time
 
 from collections import deque
 from operator import itemgetter
@@ -20,7 +19,7 @@ from kombu import Exchange, Queue, Producer, Consumer
 
 from celery import states
 from celery.exceptions import TimeoutError
-from celery.five import range
+from celery.five import range, monotonic
 from celery.utils.functional import dictfilter
 from celery.utils.log import get_logger
 from celery.utils.timeutils import maybe_s_to_ms
@@ -181,7 +180,7 @@ class AMQPBackend(BaseBackend):
     poll = get_task_meta  # XXX compat
 
     def drain_events(self, connection, consumer,
-                     timeout=None, now=time.time, wait=None):
+                     timeout=None, now=monotonic, wait=None):
         wait = wait or connection.drain_events
         results = {}
 
@@ -218,7 +217,7 @@ class AMQPBackend(BaseBackend):
         return [self._create_binding(task_id) for task_id in ids]
 
     def get_many(self, task_ids, timeout=None,
-                 now=time.time, getfields=itemgetter('status', 'task_id'),
+                 now=monotonic, getfields=itemgetter('status', 'task_id'),
                  READY_STATES=states.READY_STATES, **kwargs):
         with self.app.pool.acquire_channel(block=True) as (conn, channel):
             ids = set(task_ids)

+ 3 - 2
celery/backends/cassandra.py

@@ -20,6 +20,7 @@ import time
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
+from celery.five import monotonic
 from celery.utils.log import get_logger
 from celery.utils.timeutils import maybe_timedelta, timedelta_seconds
 
@@ -102,7 +103,7 @@ class CassandraBackend(BaseBackend):
         self._column_family = None
 
     def _retry_on_error(self, fun, *args, **kwargs):
-        ts = time.time() + self._retry_timeout
+        ts = monotonic() + self._retry_timeout
         while 1:
             try:
                 return fun(*args, **kwargs)
@@ -113,7 +114,7 @@ class CassandraBackend(BaseBackend):
                     socket.error,
                     socket.timeout,
                     Thrift.TException) as exc:
-                if time.time() > ts:
+                if monotonic() > ts:
                     raise
                 logger.warning('Cassandra error: %r. Retrying...', exc)
                 time.sleep(self._retry_wait)

+ 3 - 3
celery/beat.py

@@ -24,7 +24,7 @@ from kombu.utils.functional import maybe_evaluate
 from . import __version__
 from . import platforms
 from . import signals
-from .five import items, reraise, values
+from .five import items, reraise, values, monotonic
 from .schedules import maybe_schedule, crontab
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
@@ -215,7 +215,7 @@ class Scheduler(object):
 
     def should_sync(self):
         return (not self._last_sync or
-                (time.time() - self._last_sync) > self.sync_every)
+                (monotonic() - self._last_sync) > self.sync_every)
 
     def reserve(self, entry):
         new_entry = self.schedule[entry.name] = next(entry)
@@ -257,7 +257,7 @@ class Scheduler(object):
             debug('beat: Synchronizing schedule...')
             self.sync()
         finally:
-            self._last_sync = time.time()
+            self._last_sync = monotonic()
 
     def sync(self):
         pass

+ 2 - 2
celery/concurrency/base.py

@@ -10,10 +10,10 @@ from __future__ import absolute_import
 
 import logging
 import os
-import time
 
 from kombu.utils.encoding import safe_repr
 
+from celery.five import monotonic
 from celery.utils import timer2
 from celery.utils.log import get_logger
 
@@ -25,7 +25,7 @@ logger = get_logger('celery.pool')
 def apply_target(target, args=(), kwargs={}, callback=None,
                  accept_callback=None, pid=None, **_):
     if accept_callback:
-        accept_callback(pid or os.getpid(), time.time())
+        accept_callback(pid or os.getpid(), monotonic())
     callback(target(*args, **kwargs))
 
 

+ 2 - 3
celery/concurrency/eventlet.py

@@ -27,9 +27,8 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
             warnings.warn(RuntimeWarning(W_RACE % side))
 
 
-from time import time
-
 from celery import signals
+from celery.five import monotonic
 from celery.utils import timer2
 
 from . import base
@@ -53,7 +52,7 @@ class Schedule(timer2.Schedule):
         self._queue = set()
 
     def _enter(self, eta, priority, entry):
-        secs = max(eta - time(), 0)
+        secs = max(eta - monotonic(), 0)
         g = self._spawn_after(secs, entry)
         self._queue.add(g)
         g.link(self._entry_exit, entry)

+ 2 - 3
celery/concurrency/gevent.py

@@ -13,8 +13,7 @@ try:
 except ImportError:  # pragma: no cover
     Timeout = None  # noqa
 
-from time import time
-
+from celery.five import monotonic
 from celery.utils import timer2
 
 from .base import apply_target, BasePool
@@ -48,7 +47,7 @@ class Schedule(timer2.Schedule):
         self._queue = set()
 
     def _enter(self, eta, priority, entry):
-        secs = max(eta - time(), 0)
+        secs = max(eta - monotonic(), 0)
         g = self._Greenlet.spawn_later(secs, entry)
         self._queue.add(g)
         g.link(self._entry_exit)

+ 4 - 4
celery/concurrency/processes.py

@@ -26,7 +26,7 @@ import struct
 
 from collections import deque, namedtuple
 from pickle import HIGHEST_PROTOCOL
-from time import sleep, time
+from time import sleep
 from weakref import WeakValueDictionary, ref
 
 from amqp.utils import promise
@@ -47,7 +47,7 @@ from celery import signals
 from celery._state import set_default_app
 from celery.app import trace
 from celery.concurrency.base import BasePool
-from celery.five import Counter, items, values
+from celery.five import Counter, items, values, monotonic
 from celery.utils.log import get_logger
 
 __all__ = ['TaskPool']
@@ -338,7 +338,7 @@ class AsynPool(_pool.Pool):
 
         hub.on_tick.add(self.on_poll_start)
 
-    def _create_timelimit_handlers(self, hub, now=time):
+    def _create_timelimit_handlers(self, hub, now=monotonic):
         """For async pool this sets up the handlers used
         to implement time limits."""
         call_later = hub.call_later
@@ -368,7 +368,7 @@ class AsynPool(_pool.Pool):
             _discard_tref(R._job)
         self.on_timeout_cancel = on_timeout_cancel
 
-    def _on_soft_timeout(self, job, soft, hard, hub, now=time):
+    def _on_soft_timeout(self, job, soft, hard, hub, now=monotonic):
         # only used by async pool.
         if hard:
             self._tref_for_id[job] = hub.call_at(

+ 3 - 4
celery/datastructures.py

@@ -9,7 +9,6 @@
 from __future__ import absolute_import, print_function, unicode_literals
 
 import sys
-import time
 
 from collections import defaultdict, Mapping, MutableMapping, MutableSet
 from heapq import heapify, heappush, heappop
@@ -20,7 +19,7 @@ from billiard.einfo import ExceptionInfo  # noqa
 from kombu.utils.encoding import safe_str
 from kombu.utils.limits import TokenBucket  # noqa
 
-from celery.five import items
+from celery.five import items, monotonic
 from celery.utils.functional import LRUCache, first, uniq  # noqa
 
 DOT_HEAD = """
@@ -561,7 +560,7 @@ class LimitedSet(object):
         self.__len__ = self._data.__len__
         self.__contains__ = self._data.__contains__
 
-    def add(self, value, now=time.time):
+    def add(self, value, now=monotonic):
         """Add a new member."""
         # offset is there to modify the length of the list,
         # this way we can expire an item before inserting the value,
@@ -589,7 +588,7 @@ class LimitedSet(object):
         self._data.pop(value, None)
     pop_value = discard  # XXX compat
 
-    def purge(self, limit=None, offset=0, now=time.time):
+    def purge(self, limit=None, offset=0, now=monotonic):
         """Purge expired items."""
         H, maxlen = self._heap, self.maxlen
         if not maxlen:

+ 3 - 2
celery/events/__init__.py

@@ -25,6 +25,7 @@ from kombu.mixins import ConsumerMixin
 from kombu.utils import cached_property
 
 from celery.app import app_or_default
+from celery.five import monotonic
 from celery.utils import uuid
 from celery.utils.functional import dictfilter
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
@@ -44,7 +45,7 @@ def get_exchange(conn):
     return ex
 
 
-def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
+def Event(type, _fields=None, __dict__=dict, __now__=monotonic, **fields):
     """Create an event.
 
     An event is a dictionary, the only required field is ``type``.
@@ -320,7 +321,7 @@ class EventReceiver(ConsumerMixin):
                                    channel=channel)
 
     def event_from_message(self, body, localize=True,
-                           now=time.time, tzfields=_TZGETTER,
+                           now=monotonic, tzfields=_TZGETTER,
                            adjust_timestamp=adjust_timestamp):
         type = body.get('type', '').lower()
         clock = body.get('clock')

+ 2 - 3
celery/events/cursesmon.py

@@ -11,7 +11,6 @@ from __future__ import absolute_import, print_function
 import curses
 import sys
 import threading
-import time
 
 from datetime import datetime
 from itertools import count
@@ -21,7 +20,7 @@ from math import ceil
 from celery import VERSION_BANNER
 from celery import states
 from celery.app import app_or_default
-from celery.five import items, values
+from celery.five import items, values, monotonic
 from celery.utils.text import abbr, abbrtask
 
 __all__ = ['CursesMonitor', 'evtop']
@@ -334,7 +333,7 @@ class CursesMonitor(object):  # pragma: no cover
         if task.uuid == self.selected_task:
             attr = curses.A_STANDOUT
         timestamp = datetime.utcfromtimestamp(
-            task.timestamp or time.time(),
+            task.timestamp or monotonic(),
         )
         timef = timestamp.strftime('%H:%M:%S')
         hostname = task.worker.hostname if task.worker else '*NONE*'

+ 2 - 3
celery/events/state.py

@@ -23,14 +23,13 @@ import threading
 from datetime import datetime
 from heapq import heappush, heappop
 from itertools import islice
-from time import time
 
 from kombu.clocks import timetuple
 from kombu.utils import kwdict
 
 from celery import states
 from celery.datastructures import AttributeDict
-from celery.five import items, values
+from celery.five import items, values, monotonic
 from celery.utils.functional import LRUCache
 from celery.utils.log import get_logger
 
@@ -140,7 +139,7 @@ class Worker(AttributeDict):
 
     @property
     def alive(self):
-        return bool(self.heartbeats and time() < self.heartbeat_expires)
+        return bool(self.heartbeats and monotonic() < self.heartbeat_expires)
 
     @property
     def id(self):

+ 5 - 1
celery/five.py

@@ -16,7 +16,7 @@ __all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
            'nextfun', 'reraise', 'WhateverIO', 'with_metaclass',
            'OrderedDict', 'THREAD_TIMEOUT_MAX', 'format_d',
            'class_property', 'reclassmethod', 'create_module',
-           'recreate_module']
+           'recreate_module', 'monotonic']
 
 try:
     from collections import Counter
@@ -45,6 +45,10 @@ try:
 except ImportError:                         # pragma: no cover
     from collections import UserDict        # noqa
 
+try:
+    from time import monotonic
+except ImportError:
+    from time import time as monotonic  # noqa
 
 if PY3:  # pragma: no cover
     import builtins

+ 3 - 3
celery/result.py

@@ -21,7 +21,7 @@ from . import states
 from .app import app_or_default
 from .datastructures import DependencyGraph, GraphFormatter
 from .exceptions import IncompleteStream, TimeoutError
-from .five import items, range, string_t
+from .five import items, range, string_t, monotonic
 
 __all__ = ['ResultBase', 'AsyncResult', 'ResultSet', 'GroupResult',
            'EagerResult', 'from_serializable']
@@ -499,14 +499,14 @@ class ResultSet(ResultBase):
             seconds.
 
         """
-        time_start = time.time()
+        time_start = monotonic()
         remaining = None
 
         results = []
         for result in self.results:
             remaining = None
             if timeout:
-                remaining = timeout - (time.time() - time_start)
+                remaining = timeout - (monotonic() - time_start)
                 if remaining <= 0.0:
                     raise TimeoutError('join operation timed out')
             results.append(result.get(timeout=remaining,

+ 5 - 4
celery/worker/autoscale.py

@@ -16,11 +16,12 @@ from __future__ import absolute_import
 import os
 import threading
 
-from time import sleep, time
+from time import sleep
 
 from kombu.async.semaphore import DummyLock
 
 from celery import bootsteps
+from celery.five import monotonic
 from celery.utils.log import get_logger
 from celery.utils.threads import bgThread
 
@@ -120,13 +121,13 @@ class Autoscaler(bgThread):
             self._shrink(min(n, self.processes))
 
     def scale_up(self, n):
-        self._last_action = time()
+        self._last_action = monotonic()
         return self._grow(n)
 
     def scale_down(self, n):
         if n and self._last_action and (
-                time() - self._last_action > self.keepalive):
-            self._last_action = time()
+                monotonic() - self._last_action > self.keepalive):
+            self._last_action = monotonic()
             return self._shrink(n)
 
     def _grow(self, n):

+ 2 - 3
celery/worker/job.py

@@ -10,7 +10,6 @@
 from __future__ import absolute_import
 
 import logging
-import time
 import socket
 import sys
 
@@ -27,7 +26,7 @@ from celery.exceptions import (
     SoftTimeLimitExceeded, TimeLimitExceeded,
     WorkerLostError, Terminated, RetryTaskError,
 )
-from celery.five import items
+from celery.five import items, monotonic
 from celery.platforms import signals as _signals
 from celery.utils import fun_takes_kwargs
 from celery.utils.functional import noop
@@ -333,7 +332,7 @@ class Request(object):
         if self.store_errors:
             self.task.backend.mark_as_failure(self.id, exc)
 
-    def on_success(self, ret_value, now=None, nowfun=time.time):
+    def on_success(self, ret_value, now=None, nowfun=monotonic):
         """Handler called if the task was successfully processed."""
         if isinstance(ret_value, ExceptionInfo):
             if isinstance(ret_value.exception, (

+ 3 - 3
celery/worker/state.py

@@ -86,8 +86,8 @@ C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
 if C_BENCH:  # pragma: no cover
     import atexit
 
-    from time import time
     from billiard import current_process
+    from celery.five import monotonic
     from celery.utils.debug import memdump, sample_mem
 
     all_count = 0
@@ -114,7 +114,7 @@ if C_BENCH:  # pragma: no cover
         global bench_first
         now = None
         if bench_start is None:
-            bench_start = now = time()
+            bench_start = now = monotonic()
         if bench_first is None:
             bench_first = now
 
@@ -126,7 +126,7 @@ if C_BENCH:  # pragma: no cover
         global bench_last
         all_count += 1
         if not all_count % bench_every:
-            now = time()
+            now = monotonic()
             diff = now - bench_start
             print('- Time spent processing {0} tasks (since first '
                   'task received): ~{1:.4f}s\n'.format(bench_every, diff))