123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import settings
- from django.core.management import setup_environ
- setup_environ(settings)
- from carrot.connection import DjangoAMQPConnection
- from carrot.messaging import Messaging
- from amqplib import client_0_8 as amqp
- from celery.task import dmap
- import operator
- import simplejson
- import time
- import multiprocessing
- import logging
- def get_logger():
- logger = multiprocessing.get_logger()
- logger.setLevel(logging.INFO)
- multiprocessing.log_to_stderr()
- return logger
- class MyMessager(Messaging):
- queue = "conntest"
- exchange = "conntest"
- routing_key = "conntest"
- def _create_conn():
- from django.conf import settings
- conn = amqp.Connection(host=settings.AMQP_SERVER,
- userid=settings.AMQP_USER,
- password=settings.AMQP_PASSWORD,
- virtual_host=settings.AMQP_VHOST,
- insist=False)
- return conn
- def _send2(msg):
- conn = _create_conn()
- channel = conn.channel()
- msg = amqp.Message(msg)
- msg.properties["delivery_mode"] = 2
- channel.basic_publish(msg, exchange="conntest", routing_key="conntest")
- conn.close()
- def _recv2():
- conn = _create_conn()
- channel = conn.channel()
- channel.queue_declare(queue="conntest", durable=True, exclusive=False,
- auto_delete=False)
- channel.exchange_declare(exchange="conntest", type="direct",
- durable=True, auto_delete=False)
- channel.queue_bind(queue="conntest", exchange="conntest",
- routing_key="conntest")
- m = channel.basic_get("conntest")
- if m:
- channel.basic_ack(m.delivery_tag)
- print("RECEIVED MSG: %s" % m.body)
- conn.close()
- def send_a_message(msg):
- conn = DjangoAMQPConnection()
- MyMessager(connection=conn).send({"message": msg})
- conn.close()
- def discard_all():
- conn = DjangoAMQPConnection()
- MyMessager(connection=conn).consumer.discard_all()
- conn.close()
- def receive_a_message():
- logger = get_logger()
- conn = DjangoAMQPConnection()
- m = MyMessager(connection=conn).fetch()
- if m:
- msg = simplejson.loads(m.body)
- logger.info("Message receieved: %s" % msg.get("message"))
- m.ack()
- conn.close()
- def connection_stress_test():
- message_count = 0
- discard_all()
- while True:
- send_a_message("FOOBARBAZ!!!")
- time.sleep(0.1)
- receive_a_message()
- message_count += 1
- print("Sent %d message(s)" % message_count)
- import multiprocessing
- def connection_stress_test_mp():
- message_count = 0
- pool = multiprocessing.Pool(10)
- discard_all()
- while True:
- pool.apply(send_a_message, ["FOOBARBAZ!!!"])
- time.sleep(0.1)
- r = pool.apply(receive_a_message)
-
- message_count += 1
- print("Sent %d message(s)" % message_count)
- def connection_stress_test2():
- message_count = 0
- while True:
- _send2("FOOBARBAZ!!!")
- time.sleep(0.1)
- _recv2()
- message_count += 1
- print("Sent %d message(s)" % message_count)
- def task_stress_test():
- task_count = 0
- while True:
- r = dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
- print("[2+2, 4+4, 8+8] = %s" % r)
- task_count += 3
- print("Executed %d task(s)" % task_count)
- if __name__ == "__main__":
- #connection_stress_test_mp()
- task_stress_test()
|