123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- import threading
- import time
- from collections import deque
- from Queue import Queue, Empty as QueueEmpty
- from celery.datastructures import TokenBucket
- from celery.utils import timeutils
- from celery.utils.compat import all, izip_longest, chain_from_iterable
class RateLimitExceeded(Exception):
    """Raised when an item cannot be taken from a rate limited queue
    because no token could be consumed from its token bucket."""
class TaskBucket(object):
    """A collection of buckets, one per task type, behind a queue interface.

    If the task type has a rate limit its bucket is a
    :class:`TokenBucketQueue`, otherwise it is a plain :class:`FastQueue`.

    The :meth:`put` operation forwards the task to its appropriate bucket,
    while the :meth:`get` operation iterates over the buckets and retrieves
    the first available item.

    Say we have three types of tasks in the registry: ``celery.ping``,
    ``feed.refresh`` and ``video.compress``, the TaskBucket will consist
    of the following items::

        {"celery.ping": TokenBucketQueue(fill_rate=300),
         "feed.refresh": FastQueue(),
         "video.compress": TokenBucketQueue(fill_rate=2)}

    The get operation will iterate over these until one of the buckets
    is able to return an item.  The underlying datastructure is a ``dict``,
    so the order is ignored here.

    :param task_registry: The task registry used to get the task
        type class for a given task name.

    """

    def __init__(self, task_registry):
        self.task_registry = task_registry
        # Maps task name -> bucket (FastQueue or TokenBucketQueue).
        self.buckets = {}
        self.init_with_registry()
        # Requests already taken out of their bucket by _get(), waiting
        # to be handed out by the next get() calls.
        self.immediate = deque()
        self.mutex = threading.Lock()
        # Shares the mutex, so waiting on it releases the lock put() needs.
        self.not_empty = threading.Condition(self.mutex)

    def put(self, request):
        """Put a :class:`~celery.worker.job.TaskRequest` into
        the appropriate bucket.

        A bucket is created on demand if the task type has none yet.
        One thread waiting in :meth:`get` is notified.

        """
        self.mutex.acquire()
        try:
            if request.task_name not in self.buckets:
                self.add_bucket_for_type(request.task_name)
            self.buckets[request.task_name].put_nowait(request)
            self.not_empty.notify()
        finally:
            self.mutex.release()
    put_nowait = put

    def _get_immediate(self):
        """Pop the oldest request from the immediate queue.

        :raises Queue.Empty: If the immediate queue holds nothing.

        """
        try:
            return self.immediate.popleft()
        except IndexError:  # Empty
            raise QueueEmpty()

    def _get(self):
        """Try to fetch one request without waiting.

        :returns: A ``(remaining_time, item)`` tuple.  ``remaining_time`` is
            ``0`` when ``item`` is a request; otherwise ``item`` is
            :const:`None` and ``remaining_time`` is the shortest expected
            time reported by the buckets that hold items.

        :raises Queue.Empty: If there are no items in any of the buckets.

        """
        # If the first bucket is always returning items, we would never
        # get to fetch items from the other buckets. So we always iterate over
        # all the buckets and put any ready items into a queue called
        # "immediate". This queue is always checked for cached items first.
        try:
            return 0, self._get_immediate()
        except QueueEmpty:
            pass

        remaining_times = []
        for bucket in self.buckets.values():
            # An empty bucket has nothing to wait for.  Asking it for an
            # item anyway would make a rate limited bucket try to take a
            # token (see TokenBucketQueue.get) without delivering anything.
            if bucket.empty():
                continue
            remaining = bucket.expected_time()
            if remaining:
                remaining_times.append(remaining)
                continue
            try:
                # Just put any ready items into the immediate queue.
                self.immediate.append(bucket.get_nowait())
            except QueueEmpty:
                pass
            except RateLimitExceeded:
                remaining_times.append(bucket.expected_time())

        # Try the immediate queue again.
        try:
            return 0, self._get_immediate()
        except QueueEmpty:
            if not remaining_times:
                # No items in any of the buckets.
                raise

            # There's items, but have to wait before we can retrieve them,
            # return the shortest remaining time.
            return min(remaining_times), None

    def get(self, block=True, timeout=None):
        """Retrieve the task from the first available bucket.

        Available as in, there is an item in the queue and you can
        consume tokens from it.

        :keyword block: If false, never wait; raise instead.
        :keyword timeout: Maximum number of seconds to wait when blocking.
            :const:`None` means no limit, ``0`` means do not wait at all.

        :raises Queue.Empty: If no item could be retrieved (immediately
            when not blocking, or before the timeout expired).

        """
        deadline = None
        if timeout is not None:
            deadline = time.time() + timeout

        self.not_empty.acquire()
        try:
            while True:
                try:
                    remaining_time, item = self._get()
                except QueueEmpty:
                    # Nothing queued at all: only put() can change that,
                    # so wait for its notification.
                    wait_for = None
                else:
                    if not remaining_time:
                        return item
                    # Items exist but are rate limited.  Re-check at least
                    # once a second, since rate limits may be refreshed
                    # without any notification.
                    wait_for = min(remaining_time, 1)

                if not block:
                    raise QueueEmpty()
                if deadline is not None:
                    time_left = deadline - time.time()
                    if time_left <= 0:
                        raise QueueEmpty()
                    if wait_for is None or time_left < wait_for:
                        wait_for = time_left
                # Waiting on the condition (instead of sleeping) releases
                # the mutex, so put() is not blocked in the meantime.
                self.not_empty.wait(wait_for)
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Same as :meth:`get` with ``block=False``."""
        return self.get(block=False)

    def init_with_registry(self):
        """Initialize with buckets for all the task types in the registry."""
        for task in self.task_registry.keys():
            self.add_bucket_for_type(task)

    def refresh(self):
        """Refresh rate limits for all task types in the registry."""
        for task in self.task_registry.keys():
            self.update_bucket_for_type(task)

    def get_bucket_for_type(self, task_name):
        """Get the bucket for a particular task type.

        The bucket is created first if it does not exist.

        """
        if task_name not in self.buckets:
            return self.add_bucket_for_type(task_name)
        return self.buckets[task_name]

    def _get_queue_for_type(self, task_name):
        """Return the queue holding the items of an existing bucket.

        For a :class:`TokenBucketQueue` that is the queue it wraps,
        otherwise the bucket itself.

        """
        bucket = self.buckets[task_name]
        if isinstance(bucket, TokenBucketQueue):
            return bucket.queue
        return bucket

    def update_bucket_for_type(self, task_name):
        """Create or replace the bucket for a task type.

        The ``rate_limit`` attribute of the registered task type is read
        again (passed through :func:`timeutils.rate`).  The queue of an
        already existing bucket is reused, so the items in it are kept.

        :returns: The bucket now registered for ``task_name``.

        """
        task_type = self.task_registry[task_name]
        rate_limit = timeutils.rate(getattr(task_type, "rate_limit", None))
        if task_name in self.buckets:
            task_queue = self._get_queue_for_type(task_name)
        else:
            task_queue = FastQueue()

        if rate_limit:
            task_queue = TokenBucketQueue(rate_limit, queue=task_queue)

        self.buckets[task_name] = task_queue
        return task_queue

    def add_bucket_for_type(self, task_name):
        """Add a bucket for a task type.

        Will read the tasks rate limit and create a :class:`TokenBucketQueue`
        if it has one.  If the task doesn't have a rate limit a
        :class:`FastQueue` will be used.

        :returns: The new bucket, or :const:`None` if the task type
            already had one.

        """
        if task_name not in self.buckets:
            return self.update_bucket_for_type(task_name)

    def qsize(self):
        """Get the total size of all the queues, including requests
        already moved to the immediate queue."""
        queued = sum(bucket.qsize() for bucket in self.buckets.values())
        return queued + len(self.immediate)

    def empty(self):
        """Returns true if neither the immediate queue nor any of the
        buckets holds an item."""
        if self.immediate:
            return False
        return all(bucket.empty() for bucket in self.buckets.values())

    def clear(self):
        """Discard all items, both in the buckets and in the immediate
        queue."""
        self.immediate.clear()
        for bucket in self.buckets.values():
            bucket.clear()

    @property
    def items(self):
        """The items in the buckets, interleaved bucket by bucket."""
        # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
        # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
        # (filter(None, ...) drops the padding added for shorter buckets)
        return filter(None, chain_from_iterable(izip_longest(*[bucket.items
                                    for bucket in self.buckets.values()])))
class FastQueue(Queue):
    """A :class:`Queue.Queue` that additionally provides the methods of
    :class:`TokenBucketQueue`, without applying any rate limit."""

    @property
    def items(self):
        """The container this queue keeps its items in."""
        return self.queue

    def expected_time(self, tokens=1):
        """Always ``0``, whatever the number of ``tokens`` asked for."""
        return 0

    def wait(self, block=True):
        """Return the next item; equivalent to ``get(block=block)``."""
        return self.get(block=block)

    def clear(self):
        """Remove all items from the queue."""
        container = self.queue
        return container.clear()
class TokenBucketQueue(object):
    """Queue with rate limited get operations.

    This uses the token bucket algorithm to rate limit the queue on get
    operations.

    :param fill_rate: The rate in tokens/second that the bucket will
        be refilled.
    :keyword queue: The queue to store the items in.  A new
        :class:`Queue.Queue` is created if not given.
    :keyword capacity: Maximum number of tokens in the bucket. Default is 1.

    """
    RateLimitExceeded = RateLimitExceeded

    def __init__(self, fill_rate, queue=None, capacity=1):
        self._bucket = TokenBucket(fill_rate, capacity)
        # Compare against None rather than testing truthiness, so a queue
        # object supplied by the caller is always the one that is used.
        if queue is None:
            queue = Queue()
        self.queue = queue

    def put(self, item, block=True):
        """Put an item into the queue.

        Also see :meth:`Queue.Queue.put`.

        """
        self.queue.put(item, block=block)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        :raises Queue.Full: If a free slot is not immediately available.

        Also see :meth:`Queue.Queue.put_nowait`

        """
        return self.put(item, block=False)

    def get(self, block=True):
        """Remove and return an item from the queue.

        :raises RateLimitExceeded: If a token could not be consumed from the
            token bucket (consuming from the queue too fast).
        :raises Queue.Empty: If an item is not immediately available.

        Also see :meth:`Queue.Queue.get`.

        """
        if block:
            get = self.queue.get
        else:
            # Look for an item before touching the token bucket, so that
            # no token is requested for a call that has nothing to return.
            if self.queue.empty():
                raise QueueEmpty()
            get = self.queue.get_nowait

        if not self._bucket.can_consume(1):
            raise RateLimitExceeded()

        return get()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        :raises RateLimitExceeded: If a token could not be consumed from the
            token bucket (consuming from the queue too fast).
        :raises Queue.Empty: If an item is not immediately available.

        Also see :meth:`Queue.Queue.get_nowait`.

        """
        return self.get(block=False)

    def qsize(self):
        """Returns the size of the queue.

        See :meth:`Queue.Queue.qsize`.

        """
        return self.queue.qsize()

    def empty(self):
        """Returns true if the queue holds no items.

        See :meth:`Queue.Queue.empty`.

        """
        return self.queue.empty()

    def clear(self):
        """Remove all items from the queue."""
        return self.items.clear()

    def wait(self, block=False):
        """Wait until a token can be retrieved from the bucket and return
        the next item.

        :raises Queue.Empty: If ``block`` is false and the queue holds
            no item once a token is expected to be available.

        """
        while True:
            remaining = self.expected_time()
            if not remaining:
                return self.get(block=block)
            time.sleep(remaining)

    def expected_time(self, tokens=1):
        """Returns the expected time in seconds when a new token should be
        available."""
        return self._bucket.expected_time(tokens)

    @property
    def items(self):
        """The container the wrapped queue keeps its items in."""
        return self.queue.queue
|