app.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import os
  4. import sys
  5. import signal
  6. from time import sleep
  7. from celery import Celery
  8. from celery import signals
  9. from celery.bin.base import Option
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from celery.utils.log import get_task_logger
  12. from .templates import use_template, template_names
  13. logger = get_task_logger(__name__)
  14. class App(Celery):
  15. template_selected = False
  16. def __init__(self, *args, **kwargs):
  17. self.template = kwargs.pop('template', None)
  18. super(App, self).__init__(*args, **kwargs)
  19. self.user_options['preload'].add(
  20. Option(
  21. '-Z', '--template', default='default',
  22. help='Configuration template to use: {0}'.format(
  23. template_names(),
  24. ),
  25. )
  26. )
  27. signals.user_preload_options.connect(self.on_preload_parsed)
  28. self.after_configure = None
  29. def on_preload_parsed(self, options=None, **kwargs):
  30. self.use_template(options['template'])
  31. def use_template(self, name='default'):
  32. if self.template_selected:
  33. raise RuntimeError('App already configured')
  34. use_template(self, name)
  35. self.template_selected = True
  36. def _get_config(self):
  37. ret = super(App, self)._get_config()
  38. if self.after_configure:
  39. self.after_configure(ret)
  40. return ret
  41. def on_configure(self):
  42. if not self.template_selected:
  43. self.use_template('default')
  44. app = App('stress', set_as_current=False)
  45. @app.task
  46. def _marker(s, sep='-'):
  47. print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
  48. @app.task
  49. def add(x, y):
  50. return x + y
  51. @app.task
  52. def xsum(x):
  53. return sum(x)
  54. @app.task
  55. def any_(*args, **kwargs):
  56. wait = kwargs.get('sleep')
  57. if wait:
  58. sleep(wait)
  59. @app.task
  60. def any_returning(*args, **kwargs):
  61. any_(*args, **kwargs)
  62. return args, kwargs
  63. @app.task
  64. def exiting(status=0):
  65. sys.exit(status)
  66. @app.task
  67. def kill(sig=getattr(signal, 'SIGKILL', None) or signal.SIGTERM):
  68. os.kill(os.getpid(), sig)
  69. @app.task
  70. def sleeping(i, **_):
  71. sleep(i)
  72. @app.task
  73. def sleeping_ignore_limits(i):
  74. try:
  75. sleep(i)
  76. except SoftTimeLimitExceeded:
  77. sleep(i)
  78. @app.task(bind=True)
  79. def retries(self):
  80. if not self.request.retries:
  81. raise self.retry(countdown=1)
  82. return 10
  83. @app.task
  84. def print_unicode():
  85. print('hiöäüß')
  86. @app.task
  87. def segfault():
  88. import ctypes
  89. ctypes.memset(0, 0, 1)
  90. assert False, 'should not get here'
  91. @app.task
  92. def raising(exc=KeyError()):
  93. raise exc
  94. @app.task
  95. def logs(msg, p=False):
  96. print(msg) if p else logger.info(msg)
  97. def marker(s, sep='-'):
  98. print('{0}{1}'.format(sep, s))
  99. while True:
  100. try:
  101. return _marker.delay(s, sep)
  102. except Exception as exc:
  103. print("Retrying marker.delay(). It failed to start: %s" % exc)