buckets.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. import time
  2. from collections import deque
  3. from Queue import Queue, Empty as QueueEmpty
  4. from celery.utils import all
  5. from celery.utils import timeutils
  6. from celery.utils.compat import izip_longest, chain_from_iterable
  7. class RateLimitExceeded(Exception):
  8. """The token buckets rate limit has been exceeded."""
  9. class TaskBucket(object):
  10. """This is a collection of token buckets, each task type having
  11. its own token bucket. If the task type doesn't have a rate limit,
  12. it will have a plain :class:`Queue` object instead of a
  13. :class:`TokenBucketQueue`.
  14. The :meth:`put` operation forwards the task to its appropriate bucket,
  15. while the :meth:`get` operation iterates over the buckets and retrieves
  16. the first available item.
  17. Say we have three types of tasks in the registry: ``celery.ping``,
  18. ``feed.refresh`` and ``video.compress``, the TaskBucket will consist
  19. of the following items::
  20. {"celery.ping": TokenBucketQueue(fill_rate=300),
  21. "feed.refresh": Queue(),
  22. "video.compress": TokenBucketQueue(fill_rate=2)}
  23. The get operation will iterate over these until one of the buckets
  24. is able to return an item. The underlying datastructure is a ``dict``,
  25. so the order is ignored here.
  26. :param task_registry: The task registry used to get the task
  27. type class for a given task name.
  28. """
  29. def __init__(self, task_registry):
  30. self.task_registry = task_registry
  31. self.buckets = {}
  32. self.init_with_registry()
  33. self.immediate = deque()
  34. def put(self, request):
  35. """Put a :class:`~celery.worker.job.TaskRequest` into
  36. the appropiate bucket."""
  37. if request.task_name not in self.buckets:
  38. self.add_bucket_for_type(request.task_name)
  39. self.buckets[request.task_name].put_nowait(request)
  40. put_nowait = put
  41. def _get_immediate(self):
  42. try:
  43. return self.immediate.popleft()
  44. except IndexError: # Empty
  45. raise QueueEmpty()
  46. def _get(self):
  47. # If the first bucket is always returning items, we would never
  48. # get to fetch items from the other buckets. So we always iterate over
  49. # all the buckets and put any ready items into a queue called
  50. # "immediate". This queue is always checked for cached items first.
  51. try:
  52. return 0, self._get_immediate()
  53. except QueueEmpty:
  54. pass
  55. remaining_times = []
  56. for bucket in self.buckets.values():
  57. remaining = bucket.expected_time()
  58. if not remaining:
  59. try:
  60. # Just put any ready items into the immediate queue.
  61. self.immediate.append(bucket.get_nowait())
  62. except QueueEmpty:
  63. pass
  64. except RateLimitExceeded:
  65. remaining_times.append(bucket.expected_time())
  66. else:
  67. remaining_times.append(remaining)
  68. # Try the immediate queue again.
  69. try:
  70. return 0, self._get_immediate()
  71. except QueueEmpty:
  72. if not remaining_times:
  73. # No items in any of the buckets.
  74. raise
  75. # There's items, but have to wait before we can retrieve them,
  76. # return the shortest remaining time.
  77. return min(remaining_times), None
  78. def get(self, block=True, timeout=None):
  79. """Retrive the task from the first available bucket.
  80. Available as in, there is an item in the queue and you can
  81. consume tokens from it.
  82. """
  83. time_start = time.time()
  84. did_timeout = lambda: timeout and time.time() - time_start > timeout
  85. while True:
  86. remaining_time, item = self._get()
  87. if remaining_time:
  88. if not block or did_timeout():
  89. raise QueueEmpty
  90. time.sleep(min(remaining_time, timeout or 1))
  91. else:
  92. return item
  93. def get_nowait(self):
  94. return self.get(block=False)
  95. def init_with_registry(self):
  96. """Initialize with buckets for all the task types in the registry."""
  97. map(self.add_bucket_for_type, self.task_registry.keys())
  98. def refresh(self):
  99. """Refresh rate limits for all task types in the registry."""
  100. map(self.update_bucket_for_type, self.task_registry.keys())
  101. def get_bucket_for_type(self, task_name):
  102. """Get the bucket for a particular task type."""
  103. if task_name not in self.buckets:
  104. return self.add_bucket_for_type(task_name)
  105. return self.buckets[task_name]
  106. def _get_queue_for_type(self, task_name):
  107. bucket = self.buckets[task_name]
  108. if isinstance(bucket, TokenBucketQueue):
  109. return bucket.queue
  110. return bucket
  111. def update_bucket_for_type(self, task_name):
  112. task_type = self.task_registry[task_name]
  113. rate_limit = getattr(task_type, "rate_limit", None)
  114. rate_limit = timeutils.rate(rate_limit)
  115. task_queue = FastQueue()
  116. if task_name in self.buckets:
  117. task_queue = self._get_queue_for_type(task_name)
  118. else:
  119. task_queue = FastQueue()
  120. if rate_limit:
  121. task_queue = TokenBucketQueue(rate_limit, queue=task_queue)
  122. self.buckets[task_name] = task_queue
  123. return task_queue
  124. def add_bucket_for_type(self, task_name):
  125. """Add a bucket for a task type.
  126. Will read the tasks rate limit and create a :class:`TokenBucketQueue`
  127. if it has one. If the task doesn't have a rate limit a regular Queue
  128. will be used.
  129. """
  130. if task_name not in self.buckets:
  131. return self.update_bucket_for_type(task_name)
  132. def qsize(self):
  133. """Get the total size of all the queues."""
  134. return sum(bucket.qsize() for bucket in self.buckets.values())
  135. def empty(self):
  136. return all(bucket.empty() for bucket in self.buckets.values())
  137. def clear(self):
  138. for bucket in self.buckets.values():
  139. bucket.clear()
  140. @property
  141. def items(self):
  142. # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
  143. # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
  144. return filter(None, chain_from_iterable(izip_longest(*[bucket.items
  145. for bucket in self.buckets.values()])))
  146. class FastQueue(Queue):
  147. """:class:`Queue.Queue` supporting the interface of
  148. :class:`TokenBucketQueue`."""
  149. def clear(self):
  150. return self.queue.clear()
  151. def expected_time(self, tokens=1):
  152. return 0
  153. def can_consume(self, tokens=1):
  154. return True
  155. def wait(self, block=True):
  156. return self.get(block=block)
  157. @property
  158. def items(self):
  159. return self.queue
  160. class TokenBucketQueue(object):
  161. """Queue with rate limited get operations.
  162. This uses the token bucket algorithm to rate limit the queue on get
  163. operations.
  164. See http://en.wikipedia.org/wiki/Token_Bucket
  165. Most of this code was stolen from an entry in the ASPN Python Cookbook:
  166. http://code.activestate.com/recipes/511490/
  167. :param fill_rate: see :attr:`fill_rate`.
  168. :keyword capacity: see :attr:`capacity`.
  169. .. attribute:: fill_rate
  170. The rate in tokens/second that the bucket will be refilled.
  171. .. attribute:: capacity
  172. Maximum number of tokens in the bucket. Default is ``1``.
  173. .. attribute:: timestamp
  174. Timestamp of the last time a token was taken out of the bucket.
  175. """
  176. RateLimitExceeded = RateLimitExceeded
  177. def __init__(self, fill_rate, queue=None, capacity=1):
  178. self.capacity = float(capacity)
  179. self._tokens = self.capacity
  180. self.queue = queue
  181. if not self.queue:
  182. self.queue = Queue()
  183. self.fill_rate = float(fill_rate)
  184. self.timestamp = time.time()
  185. def put(self, item, block=True):
  186. """Put an item into the queue.
  187. Also see :meth:`Queue.Queue.put`.
  188. """
  189. self.queue.put(item, block=block)
  190. def put_nowait(self, item):
  191. """Put an item into the queue without blocking.
  192. :raises Queue.Full: If a free slot is not immediately available.
  193. Also see :meth:`Queue.Queue.put_nowait`
  194. """
  195. return self.put(item, block=False)
  196. def get(self, block=True):
  197. """Remove and return an item from the queue.
  198. :raises RateLimitExceeded: If a token could not be consumed from the
  199. token bucket (consuming from the queue too fast).
  200. :raises Queue.Empty: If an item is not immediately available.
  201. Also see :meth:`Queue.Queue.get`.
  202. """
  203. get = block and self.queue.get or self.queue.get_nowait
  204. if not self.can_consume(1):
  205. raise RateLimitExceeded()
  206. return get()
  207. def get_nowait(self):
  208. """Remove and return an item from the queue without blocking.
  209. :raises RateLimitExceeded: If a token could not be consumed from the
  210. token bucket (consuming from the queue too fast).
  211. :raises Queue.Empty: If an item is not immediately available.
  212. Also see :meth:`Queue.Queue.get_nowait`.
  213. """
  214. return self.get(block=False)
  215. def qsize(self):
  216. """Returns the size of the queue.
  217. See :meth:`Queue.Queue.qsize`.
  218. """
  219. return self.queue.qsize()
  220. def empty(self):
  221. return self.queue.empty()
  222. def clear(self):
  223. return self.items.clear()
  224. def wait(self, block=False):
  225. """Wait until a token can be retrieved from the bucket and return
  226. the next item."""
  227. while True:
  228. remaining = self.expected_time()
  229. if not remaining:
  230. return self.get(block=block)
  231. time.sleep(remaining)
  232. def can_consume(self, tokens=1):
  233. """Consume tokens from the bucket. Returns True if there were
  234. sufficient tokens otherwise False."""
  235. if tokens <= self._get_tokens():
  236. self._tokens -= tokens
  237. return True
  238. return False
  239. def expected_time(self, tokens=1):
  240. """Returns the expected time in seconds when a new token should be
  241. available."""
  242. tokens = max(tokens, self._get_tokens())
  243. return (tokens - self._get_tokens()) / self.fill_rate
  244. def _get_tokens(self):
  245. if self._tokens < self.capacity:
  246. now = time.time()
  247. delta = self.fill_rate * (now - self.timestamp)
  248. self._tokens = min(self.capacity, self._tokens + delta)
  249. self.timestamp = now
  250. return self._tokens
  251. @property
  252. def items(self):
  253. return self.queue.queue