- # -*- coding: utf-8 -*-
- """
- celery.task.base
- ~~~~~~~~~~~~~~~~
- The task implementation has been moved to :mod:`celery.app.task`.
- This contains the backward compatible Task class used in the old API,
- and shouldn't be used in new applications.
- """
- from __future__ import absolute_import
- from kombu import Exchange
- from celery import current_app
- from celery.app.task import Context, Task as BaseTask, _reprtask
- from celery.five import class_property, reclassmethod, with_metaclass
- from celery.local import Proxy
- from celery.schedules import maybe_schedule
- from celery.utils.log import get_task_logger
- __all__ = ['Context', 'Task', 'TaskType', 'PeriodicTask', 'task']
- #: list of methods that must be classmethods in the old API.
- _COMPAT_CLASSMETHODS = (
- 'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
- 'signature_from_request', 'signature',
- 'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
- )
- class _CompatShared(object):
- def __init__(self, name, cons):
- self.name = name
- self.cons = cons
- def __hash__(self):
- return hash(self.name)
- def __repr__(self):
- return '<OldTask: %r>' % (self.name, )
- def __call__(self, app):
- return self.cons(app)
- class TaskType(type):
- """Meta class for tasks.
- Automatically registers the task in the task registry (except
- if the :attr:`Task.abstract`` attribute is set).
- If no :attr:`Task.name` attribute is provided, then the name is generated
- from the module and class name.
- """
- _creation_count = {} # used by old non-abstract task classes
- def __new__(cls, name, bases, attrs):
- new = super(TaskType, cls).__new__
- task_module = attrs.get('__module__') or '__main__'
- # - Abstract class: abstract attribute should not be inherited.
- abstract = attrs.pop('abstract', None)
- if abstract or not attrs.get('autoregister', True):
- return new(cls, name, bases, attrs)
- # The 'app' attribute is now a property, with the real app located
- # in the '_app' attribute. Previously this was a regular attribute,
- # so we should support classes defining it.
- app = attrs.pop('_app', None) or attrs.pop('app', None)
- # Attempt to inherit the app from one of the bases
- if not isinstance(app, Proxy) and app is None:
- for base in bases:
- if getattr(base, '_app', None):
- app = base._app
- break
- else:
- app = current_app._get_current_object()
- attrs['_app'] = app
- # - Automatically generate missing/empty name.
- task_name = attrs.get('name')
- if not task_name:
- attrs['name'] = task_name = app.gen_task_name(name, task_module)
- if not attrs.get('_decorated'):
- # non-decorated tasks must also be shared in case
- # an app is created multiple times due to modules
- # imported under multiple names.
- # Hairy stuff, here to be compatible with 2.x.
- # People should not use non-abstract task classes anymore;
- # use the task decorator instead.
- from celery._state import connect_on_app_finalize
- unique_name = '.'.join([task_module, name])
- if unique_name not in cls._creation_count:
- # the creation count is used as a safeguard
- # so that the same task is not added recursively
- # to the set of constructors.
- cls._creation_count[unique_name] = 1
- connect_on_app_finalize(_CompatShared(
- unique_name,
- lambda app: TaskType.__new__(cls, name, bases,
- dict(attrs, _app=app)),
- ))
- # - Create and register class.
- # Because imports happen recursively, this may or may not be the
- # first time this task class tries to register with the framework.
- # There should only be one class for each task name, so we always
- # return the registered version.
- tasks = app._tasks
- if task_name not in tasks:
- tasks.register(new(cls, name, bases, attrs))
- instance = tasks[task_name]
- instance.bind(app)
- return instance.__class__
- def __repr__(cls):
- return _reprtask(cls)
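- # Illustrative sketch of what this metaclass enables: defining an old-style
- # class-based task registers it automatically, and a missing ``name`` is
- # generated from the module and class name.  The module ``proj.tasks`` and
- # class ``AddTask`` below are hypothetical:
- #
- #     # proj/tasks.py
- #     from celery.task import Task
- #
- #     class AddTask(Task):
- #         def run(self, x, y):
- #             return x + y
- #
- #     # AddTask.name == 'proj.tasks.AddTask', and the task is already
- #     # registered in the app's task registry without further action.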
- @with_metaclass(TaskType)
- class Task(BaseTask):
- """Deprecated Task base class.
- Modern applications should use :class:`celery.Task` instead.
- """
- abstract = True
- __bound__ = False
- __v2_compat__ = True
- # - Deprecated compat. attributes -:
- queue = None
- routing_key = None
- exchange = None
- exchange_type = None
- delivery_mode = None
- mandatory = False # XXX deprecated
- immediate = False # XXX deprecated
- priority = None
- type = 'regular'
- disable_error_emails = False
- from_config = BaseTask.from_config + (
- ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
- ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
- )
- # In old Celery the @task decorator didn't exist, so one would create
- # classes instead and use them directly (e.g. MyTask.apply_async()).
- # The use of classmethods was a hack so that it was not necessary
- # to instantiate the class before using it, but it has only
- # given us pain (like all magic).
- for name in _COMPAT_CLASSMETHODS:
- locals()[name] = reclassmethod(getattr(BaseTask, name))
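- # With the methods above rebound as classmethods, the old call style works
- # directly on the class, without instantiating it first; ``AddTask`` below
- # is a hypothetical task class:
- #
- #     AddTask.delay(2, 2)
- #     AddTask.apply_async((2, 2), countdown=10)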
- @class_property
- def request(cls):
- return cls._get_request()
- @class_property
- def backend(cls):
- if cls._backend is None:
- return cls.app.backend
- return cls._backend
- @backend.setter
- def backend(cls, value): # noqa
- cls._backend = value
- @classmethod
- def get_logger(self, **kwargs):
- return get_task_logger(self.name)
- @classmethod
- def establish_connection(self):
- """Deprecated method used to get a broker connection.
- Should be replaced with :meth:`@Celery.connection`
- instead, or by acquiring connections from the connection pool:
- .. code-block:: python
-     # using the connection pool
-     with celery.pool.acquire(block=True) as conn:
-         ...
-     # establish fresh connection
-     with celery.connection() as conn:
-         ...
- """
- return self._get_app().connection()
- def get_publisher(self, connection=None, exchange=None,
- exchange_type=None, **options):
- """Deprecated method to get the task publisher (now called producer).
- Should be replaced with :class:`@kombu.Producer`:
- .. code-block:: python
-     with app.connection() as conn:
-         with app.amqp.Producer(conn) as prod:
-             my_task.apply_async(producer=prod)
- or, even better, use the :class:`@amqp.producer_pool`:
- .. code-block:: python
-     with app.producer_or_acquire() as prod:
-         my_task.apply_async(producer=prod)
- """
- exchange = self.exchange if exchange is None else exchange
- if exchange_type is None:
- exchange_type = self.exchange_type
- connection = connection or self.establish_connection()
- return self._get_app().amqp.Producer(
- connection,
- exchange=exchange and Exchange(exchange, exchange_type),
- routing_key=self.routing_key, **options
- )
- @classmethod
- def get_consumer(self, connection=None, queues=None, **kwargs):
- """Deprecated method used to get consumer for the queue
- this task is sent to.
- Should be replaced with :class:`@amqp.TaskConsumer` instead:
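- .. code-block:: python
-     # a sketch, assuming a Celery instance named ``app`` is in scope
-     with app.connection() as conn:
-         with app.amqp.TaskConsumer(conn) as consumer:
-             ...  # consume messages from the configured task queues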
- """
- Q = self._get_app().amqp
- connection = connection or self.establish_connection()
- if queues is None:
- queues = Q.queues[self.queue] if self.queue else Q.default_queue
- return Q.TaskConsumer(connection, queues, **kwargs)
- class PeriodicTask(Task):
- """A periodic task is a task that adds itself to the
- :setting:`CELERYBEAT_SCHEDULE` setting."""
- abstract = True
- ignore_result = True
- relative = False
- options = None
- compat = True
- def __init__(self):
- if not hasattr(self, 'run_every'):
- raise NotImplementedError(
- 'Periodic tasks must have a run_every attribute')
- self.run_every = maybe_schedule(self.run_every, self.relative)
- super(PeriodicTask, self).__init__()
- @classmethod
- def on_bound(cls, app):
- app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
- 'task': cls.name,
- 'schedule': cls.run_every,
- 'args': (),
- 'kwargs': {},
- 'options': cls.options or {},
- 'relative': cls.relative,
- }
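- # Illustrative sketch of the old class-based periodic task API; the class
- # name below is hypothetical:
- #
- #     from datetime import timedelta
- #     from celery.task import PeriodicTask
- #
- #     class EveryMinute(PeriodicTask):
- #         run_every = timedelta(minutes=1)
- #
- #         def run(self):
- #             ...
- #
- # Creating the class binds it to the app, and :meth:`on_bound` above adds a
- # corresponding entry to ``app.conf.CELERYBEAT_SCHEDULE``.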
- def task(*args, **kwargs):
- """Deprecated decorator, please use :func:`celery.task`."""
- return current_app.task(*args, **dict({'base': Task}, **kwargs))
- def periodic_task(*args, **options):
- """Deprecated decorator, please use :setting:`CELERYBEAT_SCHEDULE`."""
- return task(**dict({'base': PeriodicTask}, **options))
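- # Illustrative usage of these deprecated decorators; the function names and
- # schedule are placeholders:
- #
- #     from datetime import timedelta
- #
- #     @task(ignore_result=True)
- #     def refresh_feed(url):
- #         ...
- #
- #     @periodic_task(run_every=timedelta(minutes=5))
- #     def poll():
- #         ...
- #
- # New code should use ``@app.task`` and the :setting:`CELERYBEAT_SCHEDULE`
- # setting instead.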
|