templates.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. from __future__ import absolute_import, unicode_literals
  2. import celery
  3. import os
  4. from functools import partial
  5. from celery.five import items
  6. from kombu import Queue
  7. from kombu.utils import symbol_by_name
  8. CSTRESS_TRANS = os.environ.get('CSTRESS_TRANS', False)
  9. default_queue = 'c.stress.trans' if CSTRESS_TRANS else 'c.stress'
  10. CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', default_queue)
  11. templates = {}
  12. IS_CELERY_4 = celery.VERSION[0] >= 4
  13. def template(name=None):
  14. def _register(cls):
  15. templates[name or cls.__name__] = '.'.join([__name__, cls.__name__])
  16. return cls
  17. return _register
  18. if IS_CELERY_4:
  19. def use_template(app, template='default'):
  20. template = template.split(',')
  21. # mixin the rest of the templates when the config is needed
  22. @app.on_after_configure.connect(weak=False)
  23. def load_template(sender, source, **kwargs):
  24. mixin_templates(template[1:], source)
  25. app.config_from_object(templates[template[0]])
  26. else:
  27. def use_template(app, template='default'): # noqa
  28. template = template.split(',')
  29. app.after_configure = partial(mixin_templates, template[1:])
  30. app.config_from_object(templates[template[0]])
  31. def mixin_templates(templates, conf):
  32. return [mixin_template(template, conf) for template in templates]
  33. def mixin_template(template, conf):
  34. cls = symbol_by_name(templates[template])
  35. conf.update(dict(
  36. (k, v) for k, v in items(vars(cls))
  37. if not k.startswith('_')
  38. ))
  39. def template_names():
  40. return ', '.join(templates)
  41. @template()
  42. class default(object):
  43. CELERY_ACCEPT_CONTENT = ['json']
  44. BROKER_URL = os.environ.get('CSTRESS_BROKER', 'pyamqp://')
  45. BROKER_HEARTBEAT = 30
  46. CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
  47. CELERY_RESULT_SERIALIZER = 'json'
  48. CELERY_RESULT_PERSISTENT = True
  49. CELERY_RESULT_EXPIRES = 300
  50. CELERY_MAX_CACHED_RESULTS = 100
  51. CELERY_DEFAULT_QUEUE = CSTRESS_QUEUE
  52. CELERY_TASK_QUEUES = [
  53. Queue(CSTRESS_QUEUE,
  54. durable=not CSTRESS_TRANS,
  55. no_ack=CSTRESS_TRANS),
  56. ]
  57. CELERY_TASK_SERIALIZER = 'json'
  58. CELERY_TASK_PUBLISH_RETRY_POLICY = {
  59. 'max_retries': 100,
  60. 'interval_max': 2,
  61. 'interval_step': 0.1,
  62. }
  63. CELERY_TASK_PROTOCOL = 2
  64. if CSTRESS_TRANS:
  65. CELERY_DEFAULT_DELIVERY_MODE = 1
  66. CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 10))
  67. @template()
  68. class redis(default):
  69. BROKER_URL = os.environ.get('CSTRESS_BROKER', 'redis://')
  70. BROKER_TRANSPORT_OPTIONS = {
  71. 'fanout_prefix': True,
  72. 'fanout_patterns': True,
  73. }
  74. CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'redis://')
  75. @template()
  76. class redistore(default):
  77. CELERY_RESULT_BACKEND = 'redis://'
  78. @template()
  79. class acks_late(default):
  80. CELERY_ACKS_LATE = True
  81. @template()
  82. class pickle(default):
  83. CELERY_ACCEPT_CONTENT = ['pickle', 'json']
  84. CELERY_TASK_SERIALIZER = 'pickle'
  85. CELERY_RESULT_SERIALIZER = 'pickle'
  86. @template()
  87. class confirms(default):
  88. BROKER_URL = 'pyamqp://'
  89. BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
  90. @template()
  91. class events(default):
  92. CELERY_SEND_EVENTS = True
  93. CELERY_SEND_TASK_SENT_EVENT = True
  94. @template()
  95. class execv(default):
  96. CELERYD_FORCE_EXECV = True
  97. @template()
  98. class sqs(default):
  99. BROKER_URL = 'sqs://'
  100. BROKER_TRANSPORT_OPTIONS = {
  101. 'region': os.environ.get('AWS_REGION', 'us-east-1'),
  102. }
  103. @template()
  104. class proto1(default):
  105. CELERY_TASK_PROTOCOL = 1
  106. @template()
  107. class vagrant1(default):
  108. BROKER_URL = 'pyamqp://testing:t3s71ng@192.168.33.123//testing'
  109. @template()
  110. class vagrant1_redis(redis):
  111. BROKER_URL = 'redis://192.168.33.123'
  112. CELERY_RESULT_BACKEND = 'redis://192.168.33.123'