hub.py 5.6 KB


# -*- coding: utf-8 -*-
"""
    celery.worker.hub
    ~~~~~~~~~~~~~~~~~

    Event-loop implementation.

"""
from __future__ import absolute_import

from collections import deque

from kombu.utils import cached_property
from kombu.utils import eventio

from celery.five import items, range
from celery.utils.timer2 import Schedule
  12. READ, WRITE, ERR = eventio.READ, eventio.WRITE, eventio.ERR
  13. class BoundedSemaphore(object):
  14. """Asynchronous Bounded Semaphore.
  15. Bounded means that the value will stay within the specified
  16. range even if it is released more times than it was acquired.
  17. This type is *not thread safe*.
  18. Example:
  19. >>> x = BoundedSemaphore(2)
  20. >>> def callback(i):
  21. ... print('HELLO {0!r}'.format(i))
  22. >>> x.acquire(callback, 1)
  23. HELLO 1
  24. >>> x.acquire(callback, 2)
  25. HELLO 2
  26. >>> x.acquire(callback, 3)
  27. >>> x._waiters # private, do not access directly
  28. [(callback, 3)]
  29. >>> x.release()
  30. HELLO 3
  31. """
  32. def __init__(self, value):
  33. self.initial_value = self.value = value
  34. self._waiting = []
  35. def acquire(self, callback, *partial_args):
  36. """Acquire semaphore, applying ``callback`` when
  37. the semaphore is ready.
  38. :param callback: The callback to apply.
  39. :param \*partial_args: partial arguments to callback.
  40. """
  41. if self.value <= 0:
  42. self._waiting.append((callback, partial_args))
  43. return False
  44. else:
  45. self.value = max(self.value - 1, 0)
  46. callback(*partial_args)
  47. return True
  48. def release(self):
  49. """Release semaphore.
  50. This will apply any waiting callbacks from previous
  51. calls to :meth:`acquire` done when the semaphore was busy.
  52. """
  53. self.value = min(self.value + 1, self.initial_value)
  54. if self._waiting:
  55. waiter, args = self._waiting.pop()
  56. waiter(*args)
  57. def grow(self, n=1):
  58. """Change the size of the semaphore to hold more values."""
  59. self.initial_value += n
  60. self.value += n
  61. [self.release() for _ in range(n)]
  62. def shrink(self, n=1):
  63. """Change the size of the semaphore to hold less values."""
  64. self.initial_value = max(self.initial_value - n, 0)
  65. self.value = max(self.value - n, 0)
  66. def clear(self):
  67. """Reset the sempahore, including wiping out any waiting callbacks."""
  68. self._waiting[:] = []
  69. self.value = self.initial_value
  70. class Hub(object):
  71. """Event loop object.
  72. :keyword timer: Specify custom :class:`~celery.utils.timer2.Schedule`.
  73. """
  74. #: Flag set if reading from an fd will not block.
  75. READ = READ
  76. #: Flag set if writing to an fd will not block.
  77. WRITE = WRITE
  78. #: Flag set on error, and the fd should be read from asap.
  79. ERR = ERR
  80. #: List of callbacks to be called when the loop is initialized,
  81. #: applied with the hub instance as sole argument.
  82. on_init = None
  83. #: List of callbacks to be called when the loop is exiting,
  84. #: applied with the hub instance as sole argument.
  85. on_close = None
  86. #: List of callbacks to be called when a task is received.
  87. #: Takes no arguments.
  88. on_task = None
  89. def __init__(self, timer=None):
  90. self.timer = Schedule() if timer is None else timer
  91. self.readers = {}
  92. self.writers = {}
  93. self.on_init = []
  94. self.on_close = []
  95. self.on_task = []
  96. def start(self):
  97. """Called by Hub bootstep at worker startup."""
  98. self.poller = eventio.poll()
  99. def stop(self):
  100. """Called by Hub bootstep at worker shutdown."""
  101. self.poller.close()
  102. def init(self):
  103. for callback in self.on_init:
  104. callback(self)
  105. def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
  106. delay = None
  107. if self.timer._queue:
  108. for i in range(max_timers):
  109. delay, entry = next(self.scheduler)
  110. if entry is None:
  111. break
  112. self.timer.apply_entry(entry)
  113. return min(max(delay or 0, min_delay), max_delay)
  114. def add(self, fd, callback, flags):
  115. self.poller.register(fd, flags)
  116. if not isinstance(fd, int):
  117. fd = fd.fileno()
  118. if flags & READ:
  119. self.readers[fd] = callback
  120. if flags & WRITE:
  121. self.writers[fd] = callback
  122. def add_reader(self, fd, callback):
  123. return self.add(fd, callback, READ | ERR)
  124. def add_writer(self, fd, callback):
  125. return self.add(fd, callback, WRITE)
  126. def update_readers(self, map):
  127. [self.add_reader(*x) for x in items(map)]
  128. def update_writers(self, map):
  129. [self.add_writer(*x) for x in items(map)]
  130. def _unregister(self, fd):
  131. try:
  132. self.poller.unregister(fd)
  133. except (KeyError, OSError):
  134. pass
  135. def remove(self, fd):
  136. fileno = fd.fileno() if not isinstance(fd, int) else fd
  137. self.readers.pop(fileno, None)
  138. self.writers.pop(fileno, None)
  139. self._unregister(fd)
  140. def __enter__(self):
  141. self.init()
  142. return self
  143. def close(self, *args):
  144. [self._unregister(fd) for fd in self.readers]
  145. self.readers.clear()
  146. [self._unregister(fd) for fd in self.writers]
  147. self.writers.clear()
  148. for callback in self.on_close:
  149. callback(self)
  150. __exit__ = close
  151. @cached_property
  152. def scheduler(self):
  153. return iter(self.timer)
  154. class DummyLock(object):
  155. """Pretending to be a lock."""
  156. def __enter__(self):
  157. return self
  158. def __exit__(self, *exc_info):
  159. pass