Explorar o código

Small fixes applied after reviewing the code last night.

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
80df576b7a
Modificáronse 3 ficheiros con 51 adicións e 18 borrados
  1. 29 16
      celery/buckets.py
  2. 3 2
      celery/task/base.py
  3. 19 0
      celery/tests/test_buckets.py

+ 29 - 16
celery/buckets.py

@@ -1,5 +1,6 @@
 import time
-from Queue import Queue, Empty as QueueEmpty
+from Queue import Queue
+from Queue import Empty as QueueEmpty
 
 
 RATE_MODIFIER_MAP = {"s": lambda n: n,
@@ -17,7 +18,11 @@ class RateLimitExceeded(Exception):
 
 def parse_ratelimit_string(rate_limit):
     """Parse rate limit configurations such as ``"100/m"`` or ``"2/h"``
-        and convert them into seconds."""
+        and convert them into seconds.
+
+    Returns ``0`` for no rate limit.
+
+    """
 
     if rate_limit:
         if isinstance(rate_limit, basestring):
@@ -32,9 +37,7 @@ def parse_ratelimit_string(rate_limit):
 
 
 class TaskBucket(object):
-    """A bucket with buckets of tasks. (eh. seriously.)
-
-    This is a collection of token buckets, each task type having
+    """This is a collection of token buckets, each task type having
     its own token bucket. If the task type doesn't have a rate limit,
     it will have a plain Queue object instead of a token bucket queue.
 
@@ -50,7 +53,7 @@ class TaskBucket(object):
          "feed.refresh": Queue(),
          "video.compress": TokenBucketQueue(fill_rate=2)}
 
-    The get operation will iterate over these until one of them
+    The get operation will iterate over these until one of the buckets
     is able to return an item. The underlying datastructure is a ``dict``,
     so the order is ignored here.
 
@@ -92,6 +95,8 @@ class TaskBucket(object):
                     self.immediate.put_nowait(bucket.get_nowait())
                 except QueueEmpty:
                     pass
+                except RateLimitExceeded:
+                    remaining_times.append(bucket.expected_time())
             else:
                 remaining_times.append(remaining)
 
@@ -182,6 +187,9 @@ class TaskBucket(object):
         """Get the total size of all the queues."""
         return sum(bucket.qsize() for bucket in self.buckets.values())
 
+    def empty(self):
+        return all(bucket.empty() for bucket in self.buckets.values())
+
 
 class TokenBucketQueue(object):
     """Queue with rate limited get operations.
@@ -228,6 +236,16 @@ class TokenBucketQueue(object):
         put = self.queue.put if block else self.queue.put_nowait
         put(item)
 
+    def put_nowait(self, item):
+        """Put an item into the queue without blocking.
+
+        :raises Queue.Full: If a free slot is not immediately available.
+
+        Also see :meth:`Queue.Queue.put_nowait`
+
+        """
+        return self.put(item, block=False)
+
     def get(self, block=True):
         """Remove and return an item from the queue.
 
@@ -252,18 +270,10 @@ class TokenBucketQueue(object):
             token bucket (consuming from the queue too fast).
         :raises Queue.Empty: If an item is not immediately available.
 
-        Also see :meth:`Queue.Queue.get_nowait`."""
-        return self.get(block=False)
-
-    def put_nowait(self, item):
-        """Put an item into the queue without blocking.
-
-        :raises Queue.Full: If a free slot is not immediately available.
-
-        Also see :meth:`Queue.Queue.put_nowait`
+        Also see :meth:`Queue.Queue.get_nowait`.
 
         """
-        return self.put(item, block=False)
+        return self.get(block=False)
 
     def qsize(self):
         """Returns the size of the queue.
@@ -273,6 +283,9 @@ class TokenBucketQueue(object):
         """
         return self.queue.qsize()
 
+    def empty(self):
+        return self.queue.empty()
+
     def wait(self, block=False):
         """Wait until a token can be retrieved from the bucket and return
         the next item."""

+ 3 - 2
celery/task/base.py

@@ -75,7 +75,8 @@ class Task(object):
         The rate limits can be specified in seconds, minutes or hours
         by appending ``"/s"``, ``"/m"`` or "``/h"``". If this is an integer
         it is interpreted as seconds. Example: ``"100/m" (hundred tasks a
-        minute).
+        minute). Default is the ``CELERY_DEFAULT_RATE_LIMIT`` setting (which
+        is off if not specified).
 
     .. attribute:: ignore_result
 
@@ -145,7 +146,7 @@ class Task(object):
     max_retries = 3
     default_retry_delay = 3 * 60
     serializer = conf.TASK_SERIALIZER
-    rate_limit = None
+    rate_limit = conf.DEFAULT_RATE_LIMIT
 
     MaxRetriesExceededError = MaxRetriesExceededError
 

+ 19 - 0
celery/tests/test_buckets.py

@@ -170,16 +170,35 @@ class TestTaskBuckets(unittest.TestCase):
             self.assertEqual(b.get(), job)
         self.assertTrue(time.time() - time_start > 1.5)
 
+    def test__very_busy_queue_doesnt_block_others(self):
+        b = buckets.TaskBucket(task_registry=self.registry)
+
+        cjob = lambda i, t: MockJob(gen_unique_id(), t.name, [i], {})
+        ajobs = [cjob(i, TaskA) for i in xrange(10)]
+        bjobs = [cjob(i, TaskB) for i in xrange(20)]
+        jobs = list(chain(*izip(bjobs, ajobs)))
+        map(b.put, jobs)
+
+        got_ajobs = 0
+        for job in (b.get() for i in xrange(20)):
+            if job.task_name == TaskA.name:
+                got_ajobs += 1
+
+        self.assertTrue(got_ajobs > 2)
+
+
     def test_thorough__multiple_types(self):
         self.registry.register(TaskD)
         try:
             b = buckets.TaskBucket(task_registry=self.registry)
 
             cjob = lambda i, t: MockJob(gen_unique_id(), t.name, [i], {})
+
             ajobs = [cjob(i, TaskA) for i in xrange(10)]
             bjobs = [cjob(i, TaskB) for i in xrange(10)]
             cjobs = [cjob(i, TaskC) for i in xrange(10)]
             djobs = [cjob(i, TaskD) for i in xrange(10)]
+
             # Spread the jobs around.
             jobs = list(chain(*izip(ajobs, bjobs, cjobs, djobs)))