base.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.base
  4. ~~~~~~~~~~~~~~~~
  5. The task implementation has been moved to :mod:`celery.app.task`.
  6. This contains the backward compatible Task class used in the old API,
  7. and shouldn't be used in new applications.
  8. """
  9. from __future__ import absolute_import
  10. from kombu import Exchange
  11. from celery import current_app
  12. from celery.__compat__ import class_property, reclassmethod
  13. from celery.app.task import Context, TaskType, Task as BaseTask # noqa
  14. from celery.schedules import maybe_schedule
  15. from celery.utils.log import get_task_logger
  16. #: list of methods that must be classmethods in the old API.
  17. _COMPAT_CLASSMETHODS = (
  18. 'delay', 'apply_async', 'retry', 'apply',
  19. 'AsyncResult', 'subtask', '_get_request',
  20. )
  21. class Task(BaseTask):
  22. """Deprecated Task base class.
  23. Modern applications should use :class:`celery.Task` instead.
  24. """
  25. abstract = True
  26. __bound__ = False
  27. #- Deprecated compat. attributes -:
  28. queue = None
  29. routing_key = None
  30. exchange = None
  31. exchange_type = None
  32. delivery_mode = None
  33. mandatory = False
  34. immediate = False
  35. priority = None
  36. type = 'regular'
  37. disable_error_emails = False
  38. accept_magic_kwargs = False
  39. from_config = BaseTask.from_config + (
  40. ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
  41. ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
  42. )
  43. # In old Celery the @task decorator didn't exist, so one would create
  44. # classes instead and use them directly (e.g. MyTask.apply_async()).
  45. # the use of classmethods was a hack so that it was not necessary
  46. # to instantiate the class before using it, but it has only
  47. # given us pain (like all magic).
  48. for name in _COMPAT_CLASSMETHODS:
  49. locals()[name] = reclassmethod(getattr(BaseTask, name))
  50. @class_property
  51. @classmethod
  52. def request(cls):
  53. return cls._get_request()
  54. @classmethod
  55. def get_logger(self, **kwargs):
  56. return get_task_logger(self.name)
  57. @classmethod
  58. def establish_connection(self):
  59. """Deprecated method used to get a broker connection.
  60. Should be replaced with :meth:`@Celery.connection`
  61. instead, or by acquiring connections from the connection pool:
  62. .. code-block:: python
  63. # using the connection pool
  64. with celery.pool.acquire(block=True) as conn:
  65. ...
  66. # establish fresh connection
  67. with celery.connection() as conn:
  68. ...
  69. """
  70. return self._get_app().connection()
  71. def get_publisher(self, connection=None, exchange=None,
  72. exchange_type=None, **options):
  73. """Deprecated method to get the task publisher (now called producer).
  74. Should be replaced with :class:`@amqp.TaskProducer`:
  75. .. code-block:: python
  76. with celery.connection() as conn:
  77. with celery.amqp.TaskProducer(conn) as prod:
  78. my_task.apply_async(producer=prod)
  79. """
  80. exchange = self.exchange if exchange is None else exchange
  81. if exchange_type is None:
  82. exchange_type = self.exchange_type
  83. connection = connection or self.establish_connection()
  84. return self._get_app().amqp.TaskProducer(connection,
  85. exchange=exchange and Exchange(exchange, exchange_type),
  86. routing_key=self.routing_key, **options)
  87. @classmethod
  88. def get_consumer(self, connection=None, queues=None, **kwargs):
  89. """Deprecated method used to get consumer for the queue
  90. this task is sent to.
  91. Should be replaced with :class:`@amqp.TaskConsumer` instead:
  92. """
  93. Q = self._get_app().amqp
  94. connection = connection or self.establish_connection()
  95. if queues is None:
  96. queues = Q.queues[self.queue] if self.queue else Q.default_queue
  97. return Q.TaskConsumer(connection, queues, **kwargs)
  98. class PeriodicTask(Task):
  99. """A periodic task is a task that adds itself to the
  100. :setting:`CELERYBEAT_SCHEDULE` setting."""
  101. abstract = True
  102. ignore_result = True
  103. relative = False
  104. options = None
  105. compat = True
  106. def __init__(self):
  107. if not hasattr(self, 'run_every'):
  108. raise NotImplementedError(
  109. 'Periodic tasks must have a run_every attribute')
  110. self.run_every = maybe_schedule(self.run_every, self.relative)
  111. super(PeriodicTask, self).__init__()
  112. @classmethod
  113. def on_bound(cls, app):
  114. app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
  115. 'task': cls.name,
  116. 'schedule': cls.run_every,
  117. 'args': (),
  118. 'kwargs': {},
  119. 'options': cls.options or {},
  120. 'relative': cls.relative,
  121. }
  122. def task(*args, **kwargs):
  123. """Decorator to create a task class out of any callable.
  124. **Examples**
  125. .. code-block:: python
  126. @task()
  127. def refresh_feed(url):
  128. return Feed.objects.get(url=url).refresh()
  129. With setting extra options and using retry.
  130. .. code-block:: python
  131. @task(max_retries=10)
  132. def refresh_feed(url):
  133. try:
  134. return Feed.objects.get(url=url).refresh()
  135. except socket.error as exc:
  136. refresh_feed.retry(exc=exc)
  137. Calling the resulting task:
  138. >>> refresh_feed('http://example.com/rss') # Regular
  139. <Feed: http://example.com/rss>
  140. >>> refresh_feed.delay('http://example.com/rss') # Async
  141. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  142. """
  143. return current_app.task(*args, **dict({'accept_magic_kwargs': False,
  144. 'base': Task}, **kwargs))
  145. def periodic_task(*args, **options):
  146. """Decorator to create a task class out of any callable.
  147. .. admonition:: Examples
  148. .. code-block:: python
  149. @task()
  150. def refresh_feed(url):
  151. return Feed.objects.get(url=url).refresh()
  152. With setting extra options and using retry.
  153. .. code-block:: python
  154. from celery.task import current
  155. @task(exchange='feeds')
  156. def refresh_feed(url):
  157. try:
  158. return Feed.objects.get(url=url).refresh()
  159. except socket.error as exc:
  160. current.retry(exc=exc)
  161. Calling the resulting task:
  162. >>> refresh_feed('http://example.com/rss') # Regular
  163. <Feed: http://example.com/rss>
  164. >>> refresh_feed.delay('http://example.com/rss') # Async
  165. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  166. """
  167. return task(**dict({'base': PeriodicTask}, **options))