templates.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. from __future__ import absolute_import
  2. import os
  3. from celery.five import items
  4. from kombu import Exchange, Queue
  5. from kombu.utils import symbol_by_name
  6. CSTRESS_TRANS = os.environ.get('CSTRESS_TRANS', False)
  7. default_queue = 'c.stress.trans' if CSTRESS_TRANS else 'c.stress'
  8. CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', default_queue)
  9. templates = {}
  10. def template(name=None):
  11. def _register(cls):
  12. templates[name or cls.__name__] = '.'.join([__name__, cls.__name__])
  13. return cls
  14. return _register
  15. def use_template(app, template='default'):
  16. template = template.split(',')
  17. # mixin the rest of the templates when the config is needed
  18. @app.on_after_configure.connect(weak=False)
  19. def load_template(sender, source, **kwargs):
  20. mixin_templates(template[1:], source)
  21. app.config_from_object(templates[template[0]])
  22. def mixin_templates(templates, conf):
  23. return [mixin_template(template, conf) for template in templates]
  24. def mixin_template(template, conf):
  25. cls = symbol_by_name(templates[template])
  26. conf.update(dict(
  27. (k, v) for k, v in items(vars(cls))
  28. if k.isupper() and not k.startswith('_')
  29. ))
  30. def template_names():
  31. return ', '.join(templates)
  32. @template()
  33. class default(object):
  34. BROKER_HEARTBEAT = 30
  35. CELERY_ACCEPT_CONTENT = ['json']
  36. CELERY_DEFAULT_QUEUE = CSTRESS_QUEUE
  37. CELERY_TASK_SERIALIZER = 'json'
  38. CELERY_RESULT_SERIALIZER = 'json'
  39. CELERY_RESULT_PERSISTENT = True
  40. CELERY_TASK_RESULT_EXPIRES = 300
  41. CELERY_QUEUES = [
  42. Queue(CSTRESS_QUEUE,
  43. exchange=Exchange(CSTRESS_QUEUE),
  44. routing_key=CSTRESS_QUEUE,
  45. durable=not CSTRESS_TRANS,
  46. no_ack=CSTRESS_TRANS),
  47. ]
  48. CELERY_MAX_CACHED_RESULTS = -1
  49. BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
  50. CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
  51. CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 10))
  52. CELERY_TASK_PUBLISH_RETRY_POLICY = {
  53. 'max_retries': 100,
  54. 'interval_max': 2,
  55. 'interval_step': 0.1,
  56. }
  57. CELERY_TASK_PROTOCOL = 2
  58. if CSTRESS_TRANS:
  59. CELERY_DEFAULT_DELIVERY_MODE = 1
  60. @template()
  61. class redis(default):
  62. BROKER_URL = os.environ.get('CSTRESS_BROKER', 'redis://')
  63. CELERY_RESULT_BACKEND = os.environ.get(
  64. 'CSTRESS_BACKEND', 'redis://?new_join=1',
  65. )
  66. BROKER_TRANSPORT_OPTIONS = {
  67. 'fanout_prefix': True,
  68. 'fanout_patterns': True,
  69. }
  70. @template()
  71. class redistore(default):
  72. CELERY_RESULT_BACKEND = 'redis://?new_join=1'
  73. @template()
  74. class acks_late(default):
  75. CELERY_ACKS_LATE = True
  76. @template()
  77. class pickle(default):
  78. CELERY_ACCEPT_CONTENT = ['pickle', 'json']
  79. CELERY_TASK_SERIALIZER = 'pickle'
  80. CELERY_RESULT_SERIALIZER = 'pickle'
  81. @template()
  82. class confirms(default):
  83. BROKER_URL = 'pyamqp://'
  84. BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
  85. @template()
  86. class events(default):
  87. CELERY_SEND_EVENTS = True
  88. CELERY_SEND_TASK_SENT_EVENT = True
  89. @template()
  90. class execv(default):
  91. CELERYD_FORCE_EXECV = True
  92. @template()
  93. class sqs(default):
  94. BROKER_URL = 'sqs://'
  95. BROKER_TRANSPORT_OPTIONS = {
  96. 'region': os.environ.get('AWS_REGION', 'us-east-1'),
  97. }
  98. @template()
  99. class proto1(default):
  100. CELERY_TASK_PROTOCOL = 1