Explorar el Código

Allow setting the Tasks queue type for the rate limiter, so it can be set to LifoQueue

Ask Solem hace 15 años
padre
commit
79cd3fb5b1
Se han modificado 2 ficheros con 3 adiciones y 1 borrados
  1. 2 0
      celery/task/base.py
  2. 1 1
      celery/worker/buckets.py

+ 2 - 0
celery/task/base.py

@@ -1,5 +1,6 @@
 import sys
 import sys
 from datetime import timedelta
 from datetime import timedelta
+from Queue import Queue
 
 
 from carrot.connection import DjangoBrokerConnection
 from carrot.connection import DjangoBrokerConnection
 
 
@@ -149,6 +150,7 @@ class Task(object):
     default_retry_delay = 3 * 60
     default_retry_delay = 3 * 60
     serializer = conf.TASK_SERIALIZER
     serializer = conf.TASK_SERIALIZER
     rate_limit = conf.DEFAULT_RATE_LIMIT
     rate_limit = conf.DEFAULT_RATE_LIMIT
+    rate_limit_queue_type = Queue
     backend = default_backend
     backend = default_backend
 
 
     MaxRetriesExceededError = MaxRetriesExceededError
     MaxRetriesExceededError = MaxRetriesExceededError

+ 1 - 1
celery/worker/buckets.py

@@ -153,7 +153,7 @@ class TaskBucket(object):
         if task_name in self.buckets:
         if task_name in self.buckets:
             return
             return
         task_type = self.task_registry[task_name]
         task_type = self.task_registry[task_name]
-        task_queue = Queue()
+        task_queue = task_type.rate_limit_queue_type()
         rate_limit = getattr(task_type, "rate_limit", None)
         rate_limit = getattr(task_type, "rate_limit", None)
         rate_limit = parse_ratelimit_string(rate_limit)
         rate_limit = parse_ratelimit_string(rate_limit)
         if rate_limit:
         if rate_limit: