Sfoglia il codice sorgente

Fixes time vs monotonic problems

Ask Solem 11 anni fa
parent
commit
2e01f59738

+ 3 - 2
celery/concurrency/eventlet.py

@@ -10,6 +10,8 @@ from __future__ import absolute_import
 
 
 import sys
 import sys
 
 
+from time import time
+
 __all__ = ['TaskPool']
 __all__ = ['TaskPool']
 
 
 W_RACE = """\
 W_RACE = """\
@@ -28,7 +30,6 @@ for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
 
 
 
 
 from celery import signals
 from celery import signals
-from celery.five import monotonic
 from celery.utils import timer2
 from celery.utils import timer2
 
 
 from . import base
 from . import base
@@ -52,7 +53,7 @@ class Schedule(timer2.Schedule):
         self._queue = set()
         self._queue = set()
 
 
     def _enter(self, eta, priority, entry):
     def _enter(self, eta, priority, entry):
-        secs = max(eta - monotonic(), 0)
+        secs = max(eta - time(), 0)
         g = self._spawn_after(secs, entry)
         g = self._spawn_after(secs, entry)
         self._queue.add(g)
         self._queue.add(g)
         g.link(self._entry_exit, entry)
         g.link(self._entry_exit, entry)

+ 3 - 2
celery/concurrency/gevent.py

@@ -8,12 +8,13 @@
 """
 """
 from __future__ import absolute_import
 from __future__ import absolute_import
 
 
+from time import time
+
 try:
 try:
     from gevent import Timeout
     from gevent import Timeout
 except ImportError:  # pragma: no cover
 except ImportError:  # pragma: no cover
     Timeout = None  # noqa
     Timeout = None  # noqa
 
 
-from celery.five import monotonic
 from celery.utils import timer2
 from celery.utils import timer2
 
 
 from .base import apply_target, BasePool
 from .base import apply_target, BasePool
@@ -47,7 +48,7 @@ class Schedule(timer2.Schedule):
         self._queue = set()
         self._queue = set()
 
 
     def _enter(self, eta, priority, entry):
     def _enter(self, eta, priority, entry):
-        secs = max(eta - monotonic(), 0)
+        secs = max(eta - time(), 0)
         g = self._Greenlet.spawn_later(secs, entry)
         g = self._Greenlet.spawn_later(secs, entry)
         self._queue.add(g)
         self._queue.add(g)
         g.link(self._entry_exit)
         g.link(self._entry_exit)

+ 5 - 4
celery/concurrency/processes.py

@@ -23,6 +23,7 @@ import random
 import select
 import select
 import socket
 import socket
 import struct
 import struct
+import time
 
 
 from collections import deque, namedtuple
 from collections import deque, namedtuple
 from pickle import HIGHEST_PROTOCOL
 from pickle import HIGHEST_PROTOCOL
@@ -47,7 +48,7 @@ from celery import signals
 from celery._state import set_default_app
 from celery._state import set_default_app
 from celery.app import trace
 from celery.app import trace
 from celery.concurrency.base import BasePool
 from celery.concurrency.base import BasePool
-from celery.five import Counter, items, values, monotonic
+from celery.five import Counter, items, values
 from celery.utils.log import get_logger
 from celery.utils.log import get_logger
 
 
 __all__ = ['TaskPool']
 __all__ = ['TaskPool']
@@ -338,7 +339,7 @@ class AsynPool(_pool.Pool):
 
 
         hub.on_tick.add(self.on_poll_start)
         hub.on_tick.add(self.on_poll_start)
 
 
-    def _create_timelimit_handlers(self, hub, now=monotonic):
+    def _create_timelimit_handlers(self, hub, now=time.time):
         """For async pool this sets up the handlers used
         """For async pool this sets up the handlers used
         to implement time limits."""
         to implement time limits."""
         call_later = hub.call_later
         call_later = hub.call_later
@@ -368,11 +369,11 @@ class AsynPool(_pool.Pool):
             _discard_tref(R._job)
             _discard_tref(R._job)
         self.on_timeout_cancel = on_timeout_cancel
         self.on_timeout_cancel = on_timeout_cancel
 
 
-    def _on_soft_timeout(self, job, soft, hard, hub, now=monotonic):
+    def _on_soft_timeout(self, job, soft, hard, hub, now=time.time):
         # only used by async pool.
         # only used by async pool.
         if hard:
         if hard:
             self._tref_for_id[job] = hub.call_at(
             self._tref_for_id[job] = hub.call_at(
-                now() + (hard - soft), self._on_hard_timeout, job,
+                time() + (hard - soft), self._on_hard_timeout, job,
             )
             )
         try:
         try:
             result = self._cache[job]
             result = self._cache[job]

+ 4 - 3
celery/datastructures.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import, print_function, unicode_literals
 from __future__ import absolute_import, print_function, unicode_literals
 
 
 import sys
 import sys
+import time
 
 
 from collections import defaultdict, Mapping, MutableMapping, MutableSet
 from collections import defaultdict, Mapping, MutableMapping, MutableSet
 from heapq import heapify, heappush, heappop
 from heapq import heapify, heappush, heappop
@@ -19,7 +20,7 @@ from billiard.einfo import ExceptionInfo  # noqa
 from kombu.utils.encoding import safe_str
 from kombu.utils.encoding import safe_str
 from kombu.utils.limits import TokenBucket  # noqa
 from kombu.utils.limits import TokenBucket  # noqa
 
 
-from celery.five import items, monotonic
+from celery.five import items
 from celery.utils.functional import LRUCache, first, uniq  # noqa
 from celery.utils.functional import LRUCache, first, uniq  # noqa
 
 
 DOT_HEAD = """
 DOT_HEAD = """
@@ -560,7 +561,7 @@ class LimitedSet(object):
         self.__len__ = self._data.__len__
         self.__len__ = self._data.__len__
         self.__contains__ = self._data.__contains__
         self.__contains__ = self._data.__contains__
 
 
-    def add(self, value, now=monotonic):
+    def add(self, value, now=time.time):
         """Add a new member."""
         """Add a new member."""
         # offset is there to modify the length of the list,
         # offset is there to modify the length of the list,
         # this way we can expire an item before inserting the value,
         # this way we can expire an item before inserting the value,
@@ -588,7 +589,7 @@ class LimitedSet(object):
         self._data.pop(value, None)
         self._data.pop(value, None)
     pop_value = discard  # XXX compat
     pop_value = discard  # XXX compat
 
 
-    def purge(self, limit=None, offset=0, now=monotonic):
+    def purge(self, limit=None, offset=0, now=time.time):
         """Purge expired items."""
         """Purge expired items."""
         H, maxlen = self._heap, self.maxlen
         H, maxlen = self._heap, self.maxlen
         if not maxlen:
         if not maxlen:

+ 2 - 3
celery/events/__init__.py

@@ -25,7 +25,6 @@ from kombu.mixins import ConsumerMixin
 from kombu.utils import cached_property
 from kombu.utils import cached_property
 
 
 from celery.app import app_or_default
 from celery.app import app_or_default
-from celery.five import monotonic
 from celery.utils import uuid
 from celery.utils import uuid
 from celery.utils.functional import dictfilter
 from celery.utils.functional import dictfilter
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
@@ -45,7 +44,7 @@ def get_exchange(conn):
     return ex
     return ex
 
 
 
 
-def Event(type, _fields=None, __dict__=dict, __now__=monotonic, **fields):
+def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
     """Create an event.
     """Create an event.
 
 
     An event is a dictionary, the only required field is ``type``.
     An event is a dictionary, the only required field is ``type``.
@@ -321,7 +320,7 @@ class EventReceiver(ConsumerMixin):
                                    channel=channel)
                                    channel=channel)
 
 
     def event_from_message(self, body, localize=True,
     def event_from_message(self, body, localize=True,
-                           now=monotonic, tzfields=_TZGETTER,
+                           now=time.time, tzfields=_TZGETTER,
                            adjust_timestamp=adjust_timestamp):
                            adjust_timestamp=adjust_timestamp):
         type = body.get('type', '').lower()
         type = body.get('type', '').lower()
         clock = body.get('clock')
         clock = body.get('clock')

+ 3 - 2
celery/events/cursesmon.py

@@ -15,12 +15,13 @@ import threading
 from datetime import datetime
 from datetime import datetime
 from itertools import count
 from itertools import count
 from textwrap import wrap
 from textwrap import wrap
+from time import time
 from math import ceil
 from math import ceil
 
 
 from celery import VERSION_BANNER
 from celery import VERSION_BANNER
 from celery import states
 from celery import states
 from celery.app import app_or_default
 from celery.app import app_or_default
-from celery.five import items, values, monotonic
+from celery.five import items, values
 from celery.utils.text import abbr, abbrtask
 from celery.utils.text import abbr, abbrtask
 
 
 __all__ = ['CursesMonitor', 'evtop']
 __all__ = ['CursesMonitor', 'evtop']
@@ -333,7 +334,7 @@ class CursesMonitor(object):  # pragma: no cover
         if task.uuid == self.selected_task:
         if task.uuid == self.selected_task:
             attr = curses.A_STANDOUT
             attr = curses.A_STANDOUT
         timestamp = datetime.utcfromtimestamp(
         timestamp = datetime.utcfromtimestamp(
-            task.timestamp or monotonic(),
+            task.timestamp or time(),
         )
         )
         timef = timestamp.strftime('%H:%M:%S')
         timef = timestamp.strftime('%H:%M:%S')
         hostname = task.worker.hostname if task.worker else '*NONE*'
         hostname = task.worker.hostname if task.worker else '*NONE*'

+ 3 - 2
celery/events/state.py

@@ -23,13 +23,14 @@ import threading
 from datetime import datetime
 from datetime import datetime
 from heapq import heappush, heappop
 from heapq import heappush, heappop
 from itertools import islice
 from itertools import islice
+from time import time
 
 
 from kombu.clocks import timetuple
 from kombu.clocks import timetuple
 from kombu.utils import kwdict
 from kombu.utils import kwdict
 
 
 from celery import states
 from celery import states
 from celery.datastructures import AttributeDict
 from celery.datastructures import AttributeDict
-from celery.five import items, values, monotonic
+from celery.five import items, values
 from celery.utils.functional import LRUCache
 from celery.utils.functional import LRUCache
 from celery.utils.log import get_logger
 from celery.utils.log import get_logger
 
 
@@ -139,7 +140,7 @@ class Worker(AttributeDict):
 
 
     @property
     @property
     def alive(self):
     def alive(self):
-        return bool(self.heartbeats and monotonic() < self.heartbeat_expires)
+        return bool(self.heartbeats and time() < self.heartbeat_expires)
 
 
     @property
     @property
     def id(self):
     def id(self):