# -*- coding: utf-8 -*-
"""
celery.task.base
~~~~~~~~~~~~~~~~

The task implementation has been moved to :mod:`celery.app.task`.

This contains the backward compatible Task class used in the old API,
and shouldn't be used in new applications.
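
For illustration, a minimal sketch of the modern equivalent (the app and
task names below are placeholders):

.. code-block:: python

    from celery import Celery

    app = Celery('tasks', broker='amqp://')

    @app.task
    def add(x, y):
        return x + y

    add.delay(2, 2)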
  8. """
  9. from __future__ import absolute_import, unicode_literals
  10. from kombu import Exchange
  11. from celery import current_app
  12. from celery.app.task import Context, Task as BaseTask, _reprtask
  13. from celery.five import (
  14. class_property, reclassmethod,
  15. python_2_unicode_compatible, with_metaclass,
  16. )
  17. from celery.local import Proxy
  18. from celery.schedules import maybe_schedule
  19. from celery.utils.log import get_task_logger
  20. __all__ = ['Context', 'Task', 'TaskType', 'PeriodicTask', 'task']

#: list of methods that must be classmethods in the old API.
_COMPAT_CLASSMETHODS = (
    'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
    'signature_from_request', 'signature',
    'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
)


@python_2_unicode_compatible
class _CompatShared(object):

    def __init__(self, name, cons):
        self.name = name
        self.cons = cons

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        return '<OldTask: %r>' % (self.name,)

    def __call__(self, app):
        return self.cons(app)


class TaskType(type):
    """Metaclass for tasks.

    Automatically registers the task in the task registry (except
    if the :attr:`Task.abstract` attribute is set).

    If no :attr:`Task.name` attribute is provided, then the name is generated
    from the module and class name.
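
    For example (a sketch; the module and class names are placeholders),
    a task class defined without an explicit ``name`` is registered as
    ``'<module>.<class>'``:

    .. code-block:: python

        # defined in a module named ``tasks``
        class AddTask(Task):

            def run(self, x, y):
                return x + y

        AddTask.name   # -> 'tasks.AddTask'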
  44. """
  45. _creation_count = {} # used by old non-abstract task classes

    def __new__(cls, name, bases, attrs):
        new = super(TaskType, cls).__new__
        task_module = attrs.get('__module__') or '__main__'

        # - Abstract class: abstract attribute should not be inherited.
        abstract = attrs.pop('abstract', None)
        if abstract or not attrs.get('autoregister', True):
            return new(cls, name, bases, attrs)

        # The 'app' attribute is now a property, with the real app located
        # in the '_app' attribute.  Previously this was a regular attribute,
        # so we should support classes defining it.
        app = attrs.pop('_app', None) or attrs.pop('app', None)

        # Attempt to inherit app from one of the bases.
        if not isinstance(app, Proxy) and app is None:
            for base in bases:
                if getattr(base, '_app', None):
                    app = base._app
                    break
            else:
                app = current_app._get_current_object()
        attrs['_app'] = app

        # - Automatically generate missing/empty name.
        task_name = attrs.get('name')
        if not task_name:
            attrs['name'] = task_name = app.gen_task_name(name, task_module)

        if not attrs.get('_decorated'):
            # Non-decorated tasks must also be shared in case
            # an app is created multiple times due to modules
            # imported under multiple names.
            # Hairy stuff, here to be compatible with 2.x.
            # People should not use non-abstract task classes anymore;
            # use the task decorator instead.
            from celery._state import connect_on_app_finalize
            unique_name = '.'.join([task_module, name])
            if unique_name not in cls._creation_count:
                # The creation count is used as a safety measure
                # so that the same task is not added recursively
                # to the set of constructors.
                cls._creation_count[unique_name] = 1
                connect_on_app_finalize(_CompatShared(
                    unique_name,
                    lambda app: TaskType.__new__(cls, name, bases,
                                                 dict(attrs, _app=app)),
                ))

        # - Create and register class.
        # Because of the way import happens (recursively)
        # we may or may not be the first time the task tries to register
        # with the framework.  There should only be one class for each task
        # name, so we always return the registered version.
        tasks = app._tasks
        if task_name not in tasks:
            tasks.register(new(cls, name, bases, attrs))
        instance = tasks[task_name]
        instance.bind(app)
        return instance.__class__

    def __repr__(cls):
        return _reprtask(cls)


@with_metaclass(TaskType)
@python_2_unicode_compatible
class Task(BaseTask):
    """Deprecated Task base class.

    Modern applications should use :class:`celery.Task` instead.
    """

    abstract = True
    __bound__ = False
    __v2_compat__ = True

    # - Deprecated compat. attributes -:
    queue = None
    routing_key = None
    exchange = None
    exchange_type = None
    delivery_mode = None
    mandatory = False  # XXX deprecated
    immediate = False  # XXX deprecated
    priority = None
    type = 'regular'

    from_config = BaseTask.from_config + (
        ('exchange_type', 'task_default_exchange_type'),
        ('delivery_mode', 'task_default_delivery_mode'),
    )

    # In old Celery the @task decorator didn't exist, so one would create
    # classes instead and use them directly (e.g., MyTask.apply_async()).
    # The use of classmethods was a hack so that it was not necessary
    # to instantiate the class before using it, but it has only
    # given us pain (like all magic).
    for name in _COMPAT_CLASSMETHODS:
        locals()[name] = reclassmethod(getattr(BaseTask, name))

    @class_property
    def request(cls):
        return cls._get_request()

    @class_property
    def backend(cls):
        if cls._backend is None:
            return cls.app.backend
        return cls._backend

    @backend.setter
    def backend(cls, value):  # noqa
        cls._backend = value

    @classmethod
    def get_logger(self, **kwargs):
        return get_task_logger(self.name)

    @classmethod
    def establish_connection(self):
        """Deprecated method used to get a broker connection.

        Should be replaced with :meth:`@Celery.connection`
        instead, or by acquiring connections from the connection pool:

        .. code-block:: python

            # using the connection pool
            with celery.pool.acquire(block=True) as conn:
                ...

            # establish fresh connection
            with celery.connection_for_write() as conn:
                ...
        """
        return self._get_app().connection_for_write()

    def get_publisher(self, connection=None, exchange=None,
                      exchange_type=None, **options):
        """Deprecated method to get the task publisher (now called producer).

        Should be replaced with :class:`kombu.Producer`:

        .. code-block:: python

            with app.connection_for_write() as conn:
                with app.amqp.Producer(conn) as prod:
                    my_task.apply_async(producer=prod)

        or, even better, use the :class:`@amqp.producer_pool`:

        .. code-block:: python

            with app.producer_or_acquire() as prod:
                my_task.apply_async(producer=prod)
        """
        exchange = self.exchange if exchange is None else exchange
        if exchange_type is None:
            exchange_type = self.exchange_type
        connection = connection or self.establish_connection()
        return self._get_app().amqp.Producer(
            connection,
            exchange=exchange and Exchange(exchange, exchange_type),
            routing_key=self.routing_key, **options
        )

    @classmethod
    def get_consumer(self, connection=None, queues=None, **kwargs):
        """Deprecated method used to get a consumer for the queue
        this task is sent to.

        Should be replaced with :class:`@amqp.TaskConsumer` instead:
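
        .. code-block:: python

            # a sketch -- ``app`` stands in for the current Celery app
            with app.connection_for_read() as conn:
                with app.amqp.TaskConsumer(conn) as consumer:
                    ...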
  187. """
  188. Q = self._get_app().amqp
  189. connection = connection or self.establish_connection()
  190. if queues is None:
  191. queues = Q.queues[self.queue] if self.queue else Q.default_queue
  192. return Q.TaskConsumer(connection, queues, **kwargs)


class PeriodicTask(Task):
    """A periodic task is a task that adds itself to the
    :setting:`beat_schedule` setting.
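
    For illustration, a sketch of the old-style usage (the class name and
    schedule are placeholders):

    .. code-block:: python

        from datetime import timedelta

        class EveryMinute(PeriodicTask):
            run_every = timedelta(minutes=1)

            def run(self):
                ...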
    """

    abstract = True
    ignore_result = True
    relative = False
    options = None
    compat = True

    def __init__(self):
        if not hasattr(self, 'run_every'):
            raise NotImplementedError(
                'Periodic tasks must have a run_every attribute')
        self.run_every = maybe_schedule(self.run_every, self.relative)
        super(PeriodicTask, self).__init__()

    @classmethod
    def on_bound(cls, app):
        app.conf.beat_schedule[cls.name] = {
            'task': cls.name,
            'schedule': cls.run_every,
            'args': (),
            'kwargs': {},
            'options': cls.options or {},
            'relative': cls.relative,
        }


def task(*args, **kwargs):
    """Deprecated decorator, please use :func:`celery.task`."""
    return current_app.task(*args, **dict({'base': Task}, **kwargs))


def periodic_task(*args, **options):
    """Deprecated decorator, please use :setting:`beat_schedule`.
    """
    return task(**dict({'base': PeriodicTask}, **options))