123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- # -*- coding: utf-8 -*-
- """
- celery.task.base
- ~~~~~~~~~~~~~~~~
- The task implementation has been moved to :mod:`celery.app.task`.
- This contains the backward compatible Task class used in the old API,
- and shouldn't be used in new applications.
- """
- from __future__ import absolute_import
- from kombu import Exchange
- from celery import current_app
- from celery.__compat__ import class_property, reclassmethod
- from celery.app.task import Context, TaskType, Task as BaseTask # noqa
- from celery.schedules import maybe_schedule
- from celery.utils.log import get_task_logger
- #: list of methods that must be classmethods in the old API.
- _COMPAT_CLASSMETHODS = (
- 'delay', 'apply_async', 'retry', 'apply',
- 'AsyncResult', 'subtask', '_get_request',
- )
- class Task(BaseTask):
- """Deprecated Task base class.
- Modern applications should use :class:`celery.Task` instead.
- """
- abstract = True
- __bound__ = False
- #- Deprecated compat. attributes -:
- queue = None
- routing_key = None
- exchange = None
- exchange_type = None
- delivery_mode = None
- mandatory = False
- immediate = False
- priority = None
- type = 'regular'
- disable_error_emails = False
- accept_magic_kwargs = False
- from_config = BaseTask.from_config + (
- ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
- ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
- )
- # In old Celery the @task decorator didn't exist, so one would create
- # classes instead and use them directly (e.g. MyTask.apply_async()).
- # the use of classmethods was a hack so that it was not necessary
- # to instantiate the class before using it, but it has only
- # given us pain (like all magic).
- for name in _COMPAT_CLASSMETHODS:
- locals()[name] = reclassmethod(getattr(BaseTask, name))
- @class_property
- @classmethod
- def request(cls):
- return cls._get_request()
- @classmethod
- def get_logger(self, **kwargs):
- return get_task_logger(self.name)
- @classmethod
- def establish_connection(self):
- """Deprecated method used to get a broker connection.
- Should be replaced with :meth:`@Celery.connection`
- instead, or by acquiring connections from the connection pool:
- .. code-block:: python
- # using the connection pool
- with celery.pool.acquire(block=True) as conn:
- ...
- # establish fresh connection
- with celery.connection() as conn:
- ...
- """
- return self._get_app().connection()
- def get_publisher(self, connection=None, exchange=None,
- exchange_type=None, **options):
- """Deprecated method to get the task publisher (now called producer).
- Should be replaced with :class:`@amqp.TaskProducer`:
- .. code-block:: python
- with celery.connection() as conn:
- with celery.amqp.TaskProducer(conn) as prod:
- my_task.apply_async(producer=prod)
- """
- exchange = self.exchange if exchange is None else exchange
- if exchange_type is None:
- exchange_type = self.exchange_type
- connection = connection or self.establish_connection()
- return self._get_app().amqp.TaskProducer(connection,
- exchange=exchange and Exchange(exchange, exchange_type),
- routing_key=self.routing_key, **options)
- @classmethod
- def get_consumer(self, connection=None, queues=None, **kwargs):
- """Deprecated method used to get consumer for the queue
- this task is sent to.
- Should be replaced with :class:`@amqp.TaskConsumer` instead:
- """
- Q = self._get_app().amqp
- connection = connection or self.establish_connection()
- if queues is None:
- queues = Q.queues[self.queue] if self.queue else Q.default_queue
- return Q.TaskConsumer(connection, queues, **kwargs)
- class PeriodicTask(Task):
- """A periodic task is a task that adds itself to the
- :setting:`CELERYBEAT_SCHEDULE` setting."""
- abstract = True
- ignore_result = True
- relative = False
- options = None
- compat = True
- def __init__(self):
- if not hasattr(self, 'run_every'):
- raise NotImplementedError(
- 'Periodic tasks must have a run_every attribute')
- self.run_every = maybe_schedule(self.run_every, self.relative)
- super(PeriodicTask, self).__init__()
- @classmethod
- def on_bound(cls, app):
- app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
- 'task': cls.name,
- 'schedule': cls.run_every,
- 'args': (),
- 'kwargs': {},
- 'options': cls.options or {},
- 'relative': cls.relative,
- }
- def task(*args, **kwargs):
- """Decorator to create a task class out of any callable.
- **Examples**
- .. code-block:: python
- @task()
- def refresh_feed(url):
- return Feed.objects.get(url=url).refresh()
- With setting extra options and using retry.
- .. code-block:: python
- @task(max_retries=10)
- def refresh_feed(url):
- try:
- return Feed.objects.get(url=url).refresh()
- except socket.error as exc:
- refresh_feed.retry(exc=exc)
- Calling the resulting task:
- >>> refresh_feed('http://example.com/rss') # Regular
- <Feed: http://example.com/rss>
- >>> refresh_feed.delay('http://example.com/rss') # Async
- <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
- """
- return current_app.task(*args, **dict({'accept_magic_kwargs': False,
- 'base': Task}, **kwargs))
- def periodic_task(*args, **options):
- """Decorator to create a task class out of any callable.
- .. admonition:: Examples
- .. code-block:: python
- @task()
- def refresh_feed(url):
- return Feed.objects.get(url=url).refresh()
- With setting extra options and using retry.
- .. code-block:: python
- from celery.task import current
- @task(exchange='feeds')
- def refresh_feed(url):
- try:
- return Feed.objects.get(url=url).refresh()
- except socket.error as exc:
- current.retry(exc=exc)
- Calling the resulting task:
- >>> refresh_feed('http://example.com/rss') # Regular
- <Feed: http://example.com/rss>
- >>> refresh_feed.delay('http://example.com/rss') # Async
- <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
- """
- return task(**dict({'base': PeriodicTask}, **options))
|