base.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.base
  4. ~~~~~~~~~~~~~~~~
  5. The task implementation has been moved to :mod:`celery.app.task`.
  6. This contains the backward compatible Task class used in the old API,
  7. and shouldn't be used in new applications.
  8. """
  9. from __future__ import absolute_import
  10. from kombu import Exchange
  11. from celery import current_app
  12. from celery.__compat__ import class_property, reclassmethod
  13. from celery.app.task import Context, TaskType, Task as BaseTask # noqa
  14. from celery.schedules import maybe_schedule
  15. from celery.utils.log import get_task_logger
  16. #: list of methods that must be classmethods in the old API.
  17. _COMPAT_CLASSMETHODS = (
  18. 'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
  19. 'AsyncResult', 'subtask', '_get_request',
  20. )
  21. class Task(BaseTask):
  22. """Deprecated Task base class.
  23. Modern applications should use :class:`celery.Task` instead.
  24. """
  25. abstract = True
  26. __bound__ = False
  27. __v2_compat__ = True
  28. #- Deprecated compat. attributes -:
  29. queue = None
  30. routing_key = None
  31. exchange = None
  32. exchange_type = None
  33. delivery_mode = None
  34. mandatory = False
  35. immediate = False
  36. priority = None
  37. type = 'regular'
  38. disable_error_emails = False
  39. accept_magic_kwargs = False
  40. from_config = BaseTask.from_config + (
  41. ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
  42. ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
  43. )
  44. # In old Celery the @task decorator didn't exist, so one would create
  45. # classes instead and use them directly (e.g. MyTask.apply_async()).
  46. # the use of classmethods was a hack so that it was not necessary
  47. # to instantiate the class before using it, but it has only
  48. # given us pain (like all magic).
  49. for name in _COMPAT_CLASSMETHODS:
  50. locals()[name] = reclassmethod(getattr(BaseTask, name))
  51. @class_property
  52. @classmethod
  53. def request(cls):
  54. return cls._get_request()
  55. @classmethod
  56. def get_logger(self, **kwargs):
  57. return get_task_logger(self.name)
  58. @classmethod
  59. def establish_connection(self):
  60. """Deprecated method used to get a broker connection.
  61. Should be replaced with :meth:`@Celery.connection`
  62. instead, or by acquiring connections from the connection pool:
  63. .. code-block:: python
  64. # using the connection pool
  65. with celery.pool.acquire(block=True) as conn:
  66. ...
  67. # establish fresh connection
  68. with celery.connection() as conn:
  69. ...
  70. """
  71. return self._get_app().connection()
  72. def get_publisher(self, connection=None, exchange=None,
  73. exchange_type=None, **options):
  74. """Deprecated method to get the task publisher (now called producer).
  75. Should be replaced with :class:`@amqp.TaskProducer`:
  76. .. code-block:: python
  77. with celery.connection() as conn:
  78. with celery.amqp.TaskProducer(conn) as prod:
  79. my_task.apply_async(producer=prod)
  80. """
  81. exchange = self.exchange if exchange is None else exchange
  82. if exchange_type is None:
  83. exchange_type = self.exchange_type
  84. connection = connection or self.establish_connection()
  85. return self._get_app().amqp.TaskProducer(connection,
  86. exchange=exchange and Exchange(exchange, exchange_type),
  87. routing_key=self.routing_key, **options)
  88. @classmethod
  89. def get_consumer(self, connection=None, queues=None, **kwargs):
  90. """Deprecated method used to get consumer for the queue
  91. this task is sent to.
  92. Should be replaced with :class:`@amqp.TaskConsumer` instead:
  93. """
  94. Q = self._get_app().amqp
  95. connection = connection or self.establish_connection()
  96. if queues is None:
  97. queues = Q.queues[self.queue] if self.queue else Q.default_queue
  98. return Q.TaskConsumer(connection, queues, **kwargs)
  99. class PeriodicTask(Task):
  100. """A periodic task is a task that adds itself to the
  101. :setting:`CELERYBEAT_SCHEDULE` setting."""
  102. abstract = True
  103. ignore_result = True
  104. relative = False
  105. options = None
  106. compat = True
  107. def __init__(self):
  108. if not hasattr(self, 'run_every'):
  109. raise NotImplementedError(
  110. 'Periodic tasks must have a run_every attribute')
  111. self.run_every = maybe_schedule(self.run_every, self.relative)
  112. super(PeriodicTask, self).__init__()
  113. @classmethod
  114. def on_bound(cls, app):
  115. app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
  116. 'task': cls.name,
  117. 'schedule': cls.run_every,
  118. 'args': (),
  119. 'kwargs': {},
  120. 'options': cls.options or {},
  121. 'relative': cls.relative,
  122. }
  123. def task(*args, **kwargs):
  124. """Decorator to create a task class out of any callable.
  125. **Examples**
  126. .. code-block:: python
  127. @task()
  128. def refresh_feed(url):
  129. return Feed.objects.get(url=url).refresh()
  130. With setting extra options and using retry.
  131. .. code-block:: python
  132. @task(max_retries=10)
  133. def refresh_feed(url):
  134. try:
  135. return Feed.objects.get(url=url).refresh()
  136. except socket.error as exc:
  137. refresh_feed.retry(exc=exc)
  138. Calling the resulting task:
  139. >>> refresh_feed('http://example.com/rss') # Regular
  140. <Feed: http://example.com/rss>
  141. >>> refresh_feed.delay('http://example.com/rss') # Async
  142. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  143. """
  144. return current_app.task(*args, **dict({'accept_magic_kwargs': False,
  145. 'base': Task}, **kwargs))
  146. def periodic_task(*args, **options):
  147. """Decorator to create a task class out of any callable.
  148. .. admonition:: Examples
  149. .. code-block:: python
  150. @task()
  151. def refresh_feed(url):
  152. return Feed.objects.get(url=url).refresh()
  153. With setting extra options and using retry.
  154. .. code-block:: python
  155. from celery.task import current
  156. @task(exchange='feeds')
  157. def refresh_feed(url):
  158. try:
  159. return Feed.objects.get(url=url).refresh()
  160. except socket.error as exc:
  161. current.retry(exc=exc)
  162. Calling the resulting task:
  163. >>> refresh_feed('http://example.com/rss') # Regular
  164. <Feed: http://example.com/rss>
  165. >>> refresh_feed.delay('http://example.com/rss') # Async
  166. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  167. """
  168. return task(**dict({'base': PeriodicTask}, **options))