templates.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from __future__ import absolute_import
  2. import os
  3. from functools import partial
  4. from celery.five import items
  5. from kombu import Exchange, Queue
  6. from kombu.utils import symbol_by_name
  7. CSTRESS_QUEUE = os.environ.get('CSTRESS_QUEUE_NAME', 'c.stress')
  8. templates = {}
  9. def template(name=None):
  10. def _register(cls):
  11. templates[name or cls.__name__] = '.'.join([__name__, cls.__name__])
  12. return cls
  13. return _register
  14. def use_template(app, template='default'):
  15. template = template.split(',')
  16. app.after_configure = partial(mixin_templates, template[1:])
  17. app.config_from_object(templates[template[0]])
  18. def mixin_templates(templates, conf):
  19. return [mixin_template(template, conf) for template in templates]
  20. def mixin_template(template, conf):
  21. cls = symbol_by_name(templates[template])
  22. conf.update(dict(
  23. (k, v) for k, v in items(vars(cls))
  24. if k.isupper() and not k.startswith('_')
  25. ))
  26. def template_names():
  27. return ', '.join(templates)
  28. @template()
  29. class default(object):
  30. CELERY_ACCEPT_CONTENT = ['json']
  31. CELERY_DEFAULT_QUEUE = CSTRESS_QUEUE
  32. CELERY_TASK_SERIALIZER = 'json'
  33. CELERY_RESULT_SERIALIZER = 'json'
  34. CELERY_RESULT_PERSISTENT = True
  35. CELERY_TASK_RESULT_EXPIRES = 300
  36. CELERY_QUEUES = [
  37. Queue(CSTRESS_QUEUE,
  38. exchange=Exchange(CSTRESS_QUEUE),
  39. routing_key=CSTRESS_QUEUE),
  40. ]
  41. BROKER_URL = os.environ.get('CSTRESS_BROKER', 'amqp://')
  42. CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_BACKEND', 'rpc://')
  43. CELERYD_PREFETCH_MULTIPLIER = int(os.environ.get('CSTRESS_PREFETCH', 10))
  44. CELERY_TASK_PUBLISH_RETRY_POLICY = {
  45. 'max_retries': 100,
  46. 'interval_max': 2,
  47. 'interval_step': 0.1,
  48. }
  49. @template()
  50. class redis(default):
  51. BROKER_URL = os.environ.get('CSTRESS_BROKER', 'redis://')
  52. CELERY_RESULT_BACKEND = os.environ.get('CSTRESS_bACKEND', 'redis://')
  53. BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}
  54. @template()
  55. class redistore(default):
  56. CELERY_RESULT_BACKEND = 'redis://'
  57. @template()
  58. class acks_late(default):
  59. CELERY_ACKS_LATE = True
  60. @template()
  61. class pickle(default):
  62. CELERY_ACCEPT_CONTENT = ['pickle', 'json']
  63. CELERY_TASK_SERIALIZER = 'pickle'
  64. CELERY_RESULT_SERIALIZER = 'pickle'
  65. @template()
  66. class confirms(default):
  67. BROKER_URL = 'pyamqp://'
  68. BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
  69. @template()
  70. class events(default):
  71. CELERY_SEND_EVENTS = True
  72. CELERY_SEND_TASK_SENT_EVENT = True