|
@@ -5,10 +5,15 @@ import time
|
|
|
import unittest
|
|
|
from itertools import chain, izip
|
|
|
|
|
|
-from celery.worker import buckets
|
|
|
+from billiard.utils.functional import curry
|
|
|
+
|
|
|
+from celery.task.base import Task
|
|
|
from celery.utils import gen_unique_id
|
|
|
+from celery.tests.utils import skip_if_environ
|
|
|
+from celery.worker import buckets
|
|
|
from celery.registry import TaskRegistry
|
|
|
-from celery.task.base import Task
|
|
|
+
|
|
|
+skip_if_disabled = curry(skip_if_environ("SKIP_RLIMITS"))
|
|
|
|
|
|
|
|
|
class MockJob(object):
|
|
@@ -35,10 +40,12 @@ class MockJob(object):
|
|
|
|
|
|
class TestTokenBucketQueue(unittest.TestCase):
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def empty_queue_yields_QueueEmpty(self):
|
|
|
x = buckets.TokenBucketQueue(fill_rate=10)
|
|
|
self.assertRaises(buckets.QueueEmpty, x.get)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_bucket__put_get(self):
|
|
|
x = buckets.TokenBucketQueue(fill_rate=10)
|
|
|
x.put("The quick brown fox")
|
|
@@ -48,6 +55,7 @@ class TestTokenBucketQueue(unittest.TestCase):
|
|
|
time.sleep(0.2)
|
|
|
self.assertEquals(x.get_nowait(), "The lazy dog")
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_fill_rate(self):
|
|
|
x = buckets.TokenBucketQueue(fill_rate=10)
|
|
|
# 20 items should take at least one second to complete
|
|
@@ -58,6 +66,7 @@ class TestTokenBucketQueue(unittest.TestCase):
|
|
|
x.wait()
|
|
|
self.assertTrue(time.time() - time_start > 1.5)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_can_consume(self):
|
|
|
x = buckets.TokenBucketQueue(fill_rate=1)
|
|
|
x.put("The quick brown fox")
|
|
@@ -67,12 +76,14 @@ class TestTokenBucketQueue(unittest.TestCase):
|
|
|
x.put("The lazy dog")
|
|
|
self.assertRaises(x.RateLimitExceeded, x.get)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_expected_time(self):
|
|
|
x = buckets.TokenBucketQueue(fill_rate=1)
|
|
|
x.put_nowait("The quick brown fox")
|
|
|
self.assertEqual(x.get_nowait(), "The quick brown fox")
|
|
|
self.assertTrue(x.expected_time())
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_qsize(self):
|
|
|
x = buckets.TokenBucketQueue(fill_rate=1)
|
|
|
x.put("The quick brown fox")
|
|
@@ -82,6 +93,7 @@ class TestTokenBucketQueue(unittest.TestCase):
|
|
|
|
|
|
class TestRateLimitString(unittest.TestCase):
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_conversion(self):
|
|
|
self.assertEquals(buckets.parse_ratelimit_string(999), 999)
|
|
|
self.assertEquals(buckets.parse_ratelimit_string("1456/s"), 1456)
|
|
@@ -123,6 +135,7 @@ class TestTaskBuckets(unittest.TestCase):
|
|
|
for task_cls in self.task_classes:
|
|
|
self.registry.register(task_cls)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_auto_add_on_missing(self):
|
|
|
b = buckets.TaskBucket(task_registry=self.registry)
|
|
|
for task_cls in self.task_classes:
|
|
@@ -132,6 +145,7 @@ class TestTaskBuckets(unittest.TestCase):
|
|
|
self.assertTrue(TaskD.name in b.buckets.keys())
|
|
|
self.registry.unregister(TaskD)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_has_rate_limits(self):
|
|
|
b = buckets.TaskBucket(task_registry=self.registry)
|
|
|
self.assertEqual(b.buckets[TaskA.name].fill_rate, 10)
|
|
@@ -144,17 +158,20 @@ class TestTaskBuckets(unittest.TestCase):
|
|
|
finally:
|
|
|
self.registry.unregister(TaskD)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_on_empty_buckets__get_raises_empty(self):
|
|
|
b = buckets.TaskBucket(task_registry=self.registry)
|
|
|
self.assertRaises(buckets.QueueEmpty, b.get)
|
|
|
self.assertEqual(b.qsize(), 0)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_put__get(self):
|
|
|
b = buckets.TaskBucket(task_registry=self.registry)
|
|
|
job = MockJob(gen_unique_id(), TaskA.name, ["theqbf"], {"foo": "bar"})
|
|
|
b.put(job)
|
|
|
self.assertEquals(b.get(), job)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_fill_rate(self):
|
|
|
b = buckets.TaskBucket(task_registry=self.registry)
|
|
|
|
|
@@ -171,6 +188,7 @@ class TestTaskBuckets(unittest.TestCase):
|
|
|
self.assertEqual(b.get(), job)
|
|
|
self.assertTrue(time.time() - time_start > 1.5)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test__very_busy_queue_doesnt_block_others(self):
|
|
|
b = buckets.TaskBucket(task_registry=self.registry)
|
|
|
|
|
@@ -187,6 +205,7 @@ class TestTaskBuckets(unittest.TestCase):
|
|
|
|
|
|
self.assertTrue(got_ajobs > 2)
|
|
|
|
|
|
+ @skip_if_disabled
|
|
|
def test_thorough__multiple_types(self):
|
|
|
self.registry.register(TaskD)
|
|
|
try:
|