Forráskód Böngészése

Program to stress test celery/carrot in different ways to find connection leakage etc.

Ask Solem 16 éve
szülő
commit
620b03faca
1 módosított fájl, 120 hozzáadás és 0 törlés
  1. 120 0
      contrib/testconn.py

+ 120 - 0
contrib/testconn.py

@@ -0,0 +1,120 @@
+import settings
+from django.core.management import setup_environ
+setup_environ(settings)
+from carrot.connection import DjangoAMQPConnection
+from carrot.messaging import Messaging
+from amqplib import client_0_8 as amqp
+from celery.task import dmap
+import operator
+import simplejson
+import time
+import multiprocessing
+import logging
+
+def get_logger():
+    logger = multiprocessing.get_logger()
+    logger.setLevel(logging.INFO)
+    multiprocessing.log_to_stderr()
+    return logger
+
+
+class MyMessager(Messaging):
+    queue = "conntest"
+    exchange = "conntest"
+    routing_key = "conntest"
+
+def _create_conn():
+    from django.conf import settings
+    conn = amqp.Connection(host=settings.AMQP_SERVER,
+                           userid=settings.AMQP_USER,
+                           password=settings.AMQP_PASSWORD,
+                           virtual_host=settings.AMQP_VHOST,
+                           insist=False)
+    return conn
+
+def _send2(msg):
+    conn = _create_conn()
+    channel = conn.channel()
+    msg = amqp.Message(msg)
+    msg.properties["delivery_mode"] = 2
+    channel.basic_publish(msg, exchange="conntest", routing_key="conntest")
+    conn.close()
+
+def _recv2():
+    conn = _create_conn()
+    channel = conn.channel()
+    channel.queue_declare(queue="conntest", durable=True, exclusive=False,
+                          auto_delete=False)
+    channel.exchange_declare(exchange="conntest", type="direct",
+                             durable=True, auto_delete=False)
+    channel.queue_bind(queue="conntest", exchange="conntest",
+                       routing_key="conntest")
+    m = channel.basic_get("conntest")
+    if m:
+        channel.basic_ack(m.delivery_tag)
+        print("RECEIVED MSG: %s" % m.body)
+    conn.close()
+
+def send_a_message(msg):
+    conn = DjangoAMQPConnection()
+    MyMessager(connection=conn).send({"message": msg})
+    conn.close()
+
+def discard_all():
+    conn = DjangoAMQPConnection()
+    MyMessager(connection=conn).consumer.discard_all()
+    conn.close()
+
+def receive_a_message():
+    logger = get_logger()
+    conn = DjangoAMQPConnection()
+    m = MyMessager(connection=conn).fetch()
+    if m:
+        msg = simplejson.loads(m.body)
+        logger.info("Message receieved: %s" % msg.get("message"))
+        m.ack()
+    conn.close()
+
+def connection_stress_test():
+    message_count = 0
+    discard_all()
+    while True:
+        send_a_message("FOOBARBAZ!!!")
+        time.sleep(0.1)
+        receive_a_message()
+        message_count += 1
+        print("Sent %d message(s)" % message_count)
+
+import multiprocessing
+def connection_stress_test_mp():
+    message_count = 0
+    pool = multiprocessing.Pool(10)
+    discard_all()
+    while True:
+        pool.apply(send_a_message, ["FOOBARBAZ!!!"])
+        time.sleep(0.1)
+        r = pool.apply(receive_a_message)
+        
+        message_count += 1
+        print("Sent %d message(s)" % message_count)
+
+def connection_stress_test2():
+    message_count = 0
+    while True:
+        _send2("FOOBARBAZ!!!")
+        time.sleep(0.1)
+        _recv2()
+        message_count += 1
+        print("Sent %d message(s)" % message_count)
+
+def task_stress_test():
+    task_count = 0
+    while True:
+        r = dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
+        print("[2+2, 4+4, 8+8] = %s" % r)
+        task_count += 3
+        print("Executed %d task(s)" % task_count)
+
+if __name__ == "__main__":
+    #connection_stress_test_mp()
+    task_stress_test()