tasks.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, unicode_literals
  3. from time import sleep
  4. from celery import chain, group, shared_task
  5. from celery.exceptions import SoftTimeLimitExceeded
  6. from celery.utils.log import get_task_logger
  7. logger = get_task_logger(__name__)
  8. @shared_task
  9. def identity(x):
  10. return x
  11. @shared_task
  12. def add(x, y):
  13. """Add two numbers."""
  14. return x + y
  15. @shared_task
  16. def delayed_sum(numbers, pause_time=1):
  17. """Sum the iterable of numbers."""
  18. # Allow the task to be in STARTED state for
  19. # a limited period of time.
  20. sleep(pause_time)
  21. return sum(numbers)
  22. @shared_task
  23. def delayed_sum_with_soft_guard(numbers, pause_time=1):
  24. """Sum the iterable of numbers."""
  25. try:
  26. sleep(pause_time)
  27. return sum(numbers)
  28. except SoftTimeLimitExceeded:
  29. return 0
  30. @shared_task
  31. def tsum(nums):
  32. """Sum an iterable of numbers"""
  33. return sum(nums)
  34. @shared_task(bind=True)
  35. def add_replaced(self, x, y):
  36. """Add two numbers (via the add task)."""
  37. raise self.replace(add.s(x, y))
  38. @shared_task(bind=True)
  39. def add_to_all(self, nums, val):
  40. """Add the given value to all supplied numbers."""
  41. subtasks = [add.s(num, val) for num in nums]
  42. raise self.replace(group(*subtasks))
  43. @shared_task(bind=True)
  44. def add_to_all_to_chord(self, nums, val):
  45. for num in nums:
  46. self.add_to_chord(add.s(num, val))
  47. return 0
  48. @shared_task(bind=True)
  49. def add_chord_to_chord(self, nums, val):
  50. subtasks = [add.s(num, val) for num in nums]
  51. self.add_to_chord(group(subtasks) | tsum.s())
  52. return 0
  53. @shared_task
  54. def print_unicode(log_message='hå它 valmuefrø', print_message='hiöäüß'):
  55. """Task that both logs and print strings containing funny characters."""
  56. logger.warning(log_message)
  57. print(print_message)
  58. @shared_task
  59. def sleeping(i, **_):
  60. """Task sleeping for ``i`` seconds, and returning nothing."""
  61. sleep(i)
  62. @shared_task(bind=True)
  63. def ids(self, i):
  64. """Returns a tuple of ``root_id``, ``parent_id`` and
  65. the argument passed as ``i``."""
  66. return self.request.root_id, self.request.parent_id, i
  67. @shared_task(bind=True)
  68. def collect_ids(self, res, i):
  69. """Used as a callback in a chain or group where the previous tasks
  70. are :task:`ids`: returns a tuple of::
  71. (previous_result, (root_id, parent_id, i))
  72. """
  73. return res, (self.request.root_id, self.request.parent_id, i)
  74. @shared_task(bind=True, expires=60.0, max_retries=1)
  75. def retry_once(self):
  76. """Task that fails and is retried. Returns the number of retries."""
  77. if self.request.retries:
  78. return self.request.retries
  79. raise self.retry(countdown=0.1)
  80. @shared_task
  81. def redis_echo(message):
  82. """Task that appends the message to a redis list"""
  83. from redis import StrictRedis
  84. redis_connection = StrictRedis()
  85. redis_connection.rpush('redis-echo', message)
  86. @shared_task(bind=True)
  87. def second_order_replace1(self, state=False):
  88. from redis import StrictRedis
  89. redis_connection = StrictRedis()
  90. if not state:
  91. redis_connection.rpush('redis-echo', 'In A')
  92. new_task = chain(second_order_replace2.s(),
  93. second_order_replace1.si(state=True))
  94. raise self.replace(new_task)
  95. else:
  96. redis_connection.rpush('redis-echo', 'Out A')
  97. @shared_task(bind=True)
  98. def second_order_replace2(self, state=False):
  99. from redis import StrictRedis
  100. redis_connection = StrictRedis()
  101. if not state:
  102. redis_connection.rpush('redis-echo', 'In B')
  103. new_task = chain(redis_echo.s("In/Out C"),
  104. second_order_replace2.si(state=True))
  105. raise self.replace(new_task)
  106. else:
  107. redis_connection.rpush('redis-echo', 'Out B')