Ver código fonte

TaskPublisher should be auto_declare=False

Ask Solem 13 anos atrás
pai
commit
0481ee62d9
2 arquivos alterados com 4 adições e 33 exclusões
  1. 1 1
      celery/app/amqp.py
  2. 3 32
      funtests/benchmarks/bench_worker.py

+ 1 - 1
celery/app/amqp.py

@@ -145,7 +145,7 @@ class Queues(dict):
 
 
 class TaskPublisher(messaging.Publisher):
-    auto_declare = True
+    auto_declare = False
     retry = False
     retry_policy = None
 

+ 3 - 32
funtests/benchmarks/bench_worker.py

@@ -4,37 +4,6 @@ import time
 
 os.environ["NOSETPS"] = "yes"
 
-from threading import Lock
-
-class DLock(object):
-
-    def __init__(self):
-        self.l = Lock()
-
-    def acquire(self, *args, **kwargs):
-        print("ACQUIRE: %r %r" % (args, kwargs))
-        import traceback
-        traceback.print_stack()
-        return self.l.acquire(*args, **kwargs)
-
-    def release(self):
-        print("RELEASE")
-        return self.l.release()
-
-    def __enter__(self):
-        self.acquire()
-        return self
-
-    def __exit__(self, *exc_info):
-        self.release()
-
-
-import threading
-#threading.Lock = DLock
-
-
-
-
 import anyjson
 JSONIMP = os.environ.get("JSONIMP")
 if JSONIMP:
@@ -52,12 +21,14 @@ celery.conf.update(BROKER_TRANSPORT="librabbitmq",
                    CELERYD_POOL="solo",
                    CELERY_PREFETCH_MULTIPLIER=0,
                    CELERY_DISABLE_RATE_LIMITS=True,
-                   #CELERY_DEFAULT_DELIVERY_MODE="transient",
+                   CELERY_DEFAULT_DELIVERY_MODE=1,
                    CELERY_QUEUES = {
                        "bench.worker": {
                            "exchange": "bench.worker",
                            "routing_key": "bench.worker",
                            "no_ack": True,
+                           "exchange_durable": False,
+                           "queue_durable": False,
                         }
                    },
                    CELERY_TASK_SERIALIZER="json",