buckets.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.buckets
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. This module implements the rate limiting of tasks,
  6. by having a token bucket queue for each task type.
  7. When a task is allowed to be processed it's moved
  8. over to the ``ready_queue``.
  9. The :mod:`celery.worker.mediator` is then responsible
  10. for moving tasks from the ``ready_queue`` to the worker pool.
  11. :copyright: (c) 2009 - 2012 by Ask Solem.
  12. :license: BSD, see LICENSE for more details.
  13. """
  14. from __future__ import absolute_import
  15. from __future__ import with_statement
  16. import threading
  17. from collections import deque
  18. from time import time, sleep
  19. from Queue import Queue, Empty
  20. from kombu.utils.limits import TokenBucket
  21. from celery.utils import timeutils
  22. from celery.utils.compat import zip_longest, chain_from_iterable
  23. class RateLimitExceeded(Exception):
  24. """The token buckets rate limit has been exceeded."""
  25. class TaskBucket(object):
  26. """This is a collection of token buckets, each task type having
  27. its own token bucket. If the task type doesn't have a rate limit,
  28. it will have a plain :class:`~Queue.Queue` object instead of a
  29. :class:`TokenBucketQueue`.
  30. The :meth:`put` operation forwards the task to its appropriate bucket,
  31. while the :meth:`get` operation iterates over the buckets and retrieves
  32. the first available item.
  33. Say we have three types of tasks in the registry: `twitter.update`,
  34. `feed.refresh` and `video.compress`, the TaskBucket will consist
  35. of the following items::
  36. {"twitter.update": TokenBucketQueue(fill_rate=300),
  37. "feed.refresh": Queue(),
  38. "video.compress": TokenBucketQueue(fill_rate=2)}
  39. The get operation will iterate over these until one of the buckets
  40. is able to return an item. The underlying datastructure is a `dict`,
  41. so the order is ignored here.
  42. :param task_registry: The task registry used to get the task
  43. type class for a given task name.
  44. """
  45. def __init__(self, task_registry):
  46. self.task_registry = task_registry
  47. self.buckets = {}
  48. self.init_with_registry()
  49. self.immediate = deque()
  50. self.mutex = threading.Lock()
  51. self.not_empty = threading.Condition(self.mutex)
  52. def put(self, request):
  53. """Put a :class:`~celery.worker.job.Request` into
  54. the appropiate bucket."""
  55. if request.name not in self.buckets:
  56. self.add_bucket_for_type(request.name)
  57. self.buckets[request.name].put_nowait(request)
  58. with self.mutex:
  59. self.not_empty.notify()
  60. put_nowait = put
  61. def _get_immediate(self):
  62. try:
  63. return self.immediate.popleft()
  64. except IndexError:
  65. raise Empty()
  66. def _get(self):
  67. # If the first bucket is always returning items, we would never
  68. # get to fetch items from the other buckets. So we always iterate over
  69. # all the buckets and put any ready items into a queue called
  70. # "immediate". This queue is always checked for cached items first.
  71. try:
  72. return 0, self._get_immediate()
  73. except Empty:
  74. pass
  75. remaining_times = []
  76. for bucket in self.buckets.values():
  77. remaining = bucket.expected_time()
  78. if not remaining:
  79. try:
  80. # Just put any ready items into the immediate queue.
  81. self.immediate.append(bucket.get_nowait())
  82. except Empty:
  83. pass
  84. except RateLimitExceeded:
  85. remaining_times.append(bucket.expected_time())
  86. else:
  87. remaining_times.append(remaining)
  88. # Try the immediate queue again.
  89. try:
  90. return 0, self._get_immediate()
  91. except Empty:
  92. if not remaining_times:
  93. # No items in any of the buckets.
  94. raise
  95. # There's items, but have to wait before we can retrieve them,
  96. # return the shortest remaining time.
  97. return min(remaining_times), None
  98. def get(self, block=True, timeout=None):
  99. """Retrive the task from the first available bucket.
  100. Available as in, there is an item in the queue and you can
  101. consume tokens from it.
  102. """
  103. tstart = time()
  104. get = self._get
  105. not_empty = self.not_empty
  106. with not_empty:
  107. while 1:
  108. try:
  109. remaining_time, item = get()
  110. except Empty:
  111. if not block or (timeout and time() - tstart > timeout):
  112. raise
  113. not_empty.wait(timeout)
  114. continue
  115. if remaining_time:
  116. if not block or (timeout and time() - tstart > timeout):
  117. raise Empty()
  118. sleep(min(remaining_time, timeout or 1))
  119. else:
  120. return item
  121. def get_nowait(self):
  122. return self.get(block=False)
  123. def init_with_registry(self):
  124. """Initialize with buckets for all the task types in the registry."""
  125. for task in self.task_registry.keys():
  126. self.add_bucket_for_type(task)
  127. def refresh(self):
  128. """Refresh rate limits for all task types in the registry."""
  129. for task in self.task_registry.keys():
  130. self.update_bucket_for_type(task)
  131. def get_bucket_for_type(self, task_name):
  132. """Get the bucket for a particular task type."""
  133. if task_name not in self.buckets:
  134. return self.add_bucket_for_type(task_name)
  135. return self.buckets[task_name]
  136. def _get_queue_for_type(self, task_name):
  137. bucket = self.buckets[task_name]
  138. if isinstance(bucket, TokenBucketQueue):
  139. return bucket.queue
  140. return bucket
  141. def update_bucket_for_type(self, task_name):
  142. task_type = self.task_registry[task_name]
  143. rate_limit = getattr(task_type, "rate_limit", None)
  144. rate_limit = timeutils.rate(rate_limit)
  145. task_queue = FastQueue()
  146. if task_name in self.buckets:
  147. task_queue = self._get_queue_for_type(task_name)
  148. else:
  149. task_queue = FastQueue()
  150. if rate_limit:
  151. task_queue = TokenBucketQueue(rate_limit, queue=task_queue)
  152. self.buckets[task_name] = task_queue
  153. return task_queue
  154. def add_bucket_for_type(self, task_name):
  155. """Add a bucket for a task type.
  156. Will read the tasks rate limit and create a :class:`TokenBucketQueue`
  157. if it has one. If the task doesn't have a rate limit
  158. :class:`FastQueue` will be used instead.
  159. """
  160. if task_name not in self.buckets:
  161. return self.update_bucket_for_type(task_name)
  162. def qsize(self):
  163. """Get the total size of all the queues."""
  164. return sum(bucket.qsize() for bucket in self.buckets.values())
  165. def empty(self):
  166. """Returns :const:`True` if all of the buckets are empty."""
  167. return all(bucket.empty() for bucket in self.buckets.values())
  168. def clear(self):
  169. """Delete the data in all of the buckets."""
  170. for bucket in self.buckets.values():
  171. bucket.clear()
  172. @property
  173. def items(self):
  174. """Flattens the data in all of the buckets into a single list."""
  175. # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
  176. # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
  177. return filter(None, chain_from_iterable(zip_longest(*[bucket.items
  178. for bucket in self.buckets.values()])))
  179. class FastQueue(Queue):
  180. """:class:`Queue.Queue` supporting the interface of
  181. :class:`TokenBucketQueue`."""
  182. def clear(self):
  183. return self.queue.clear()
  184. def expected_time(self, tokens=1):
  185. return 0
  186. def wait(self, block=True):
  187. return self.get(block=block)
  188. @property
  189. def items(self):
  190. return self.queue
  191. class TokenBucketQueue(object):
  192. """Queue with rate limited get operations.
  193. This uses the token bucket algorithm to rate limit the queue on get
  194. operations.
  195. :param fill_rate: The rate in tokens/second that the bucket will
  196. be refilled.
  197. :keyword capacity: Maximum number of tokens in the bucket.
  198. Default is 1.
  199. """
  200. RateLimitExceeded = RateLimitExceeded
  201. def __init__(self, fill_rate, queue=None, capacity=1):
  202. self._bucket = TokenBucket(fill_rate, capacity)
  203. self.queue = queue
  204. if not self.queue:
  205. self.queue = Queue()
  206. def put(self, item, block=True):
  207. """Put an item onto the queue."""
  208. self.queue.put(item, block=block)
  209. def put_nowait(self, item):
  210. """Put an item into the queue without blocking.
  211. :raises Queue.Full: If a free slot is not immediately available.
  212. """
  213. return self.put(item, block=False)
  214. def get(self, block=True):
  215. """Remove and return an item from the queue.
  216. :raises RateLimitExceeded: If a token could not be consumed from the
  217. token bucket (consuming from the queue
  218. too fast).
  219. :raises Queue.Empty: If an item is not immediately available.
  220. """
  221. get = block and self.queue.get or self.queue.get_nowait
  222. if not block and not self.items:
  223. raise Empty()
  224. if not self._bucket.can_consume(1):
  225. raise RateLimitExceeded()
  226. return get()
  227. def get_nowait(self):
  228. """Remove and return an item from the queue without blocking.
  229. :raises RateLimitExceeded: If a token could not be consumed from the
  230. token bucket (consuming from the queue
  231. too fast).
  232. :raises Queue.Empty: If an item is not immediately available.
  233. """
  234. return self.get(block=False)
  235. def qsize(self):
  236. """Returns the size of the queue."""
  237. return self.queue.qsize()
  238. def empty(self):
  239. """Returns :const:`True` if the queue is empty."""
  240. return self.queue.empty()
  241. def clear(self):
  242. """Delete all data in the queue."""
  243. return self.items.clear()
  244. def wait(self, block=False):
  245. """Wait until a token can be retrieved from the bucket and return
  246. the next item."""
  247. get = self.get
  248. expected_time = self.expected_time
  249. while 1:
  250. remaining = expected_time()
  251. if not remaining:
  252. return get(block=block)
  253. sleep(remaining)
  254. def expected_time(self, tokens=1):
  255. """Returns the expected time in seconds of when a new token should be
  256. available."""
  257. if not self.items:
  258. return 0
  259. return self._bucket.expected_time(tokens)
  260. @property
  261. def items(self):
  262. """Underlying data. Do not modify."""
  263. return self.queue.queue