app.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, print_function, unicode_literals
  3. import celery
  4. import os
  5. import sys
  6. import signal
  7. from time import sleep
  8. from celery import signals
  9. from celery.bin.base import Option
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from celery.utils.log import get_task_logger
  12. from .templates import use_template, template_names
# Logger obtained from Celery's get_task_logger for this module; the tasks
# below that log (print_unicode, logs) write through it.
logger = get_task_logger(__name__)
# True when the installed Celery's major version number is 4 or higher.
# The App class branches on this to decide how its configure hook is set up.
IS_CELERY_4 = celery.VERSION[0] >= 4
  15. class App(celery.Celery):
  16. template_selected = False
  17. def __init__(self, *args, **kwargs):
  18. self.template = kwargs.pop('template', None)
  19. super(App, self).__init__(*args, **kwargs)
  20. self.user_options['preload'].add(
  21. Option(
  22. '-Z', '--template', default='default',
  23. help='Configuration template to use: {0}'.format(
  24. template_names(),
  25. ),
  26. )
  27. )
  28. signals.user_preload_options.connect(self.on_preload_parsed)
  29. if IS_CELERY_4:
  30. self.on_configure.connect(self._maybe_use_default_template)
  31. def on_preload_parsed(self, options=None, **kwargs):
  32. self.use_template(options['template'])
  33. def use_template(self, name='default'):
  34. if self.template_selected:
  35. raise RuntimeError('App already configured')
  36. use_template(self, name)
  37. self.template_selected = True
  38. def _maybe_use_default_template(self, **kwargs):
  39. if not self.template_selected:
  40. self.use_template('default')
  41. if not IS_CELERY_4:
  42. after_configure = None
  43. def _get_config(self):
  44. ret = super(App, self)._get_config()
  45. if self.after_configure:
  46. self.after_configure(ret)
  47. return ret
  48. def on_configure(self):
  49. self._maybe_use_default_template()
# Module-level application instance named 'stress'; the @app.task decorators
# below register their tasks on it.  set_as_current=False is forwarded to
# the App/Celery constructor.
app = App('stress', set_as_current=False)
  51. @app.task
  52. def _marker(s, sep='-'):
  53. print('{0} {1} {2}'.format(sep * 3, s, sep * 3))
  54. @app.task
  55. def add(x, y):
  56. return x + y
  57. @app.task(bind=True)
  58. def ids(self, i):
  59. return (self.request.root_id, self.request.parent_id, i)
  60. @app.task(bind=True)
  61. def collect_ids(self, ids, i):
  62. return ids, (self.request.root_id, self.request.parent_id, i)
  63. @app.task
  64. def xsum(x):
  65. return sum(x)
  66. @app.task
  67. def any_(*args, **kwargs):
  68. wait = kwargs.get('sleep')
  69. if wait:
  70. sleep(wait)
  71. @app.task
  72. def any_returning(*args, **kwargs):
  73. any_(*args, **kwargs)
  74. return args, kwargs
  75. @app.task
  76. def exiting(status=0):
  77. sys.exit(status)
  78. @app.task
  79. def kill(sig=getattr(signal, 'SIGKILL', None) or signal.SIGTERM):
  80. os.kill(os.getpid(), sig)
  81. @app.task
  82. def sleeping(i, **_):
  83. sleep(i)
  84. @app.task
  85. def sleeping_ignore_limits(i):
  86. try:
  87. sleep(i)
  88. except SoftTimeLimitExceeded:
  89. sleep(i)
  90. @app.task(bind=True)
  91. def retries(self):
  92. if not self.request.retries:
  93. raise self.retry(countdown=1)
  94. return 10
  95. @app.task
  96. def print_unicode():
  97. logger.warning('hå它 valmuefrø')
  98. print('hiöäüß')
  99. @app.task
  100. def segfault():
  101. import ctypes
  102. ctypes.memset(0, 0, 1)
  103. assert False, 'should not get here'
  104. @app.task(bind=True)
  105. def chord_adds(self, x):
  106. self.add_to_chord(add.s(x, x))
  107. return 42
  108. @app.task(bind=True)
  109. def chord_replace(self, x):
  110. return self.replace_in_chord(add.s(x, x))
  111. @app.task
  112. def raising(exc=KeyError()):
  113. raise exc
  114. @app.task
  115. def logs(msg, p=False):
  116. print(msg) if p else logger.info(msg)
  117. def marker(s, sep='-'):
  118. print('{0}{1}'.format(sep, s))
  119. while True:
  120. try:
  121. return _marker.delay(s, sep)
  122. except Exception as exc:
  123. print("Retrying marker.delay(). It failed to start: %s" % exc)