app.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import os
  4. import sys
  5. import signal
  6. from time import sleep
  7. from celery import Celery
  8. from celery import signals
  9. from celery.bin.base import Option
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from .templates import use_template, template_names
  12. class App(Celery):
  13. template_selected = False
  14. def __init__(self, *args, **kwargs):
  15. self.template = kwargs.pop('template', None)
  16. super(App, self).__init__(*args, **kwargs)
  17. self.user_options['preload'].add(
  18. Option(
  19. '-Z', '--template', default='default',
  20. help='Configuration template to use: {0}'.format(
  21. template_names(),
  22. ),
  23. )
  24. )
  25. signals.user_preload_options.connect(self.on_preload_parsed)
  26. self.after_configure = None
  27. def on_preload_parsed(self, options=None, **kwargs):
  28. self.use_template(options['template'])
  29. def use_template(self, name='default'):
  30. if self.template_selected:
  31. raise RuntimeError('App already configured')
  32. use_template(self, name)
  33. self.template_selected = True
  34. def _get_config(self):
  35. ret = super(App, self)._get_config()
  36. if self.after_configure:
  37. self.after_configure(ret)
  38. return ret
  39. def on_configure(self):
  40. if not self.template_selected:
  41. self.use_template('default')
# Application instance named 'stress' that the @app.task definitions in this
# module register with; it is created with set_as_current=False.
app = App('stress', set_as_current=False)
  43. @app.task
  44. def _marker(s, sep='-'):
  45. print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
  46. @app.task
  47. def add(x, y):
  48. return x + y
  49. @app.task
  50. def xsum(x):
  51. return sum(x)
  52. @app.task
  53. def any_(*args, **kwargs):
  54. wait = kwargs.get('sleep')
  55. if wait:
  56. sleep(wait)
  57. @app.task
  58. def any_returning(*args, **kwargs):
  59. any_(*args, **kwargs)
  60. return args, kwargs
  61. @app.task
  62. def exiting(status=0):
  63. sys.exit(status)
  64. @app.task
  65. def kill(sig=signal.SIGKILL):
  66. os.kill(os.getpid(), sig)
  67. @app.task
  68. def sleeping(i, **_):
  69. sleep(i)
@app.task
def sleeping_ignore_limits(i):
    """Sleep for ``i`` seconds, sleeping again if a soft time limit fires."""
    try:
        sleep(i)
    except SoftTimeLimitExceeded:
        # Deliberately disregard the soft time limit and sleep a second
        # time.  This second sleep is not guarded, so any exception raised
        # during it propagates to the caller.
        sleep(i)
  76. @app.task(bind=True)
  77. def retries(self):
  78. if not self.request.retries:
  79. raise self.retry(countdown=1)
  80. return 10
  81. @app.task
  82. def unicode():
  83. print('hiöäüß')
  84. @app.task
  85. def segfault():
  86. import ctypes
  87. ctypes.memset(0, 0, 1)
  88. assert False, 'should not get here'
  89. def marker(s, sep='-'):
  90. print('{0}{1}'.format(sep, s))
  91. while True:
  92. try:
  93. return _marker.delay(s, sep)
  94. except Exception as exc:
  95. print("Retrying marker.delay(). It failed to start: %s" % exc)