tasks.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

from time import sleep

from celery import chain, chord, group, shared_task
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils.log import get_task_logger

from .conftest import get_redis_connection

logger = get_task_logger(__name__)


@shared_task
def identity(x):
    return x


@shared_task
def add(x, y):
    """Add two numbers."""
    return x + y


@shared_task
def raise_error():
    """Deliberately raise an error."""
    raise ValueError("deliberate error")


@shared_task(ignore_result=True)
def add_ignore_result(x, y):
    """Add two numbers."""
    return x + y
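

# Hedged usage sketch: how the basic tasks above could be driven from a test,
# assuming a running worker and a result backend are configured.  The helper
# name ``_example_basic_usage`` is hypothetical and is never called by the
# suite.
def _example_basic_usage():
    result = add.delay(2, 2)                    # returns an AsyncResult
    assert result.get(timeout=10) == 4
    assert identity.delay('hello').get(timeout=10) == 'hello'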


@shared_task
def chain_add(x, y):
    """Launch a chain of two ``add`` tasks computing (x + x) + y."""
    (
        add.s(x, x) | add.s(y)
    ).apply_async()


@shared_task
def chord_add(x, y):
    """Launch a chord with a single ``add`` task as header and ``add`` as body."""
    chord(add.s(x, x), add.s(y)).apply_async()
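

# Hedged sketch of the canvas primitives that ``chain_add`` and ``chord_add``
# wrap, assuming a worker and result backend are available.  The helper name
# is hypothetical and is never invoked by the tests (``tsum`` is defined
# further down in this module).
def _example_canvas_usage():
    # A chain pipes each result into the next signature: (4 + 4) + 8 == 16.
    assert (add.s(4, 4) | add.s(8)).delay().get(timeout=10) == 16
    # A chord runs a header group, then feeds the list of results to a body
    # task: tsum([1 + 1, 2 + 2]) == 6.
    assert chord([add.s(1, 1), add.s(2, 2)], tsum.s()).delay().get(timeout=10) == 6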


@shared_task
def delayed_sum(numbers, pause_time=1):
    """Sum the iterable of numbers."""
    # Allow the task to be in STARTED state for
    # a limited period of time.
    sleep(pause_time)
    return sum(numbers)


@shared_task
def delayed_sum_with_soft_guard(numbers, pause_time=1):
    """Sum the iterable of numbers."""
    try:
        sleep(pause_time)
        return sum(numbers)
    except SoftTimeLimitExceeded:
        return 0


@shared_task
def tsum(nums):
    """Sum an iterable of numbers."""
    return sum(nums)


@shared_task(bind=True)
def add_replaced(self, x, y):
    """Add two numbers (via the add task)."""
    raise self.replace(add.s(x, y))


@shared_task(bind=True)
def add_to_all(self, nums, val):
    """Add the given value to all supplied numbers."""
    subtasks = [add.s(num, val) for num in nums]
    raise self.replace(group(*subtasks))


@shared_task(bind=True)
def add_to_all_to_chord(self, nums, val):
    """Add an ``add(num, val)`` task to the current chord for each number."""
    for num in nums:
        self.add_to_chord(add.s(num, val))
    return 0


@shared_task(bind=True)
def add_chord_to_chord(self, nums, val):
    """Add a nested ``group | tsum`` chord to the current chord."""
    subtasks = [add.s(num, val) for num in nums]
    self.add_to_chord(group(subtasks) | tsum.s())
    return 0
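

# Hedged usage sketch for the ``self.replace``-based tasks above, assuming a
# worker and result backend.  The helper name is hypothetical and never
# called by the suite.
def _example_replace_usage():
    # ``self.replace`` re-uses the task id of the replaced task, so the
    # caller's AsyncResult resolves to the replacement's return value.
    assert add_replaced.delay(2, 2).get(timeout=10) == 4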


@shared_task
def print_unicode(log_message='hå它 valmuefrø', print_message='hiöäüß'):
    """Task that both logs and prints strings containing funny characters."""
    logger.warning(log_message)
    print(print_message)


@shared_task
def sleeping(i, **_):
    """Task sleeping for ``i`` seconds, and returning nothing."""
    sleep(i)


@shared_task(bind=True)
def ids(self, i):
    """Returns a tuple of ``root_id``, ``parent_id`` and
    the argument passed as ``i``."""
    return self.request.root_id, self.request.parent_id, i


@shared_task(bind=True)
def collect_ids(self, res, i):
    """Used as a callback in a chain or group where the previous tasks
    are :task:`ids`: returns a tuple of::

        (previous_result, (root_id, parent_id, i))

    """
    return res, (self.request.root_id, self.request.parent_id, i)
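

# Hedged usage sketch for ``ids``/``collect_ids``, assuming a worker and
# result backend.  The helper name is hypothetical and never called by the
# suite.
def _example_parent_ids_usage():
    # ``collect_ids`` is chained after ``ids``; the worker fills in
    # root_id/parent_id, and the callback returns both tuples.
    res = (ids.si(i=1) | collect_ids.s(i=2)).delay()
    previous_result, (root_id, parent_id, i) = res.get(timeout=10)
    assert i == 2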


@shared_task(bind=True, expires=60.0, max_retries=1)
def retry_once(self):
    """Task that fails and is retried. Returns the number of retries."""
    if self.request.retries:
        return self.request.retries
    raise self.retry(countdown=0.1)


@shared_task
def redis_echo(message):
    """Task that appends the message to a redis list."""
    redis_connection = get_redis_connection()
    redis_connection.rpush('redis-echo', message)


@shared_task(bind=True)
def second_order_replace1(self, state=False):
    redis_connection = get_redis_connection()
    if not state:
        redis_connection.rpush('redis-echo', 'In A')
        new_task = chain(second_order_replace2.s(),
                         second_order_replace1.si(state=True))
        raise self.replace(new_task)
    else:
        redis_connection.rpush('redis-echo', 'Out A')


@shared_task(bind=True)
def second_order_replace2(self, state=False):
    redis_connection = get_redis_connection()
    if not state:
        redis_connection.rpush('redis-echo', 'In B')
        new_task = chain(redis_echo.s("In/Out C"),
                         second_order_replace2.si(state=True))
        raise self.replace(new_task)
    else:
        redis_connection.rpush('redis-echo', 'Out B')
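

# Hedged note on the two tasks above: the tests drive them via
# ``second_order_replace1.delay()`` and then inspect the 'redis-echo' list.
# Because each replacement chain runs before the ``state=True`` re-invocation,
# the messages are expected to arrive in the order
#   'In A', 'In B', 'In/Out C', 'Out B', 'Out A'.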


@shared_task(bind=True)
def build_chain_inside_task(self):
    """Task to build a chain.

    This task builds a chain and returns the chain's AsyncResult
    to verify that AsyncResults are correctly converted into
    serializable objects.
    """
    test_chain = (
        add.s(1, 1) |
        add.s(2) |
        group(
            add.s(3),
            add.s(4)
        ) |
        add.s(5)
    )
    result = test_chain()
    return result


class ExpectedException(Exception):
    pass


@shared_task
def fail(*args):
    raise ExpectedException('Task expected to fail')


@shared_task
def chord_error(*args):
    return args