Ver Fonte

Moved celery.worker.revoke.revoked to celery.worker.state.revoked

Ask Solem há 14 anos atrás
pai
commit
b03244dd25

+ 2 - 1
celery/backends/amqp.py

@@ -38,7 +38,8 @@ class ResultConsumer(Consumer):
     def __init__(self, connection, task_id, expires=None, **kwargs):
         routing_key = task_id.replace("-", "")
         if expires is not None:
-            self.queue_arguments = {"x-expires": expires}
+            pass
+            #self.queue_arguments = {"x-expires": expires}
         super(ResultConsumer, self).__init__(connection,
                 queue=routing_key, routing_key=routing_key, **kwargs)
 

+ 1 - 1
celery/tests/test_worker.py

@@ -353,7 +353,7 @@ class test_CarrotListener(unittest.TestCase):
                            kwargs={}, id=id)
         l.event_dispatcher = MockEventDispatcher()
         l.receive_message(c.decode(), c)
-        from celery.worker.revoke import revoked
+        from celery.worker.state import revoked
         self.assertIn(id, revoked)
 
         l.receive_message(t.decode(), t)

+ 1 - 1
celery/tests/test_worker_control.py

@@ -8,7 +8,7 @@ from celery.task.builtins import PingTask
 from celery.utils import gen_unique_id
 from celery.worker import control
 from celery.worker.buckets import FastQueue
-from celery.worker.revoke import revoked
+from celery.worker.state import revoked
 from celery.worker.scheduler import Scheduler
 
 hostname = socket.gethostname()

+ 1 - 1
celery/tests/test_worker_job.py

@@ -20,7 +20,7 @@ from celery.utils import gen_unique_id
 from celery.worker.job import WorkerTaskTrace, TaskRequest
 from celery.worker.job import execute_and_trace, AlreadyExecutedError
 from celery.worker.job import InvalidTaskError
-from celery.worker.revoke import revoked
+from celery.worker.state import revoked
 
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import execute_context

+ 1 - 1
celery/worker/control/builtins.py

@@ -5,7 +5,7 @@ from celery.backends import default_backend
 from celery.registry import tasks
 from celery.utils import timeutils
 from celery.worker import state
-from celery.worker.revoke import revoked
+from celery.worker.state import revoked
 from celery.worker.control.registry import Panel
 
 TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")

+ 1 - 2
celery/worker/job.py

@@ -13,7 +13,6 @@ from celery.registry import tasks
 from celery.utils import noop, kwdict, fun_takes_kwargs
 from celery.utils.mail import mail_admins
 from celery.worker import state
-from celery.worker.revoke import revoked
 
 # pep8.py borks on a inline signature separator and
 # says "trailing whitespace" ;)
@@ -233,7 +232,7 @@ class TaskRequest(object):
     def revoked(self):
         if self._already_revoked:
             return True
-        if self.task_id in revoked:
+        if self.task_id in state.revoked:
             self.logger.warn("Skipping revoked task: %s[%s]" % (
                 self.task_name, self.task_id))
             self.send_event("task-revoked", uuid=self.task_id)

+ 0 - 6
celery/worker/revoke.py

@@ -1,6 +0,0 @@
-from celery.datastructures import LimitedSet
-
-REVOKES_MAX = 10000
-REVOKE_EXPIRES = 3600 # One hour.
-
-revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

+ 5 - 0
celery/worker/state.py

@@ -1,7 +1,12 @@
 from celery.utils.compat import defaultdict
+from celery.datastructures import LimitedSet
+
+REVOKES_MAX = 10000
+REVOKE_EXPIRES = 3600 # One hour.
 
 active = defaultdict(lambda: 0)
 total = defaultdict(lambda: 0)
+revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)
 
 
 def task_accepted(task_name):