base.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.base
  4. ~~~~~~~~~~~~~~~~
  5. The task implementation has been moved to :mod:`celery.app.task`.
  6. This contains the backward compatible Task class used in the old API,
  7. and shouldn't be used in new applications.
  8. """
  9. from __future__ import absolute_import
  10. from kombu import Exchange
  11. from celery import current_app
  12. from celery.app.task import Context, Task as BaseTask, _reprtask
  13. from celery.five import class_property, reclassmethod, with_metaclass
  14. from celery.local import Proxy
  15. from celery.schedules import maybe_schedule
  16. from celery.utils.log import get_task_logger
  17. __all__ = ['Context', 'Task', 'TaskType', 'PeriodicTask', 'task']
  18. #: list of methods that must be classmethods in the old API.
  19. _COMPAT_CLASSMETHODS = (
  20. 'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
  21. 'signature_from_request', 'signature',
  22. 'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
  23. )
  24. class _CompatShared(object):
  25. def __init__(self, name, cons):
  26. self.name = name
  27. self.cons = cons
  28. def __hash__(self):
  29. return hash(self.name)
  30. def __repr__(self):
  31. return '<OldTask: %r>' % (self.name, )
  32. def __call__(self, app):
  33. return self.cons(app)
  34. class TaskType(type):
  35. """Meta class for tasks.
  36. Automatically registers the task in the task registry (except
  37. if the :attr:`Task.abstract`` attribute is set).
  38. If no :attr:`Task.name` attribute is provided, then the name is generated
  39. from the module and class name.
  40. """
  41. _creation_count = {} # used by old non-abstract task classes
  42. def __new__(cls, name, bases, attrs):
  43. new = super(TaskType, cls).__new__
  44. task_module = attrs.get('__module__') or '__main__'
  45. # - Abstract class: abstract attribute should not be inherited.
  46. abstract = attrs.pop('abstract', None)
  47. if abstract or not attrs.get('autoregister', True):
  48. return new(cls, name, bases, attrs)
  49. # The 'app' attribute is now a property, with the real app located
  50. # in the '_app' attribute. Previously this was a regular attribute,
  51. # so we should support classes defining it.
  52. app = attrs.pop('_app', None) or attrs.pop('app', None)
  53. # Attempt to inherit app from one the bases
  54. if not isinstance(app, Proxy) and app is None:
  55. for base in bases:
  56. if getattr(base, '_app', None):
  57. app = base._app
  58. break
  59. else:
  60. app = current_app._get_current_object()
  61. attrs['_app'] = app
  62. # - Automatically generate missing/empty name.
  63. task_name = attrs.get('name')
  64. if not task_name:
  65. attrs['name'] = task_name = app.gen_task_name(name, task_module)
  66. if not attrs.get('_decorated'):
  67. # non decorated tasks must also be shared in case
  68. # an app is created multiple times due to modules
  69. # imported under multiple names.
  70. # Hairy stuff, here to be compatible with 2.x.
  71. # People should not use non-abstract task classes anymore,
  72. # use the task decorator.
  73. from celery._state import connect_on_app_finalize
  74. unique_name = '.'.join([task_module, name])
  75. if unique_name not in cls._creation_count:
  76. # the creation count is used as a safety
  77. # so that the same task is not added recursively
  78. # to the set of constructors.
  79. cls._creation_count[unique_name] = 1
  80. connect_on_app_finalize(_CompatShared(
  81. unique_name,
  82. lambda app: TaskType.__new__(cls, name, bases,
  83. dict(attrs, _app=app)),
  84. ))
  85. # - Create and register class.
  86. # Because of the way import happens (recursively)
  87. # we may or may not be the first time the task tries to register
  88. # with the framework. There should only be one class for each task
  89. # name, so we always return the registered version.
  90. tasks = app._tasks
  91. if task_name not in tasks:
  92. tasks.register(new(cls, name, bases, attrs))
  93. instance = tasks[task_name]
  94. instance.bind(app)
  95. return instance.__class__
  96. def __repr__(cls):
  97. return _reprtask(cls)
  98. @with_metaclass(TaskType)
  99. class Task(BaseTask):
  100. """Deprecated Task base class.
  101. Modern applications should use :class:`celery.Task` instead.
  102. """
  103. abstract = True
  104. __bound__ = False
  105. __v2_compat__ = True
  106. # - Deprecated compat. attributes -:
  107. queue = None
  108. routing_key = None
  109. exchange = None
  110. exchange_type = None
  111. delivery_mode = None
  112. mandatory = False # XXX deprecated
  113. immediate = False # XXX deprecated
  114. priority = None
  115. type = 'regular'
  116. disable_error_emails = False
  117. from_config = BaseTask.from_config + (
  118. ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
  119. ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
  120. )
  121. # In old Celery the @task decorator didn't exist, so one would create
  122. # classes instead and use them directly (e.g. MyTask.apply_async()).
  123. # the use of classmethods was a hack so that it was not necessary
  124. # to instantiate the class before using it, but it has only
  125. # given us pain (like all magic).
  126. for name in _COMPAT_CLASSMETHODS:
  127. locals()[name] = reclassmethod(getattr(BaseTask, name))
  128. @class_property
  129. def request(cls):
  130. return cls._get_request()
  131. @class_property
  132. def backend(cls):
  133. if cls._backend is None:
  134. return cls.app.backend
  135. return cls._backend
  136. @backend.setter
  137. def backend(cls, value): # noqa
  138. cls._backend = value
  139. @classmethod
  140. def get_logger(self, **kwargs):
  141. return get_task_logger(self.name)
  142. @classmethod
  143. def establish_connection(self):
  144. """Deprecated method used to get a broker connection.
  145. Should be replaced with :meth:`@Celery.connection`
  146. instead, or by acquiring connections from the connection pool:
  147. .. code-block:: python
  148. # using the connection pool
  149. with celery.pool.acquire(block=True) as conn:
  150. ...
  151. # establish fresh connection
  152. with celery.connection() as conn:
  153. ...
  154. """
  155. return self._get_app().connection()
  156. def get_publisher(self, connection=None, exchange=None,
  157. exchange_type=None, **options):
  158. """Deprecated method to get the task publisher (now called producer).
  159. Should be replaced with :class:`@kombu.Producer`:
  160. .. code-block:: python
  161. with app.connection() as conn:
  162. with app.amqp.Producer(conn) as prod:
  163. my_task.apply_async(producer=prod)
  164. or event better is to use the :class:`@amqp.producer_pool`:
  165. .. code-block:: python
  166. with app.producer_or_acquire() as prod:
  167. my_task.apply_async(producer=prod)
  168. """
  169. exchange = self.exchange if exchange is None else exchange
  170. if exchange_type is None:
  171. exchange_type = self.exchange_type
  172. connection = connection or self.establish_connection()
  173. return self._get_app().amqp.Producer(
  174. connection,
  175. exchange=exchange and Exchange(exchange, exchange_type),
  176. routing_key=self.routing_key, **options
  177. )
  178. @classmethod
  179. def get_consumer(self, connection=None, queues=None, **kwargs):
  180. """Deprecated method used to get consumer for the queue
  181. this task is sent to.
  182. Should be replaced with :class:`@amqp.TaskConsumer` instead:
  183. """
  184. Q = self._get_app().amqp
  185. connection = connection or self.establish_connection()
  186. if queues is None:
  187. queues = Q.queues[self.queue] if self.queue else Q.default_queue
  188. return Q.TaskConsumer(connection, queues, **kwargs)
  189. class PeriodicTask(Task):
  190. """A periodic task is a task that adds itself to the
  191. :setting:`CELERYBEAT_SCHEDULE` setting."""
  192. abstract = True
  193. ignore_result = True
  194. relative = False
  195. options = None
  196. compat = True
  197. def __init__(self):
  198. if not hasattr(self, 'run_every'):
  199. raise NotImplementedError(
  200. 'Periodic tasks must have a run_every attribute')
  201. self.run_every = maybe_schedule(self.run_every, self.relative)
  202. super(PeriodicTask, self).__init__()
  203. @classmethod
  204. def on_bound(cls, app):
  205. app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
  206. 'task': cls.name,
  207. 'schedule': cls.run_every,
  208. 'args': (),
  209. 'kwargs': {},
  210. 'options': cls.options or {},
  211. 'relative': cls.relative,
  212. }
  213. def task(*args, **kwargs):
  214. """Deprecated decorator, please use :func:`celery.task`."""
  215. return current_app.task(*args, **dict({'base': Task}, **kwargs))
  216. def periodic_task(*args, **options):
  217. """Deprecated decorator, please use :setting:`CELERYBEAT_SCHEDULE`."""
  218. return task(**dict({'base': PeriodicTask}, **options))