hub.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.hub
  4. ~~~~~~~~~~~~~~~~~
  5. Event-loop implementation.
  6. """
  7. from __future__ import absolute_import
  8. from kombu.utils import cached_property
  9. from kombu.utils import eventio
  10. from celery.utils.timer2 import Schedule
  11. READ, WRITE, ERR = eventio.READ, eventio.WRITE, eventio.ERR
  12. class BoundedSemaphore(object):
  13. """Asynchronous Bounded Semaphore.
  14. Bounded means that the value will stay within the specified
  15. range even if it is released more times than it was acquired.
  16. This type is *not thread safe*.
  17. Example:
  18. >>> x = BoundedSemaphore(2)
  19. >>> def callback(i):
  20. ... print('HELLO %r' % i)
  21. >>> x.acquire(callback, 1)
  22. HELLO 1
  23. >>> x.acquire(callback, 2)
  24. HELLO 2
  25. >>> x.acquire(callback, 3)
  26. >>> x._waiters # private, do not access directly
  27. [(callback, 3)]
  28. >>> x.release()
  29. HELLO 3
  30. """
  31. def __init__(self, value):
  32. self.initial_value = self.value = value
  33. self._waiting = []
  34. def acquire(self, callback, *partial_args):
  35. """Acquire semaphore, applying ``callback`` when
  36. the semaphore is ready.
  37. :param callback: The callback to apply.
  38. :param \*partial_args: partial arguments to callback.
  39. """
  40. if self.value <= 0:
  41. self._waiting.append((callback, partial_args))
  42. return False
  43. else:
  44. self.value = max(self.value - 1, 0)
  45. callback(*partial_args)
  46. return True
  47. def release(self):
  48. """Release semaphore.
  49. This will apply any waiting callbacks from previous
  50. calls to :meth:`acquire` done when the semaphore was busy.
  51. """
  52. self.value = min(self.value + 1, self.initial_value)
  53. if self._waiting:
  54. waiter, args = self._waiting.pop()
  55. waiter(*args)
  56. def grow(self, n=1):
  57. """Change the size of the semaphore to hold more values."""
  58. self.initial_value += n
  59. self.value += n
  60. [self.release() for _ in xrange(n)]
  61. def shrink(self, n=1):
  62. """Change the size of the semaphore to hold less values."""
  63. self.initial_value = max(self.initial_value - n, 0)
  64. self.value = max(self.value - n, 0)
  65. def clear(self):
  66. """Reset the sempahore, including wiping out any waiting callbacks."""
  67. self._waiting[:] = []
  68. self.value = self.initial_value
  69. class Hub(object):
  70. """Event loop object.
  71. :keyword timer: Specify custom :class:`~celery.utils.timer2.Schedule`.
  72. """
  73. #: Flag set if reading from an fd will not block.
  74. READ = READ
  75. #: Flag set if writing to an fd will not block.
  76. WRITE = WRITE
  77. #: Flag set on error, and the fd should be read from asap.
  78. ERR = ERR
  79. #: List of callbacks to be called when the loop is initialized,
  80. #: applied with the hub instance as sole argument.
  81. on_init = None
  82. #: List of callbacks to be called when the loop is exiting,
  83. #: applied with the hub instance as sole argument.
  84. on_close = None
  85. #: List of callbacks to be called when a task is received.
  86. #: Takes no arguments.
  87. on_task = None
  88. def __init__(self, timer=None):
  89. self.timer = Schedule() if timer is None else timer
  90. self.readers = {}
  91. self.writers = {}
  92. self.on_init = []
  93. self.on_close = []
  94. self.on_task = []
  95. def start(self):
  96. """Called by StartStopComponent at worker startup."""
  97. self.poller = eventio.poll()
  98. def stop(self):
  99. """Called by StartStopComponent at worker shutdown."""
  100. self.poller.close()
  101. def init(self):
  102. for callback in self.on_init:
  103. callback(self)
  104. def fire_timers(self, min_delay=1, max_delay=10, max_timers=10):
  105. delay = None
  106. if self.timer._queue:
  107. for i in xrange(max_timers):
  108. delay, entry = self.scheduler.next()
  109. if entry is None:
  110. break
  111. self.timer.apply_entry(entry)
  112. return min(max(delay, min_delay), max_delay)
  113. def add(self, fd, callback, flags):
  114. self.poller.register(fd, flags)
  115. if not isinstance(fd, int):
  116. fd = fd.fileno()
  117. if flags & READ:
  118. self.readers[fd] = callback
  119. if flags & WRITE:
  120. self.writers[fd] = callback
  121. def add_reader(self, fd, callback):
  122. return self.add(fd, callback, READ | ERR)
  123. def add_writer(self, fd, callback):
  124. return self.add(fd, callback, WRITE)
  125. def update_readers(self, map):
  126. [self.add_reader(*x) for x in map.iteritems()]
  127. def update_writers(self, map):
  128. [self.add_writer(*x) for x in map.iteritems()]
  129. def _unregister(self, fd):
  130. try:
  131. self.poller.unregister(fd)
  132. except (KeyError, OSError):
  133. pass
  134. def remove(self, fd):
  135. fileno = fd.fileno() if not isinstance(fd, int) else fd
  136. self.readers.pop(fileno, None)
  137. self.writers.pop(fileno, None)
  138. self._unregister(fd)
  139. def __enter__(self):
  140. self.init()
  141. return self
  142. def close(self, *args):
  143. [self._unregister(fd) for fd in self.readers]
  144. self.readers.clear()
  145. [self._unregister(fd) for fd in self.writers]
  146. self.writers.clear()
  147. for callback in self.on_close:
  148. callback(self)
  149. __exit__ = close
  150. @cached_property
  151. def scheduler(self):
  152. return iter(self.timer)
  153. class DummyLock(object):
  154. """Pretending to be a lock."""
  155. def __enter__(self):
  156. return self
  157. def __exit__(self, *exc_info):
  158. pass