testconn.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import settings
  2. from django.core.management import setup_environ
  3. setup_environ(settings)
  4. from carrot.connection import DjangoAMQPConnection
  5. from carrot.messaging import Messaging
  6. from amqplib import client_0_8 as amqp
  7. from celery.task import dmap
  8. import operator
  9. import simplejson
  10. import time
  11. import multiprocessing
  12. import logging
  13. def get_logger():
  14. logger = multiprocessing.get_logger()
  15. logger.setLevel(logging.INFO)
  16. multiprocessing.log_to_stderr()
  17. return logger
  18. class MyMessager(Messaging):
  19. queue = "conntest"
  20. exchange = "conntest"
  21. routing_key = "conntest"
  22. def _create_conn():
  23. from django.conf import settings
  24. conn = amqp.Connection(host=settings.AMQP_SERVER,
  25. userid=settings.AMQP_USER,
  26. password=settings.AMQP_PASSWORD,
  27. virtual_host=settings.AMQP_VHOST,
  28. insist=False)
  29. return conn
  30. def _send2(msg):
  31. conn = _create_conn()
  32. channel = conn.channel()
  33. msg = amqp.Message(msg)
  34. msg.properties["delivery_mode"] = 2
  35. channel.basic_publish(msg, exchange="conntest", routing_key="conntest")
  36. conn.close()
  37. def _recv2():
  38. conn = _create_conn()
  39. channel = conn.channel()
  40. channel.queue_declare(queue="conntest", durable=True, exclusive=False,
  41. auto_delete=False)
  42. channel.exchange_declare(exchange="conntest", type="direct",
  43. durable=True, auto_delete=False)
  44. channel.queue_bind(queue="conntest", exchange="conntest",
  45. routing_key="conntest")
  46. m = channel.basic_get("conntest")
  47. if m:
  48. channel.basic_ack(m.delivery_tag)
  49. print("RECEIVED MSG: %s" % m.body)
  50. conn.close()
  51. def send_a_message(msg):
  52. conn = DjangoAMQPConnection()
  53. MyMessager(connection=conn).send({"message": msg})
  54. conn.close()
  55. def discard_all():
  56. conn = DjangoAMQPConnection()
  57. MyMessager(connection=conn).consumer.discard_all()
  58. conn.close()
  59. def receive_a_message():
  60. logger = get_logger()
  61. conn = DjangoAMQPConnection()
  62. m = MyMessager(connection=conn).fetch()
  63. if m:
  64. msg = simplejson.loads(m.body)
  65. logger.info("Message receieved: %s" % msg.get("message"))
  66. m.ack()
  67. conn.close()
  68. def connection_stress_test():
  69. message_count = 0
  70. discard_all()
  71. while True:
  72. send_a_message("FOOBARBAZ!!!")
  73. time.sleep(0.1)
  74. receive_a_message()
  75. message_count += 1
  76. print("Sent %d message(s)" % message_count)
  77. import multiprocessing
  78. def connection_stress_test_mp():
  79. message_count = 0
  80. pool = multiprocessing.Pool(10)
  81. discard_all()
  82. while True:
  83. pool.apply(send_a_message, ["FOOBARBAZ!!!"])
  84. time.sleep(0.1)
  85. r = pool.apply(receive_a_message)
  86. message_count += 1
  87. print("Sent %d message(s)" % message_count)
  88. def connection_stress_test2():
  89. message_count = 0
  90. while True:
  91. _send2("FOOBARBAZ!!!")
  92. time.sleep(0.1)
  93. _recv2()
  94. message_count += 1
  95. print("Sent %d message(s)" % message_count)
  96. def task_stress_test():
  97. task_count = 0
  98. while True:
  99. r = dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
  100. print("[2+2, 4+4, 8+8] = %s" % r)
  101. task_count += 3
  102. print("Executed %d task(s)" % task_count)
  103. if __name__ == "__main__":
  104. #connection_stress_test_mp()
  105. task_stress_test()