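"""Benchmark of Celery task round-trip throughput.

``bench_apply`` publishes ``n`` ``it`` tasks to a transient ``bench.worker``
queue as a single group, ``bench_work`` starts a worker that consumes them
and prints tasks/s when the last one arrives, and ``bench_both`` does both.
Run with one of ``apply``, ``work`` or ``both`` as the first argument and an
optional task count ``n`` (default 40000).
"""
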
from __future__ import print_function

import os
import sys
import time

os.environ["NOSETPS"] = "yes"

import anyjson
JSONIMP = os.environ.get("JSONIMP")
if JSONIMP:
    anyjson.force_implementation(JSONIMP)

print("anyjson implementation: %r" % (anyjson.implementation.name, ))

from celery import Celery, group

DEFAULT_ITS = 40000

BROKER_TRANSPORT = os.environ.get("BROKER", "librabbitmq")
if hasattr(sys, "pypy_version_info"):
    # librabbitmq is a C extension, so fall back to the pure-Python
    # amqplib transport on PyPy.
    BROKER_TRANSPORT = "amqplib"

celery = Celery(__name__)
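
# Benchmark-oriented settings: transient (non-persistent) messages on an
# auto-deleting, non-durable queue consumed without acks, rate limits
# disabled, JSON serialization, and no result backend, to keep broker
# overhead out of the measurement.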
 
celery.conf.update(BROKER_TRANSPORT=BROKER_TRANSPORT,
                   BROKER_POOL_LIMIT=10,
                   CELERYD_POOL="solo",
                   CELERY_PREFETCH_MULTIPLIER=0,
                   CELERY_DISABLE_RATE_LIMITS=True,
                   CELERY_DEFAULT_DELIVERY_MODE=1,
                   CELERY_QUEUES={
                       "bench.worker": {
                           "exchange": "bench.worker",
                           "routing_key": "bench.worker",
                           "no_ack": True,
                           "exchange_durable": False,
                           "queue_durable": False,
                           "auto_delete": True,
                       }
                   },
                   CELERY_TASK_SERIALIZER="json",
                   CELERY_DEFAULT_QUEUE="bench.worker",
                   CELERY_BACKEND=None,
                   # CELERY_MESSAGE_COMPRESSION="zlib",
                   )
 
def tdiff(then):
    return time.time() - then


@celery.task(cur=0, time_start=None, queue="bench.worker", bare=True)
def it(_, n):
    i = it.cur  # use internal counter, as ordering can be skewed
                # by previous runs, or the broker.
    if i and not i % 5000:
        print("(%s so far: %ss)" % (i, tdiff(it.subt)), file=sys.stderr)
        it.subt = time.time()
    if not i:
        it.subt = it.time_start = time.time()
    elif i == n - 1:
        total = tdiff(it.time_start)
        print("(%s so far: %ss)" % (i, tdiff(it.subt)), file=sys.stderr)
        print("-- process %s tasks: %ss total, %s tasks/s" % (
                n, total, n / (total + .0)))
        sys.exit()
    it.cur += 1
 
def bench_apply(n=DEFAULT_ITS):
    time_start = time.time()
    group(it.s(i, n) for i in xrange(n))()
    print("-- apply %s tasks: %ss" % (n, time.time() - time_start, ))
 
def bench_work(n=DEFAULT_ITS, loglevel="CRITICAL"):
    loglevel = os.environ.get("BENCH_LOGLEVEL") or loglevel
    if loglevel:
        celery.log.setup_logging_subsystem(loglevel=loglevel)
    worker = celery.WorkController(concurrency=15,
                                   queues=["bench.worker"])
    try:
        print("STARTING WORKER")
        worker.start()
    except SystemExit:
        # the it task calls sys.exit() after the last message; check that
        # the worker processed the expected number of tasks before exiting.
        assert sum(worker.state.total_count.values()) == n + 1
        raise
 
def bench_both(n=DEFAULT_ITS):
    bench_apply(n)
    bench_work(n)
 
def main(argv=sys.argv):
    n = DEFAULT_ITS
    if len(argv) < 2:
        print("Usage: %s [apply|work|both] [n=40k]" % (
                os.path.basename(argv[0]), ))
        sys.exit(1)
    try:
        n = int(argv[2])
    except IndexError:
        pass
    return {"apply": bench_apply,
            "work": bench_work,
            "both": bench_both}[argv[1]](n=n)
 
if __name__ == "__main__":
    main()
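
# Example runs (task count is illustrative; assumes a broker reachable with
# the default connection settings, i.e. RabbitMQ unless BROKER is set):
#
#   python <this-script> apply 10000    # publish 10000 tasks
#   python <this-script> work 10000     # start a worker and consume them
#   python <this-script> both 10000     # publish, then consume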
 
 