app.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function
  3. import os
  4. import sys
  5. import signal
  6. from kombu import Exchange, Queue
  7. from time import sleep
  8. from celery import Celery
  9. from celery.exceptions import SoftTimeLimitExceeded
  10. CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
  11. CSTRESS_BROKER = os.environ.get('CSTRESS_BROKER', 'amqp://')
  12. CSTRESS_BACKEND = os.environ.get('CSTRESS_BACKEND', 'redis://127.0.0.1')
  13. CSTRESS_PREFETCH = int(os.environ.get('CSTRESS_PREFETCH', 1))
  14. app = Celery(
  15. 'stress', broker=CSTRESS_BROKER, backend=CSTRESS_BACKEND,
  16. set_as_current=False,
  17. )
  18. app.conf.update(
  19. CELERYD_PREFETCH_MULTIPLIER=CSTRESS_PREFETCH,
  20. CELERY_DEFAULT_QUEUE=CSTRESS_QUEUE,
  21. CELERY_QUEUES=(
  22. Queue(CSTRESS_QUEUE,
  23. exchange=Exchange(CSTRESS_QUEUE),
  24. routing_key=CSTRESS_QUEUE),
  25. ),
  26. )
  27. @app.task
  28. def _marker(s, sep='-'):
  29. print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
  30. @app.task
  31. def add(x, y):
  32. return x + y
  33. @app.task
  34. def xsum(x):
  35. return sum(x)
  36. @app.task
  37. def any_(*args, **kwargs):
  38. wait = kwargs.get('sleep')
  39. if wait:
  40. sleep(wait)
  41. @app.task
  42. def exiting(status=0):
  43. sys.exit(status)
  44. @app.task
  45. def kill(sig=signal.SIGKILL):
  46. os.kill(os.getpid(), sig)
  47. @app.task
  48. def sleeping(i, **_):
  49. sleep(i)
  50. @app.task
  51. def sleeping_ignore_limits(i):
  52. try:
  53. sleep(i)
  54. except SoftTimeLimitExceeded:
  55. sleep(i)
  56. @app.task(bind=True)
  57. def retries(self):
  58. if not self.request.retries:
  59. raise self.retry(countdown=1)
  60. return 10
  61. @app.task
  62. def segfault():
  63. import ctypes
  64. ctypes.memset(0, 0, 1)
  65. assert False, 'should not get here'
  66. def marker(s, sep='-'):
  67. print('{0}{1}'.format(sep, s))
  68. while True:
  69. try:
  70. return _marker.delay(s, sep)
  71. except Exception as exc:
  72. print("Retrying marker.delay(). It failed to start: %s" % exc)