task.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067
  1. # -*- coding: utf-8 -*-
  2. """Task implementation: request context and the task base class."""
  3. import sys
  4. from datetime import datetime
  5. from typing import (
  6. Any, Awaitable, Callable, Iterable, Mapping, Sequence, Tuple, Union,
  7. )
  8. from billiard.einfo import ExceptionInfo
  9. from kombu.exceptions import OperationalError
  10. from kombu.types import ProducerT
  11. from kombu.utils.uuid import uuid
  12. from celery import current_app, group
  13. from celery import states
  14. from celery._state import _task_stack
  15. from celery.canvas import signature
  16. from celery.exceptions import Ignore, MaxRetriesExceededError, Reject, Retry
  17. from celery.local import class_property
  18. from celery.result import EagerResult
  19. from celery.types import (
  20. AppT, BackendT, ResultT, SignatureT, TaskT, TracerT, WorkerConsumerT,
  21. )
  22. from celery.utils import abstract
  23. from celery.utils.functional import mattrgetter, maybe_list
  24. from celery.utils.imports import instantiate
  25. from celery.utils.threads import LocalStack
  26. from .annotations import resolve_all as resolve_all_annotations
  27. from .registry import _unpickle_task_v2
  28. from .utils import appstr
  29. __all__ = ['Context', 'Task']
  30. #: extracts attributes related to publishing a message from an object.
  31. extract_exec_options = mattrgetter(
  32. 'queue', 'routing_key', 'exchange', 'priority', 'expires',
  33. 'serializer', 'delivery_mode', 'compression', 'time_limit',
  34. 'soft_time_limit', 'immediate', 'mandatory', # imm+man is deprecated
  35. )
  36. # We take __repr__ very seriously around here ;)
  37. R_BOUND_TASK = '<class {0.__name__} of {app}{flags}>'
  38. R_UNBOUND_TASK = '<unbound {0.__name__}{flags}>'
  39. R_SELF_TASK = '<@task {0.name} bound to other {0.__self__}>'
  40. R_INSTANCE = '<@task: {0.name} of {app}{flags}>'
  41. #: Here for backwards compatibility as tasks no longer use a custom meta-class.
  42. TaskType = type
  43. def _strflags(flags: Sequence, default: str = '') -> str:
  44. if flags:
  45. return ' ({0})'.format(', '.join(flags))
  46. return default
  47. def _reprtask(task: TaskT, fmt: str = None, flags: Sequence = None) -> str:
  48. flags = list(flags) if flags is not None else []
  49. if not fmt:
  50. fmt = R_BOUND_TASK if task._app else R_UNBOUND_TASK
  51. return fmt.format(
  52. task, flags=_strflags(flags),
  53. app=appstr(task._app) if task._app else None,
  54. )
  55. class Context:
  56. """Task request variables (Task.request)."""
  57. logfile: str = None
  58. loglevel: int = None
  59. hostname: str = None
  60. id: str = None
  61. args: Sequence = None
  62. kwargs: Mapping = None
  63. retries: int = 0
  64. eta: datetime = None
  65. expires: Union[float, datetime] = None
  66. is_eager: bool = False
  67. headers: Mapping = None
  68. delivery_info: Mapping = None
  69. reply_to: str = None
  70. root_id: str = None
  71. parent_id: str = None
  72. correlation_id: str = None
  73. # compat alias to group
  74. taskset: str = None
  75. group: str = None
  76. chord: SignatureT = None
  77. chain: Sequence[SignatureT] = None
  78. utc: bool = None
  79. called_directly: bool = True
  80. callbacks: Sequence[SignatureT] = None
  81. errbacks: Sequence[SignatureT] = None
  82. timelimit: Tuple[float, float] = None
  83. origin: str = None
  84. # see property
  85. _children: Sequence[ResultT] = None
  86. _protected: int = 0
  87. def __init__(self, *args, **kwargs) -> None:
  88. self.update(*args, **kwargs)
  89. def update(self, *args, **kwargs) -> None:
  90. self.__dict__.update(*args, **kwargs)
  91. def clear(self) -> None:
  92. self.__dict__.clear()
  93. def get(self, key: str, default: Any = None) -> Any:
  94. return getattr(self, key, default)
  95. def __repr__(self) -> str:
  96. return '<Context: {0!r}>'.format(vars(self))
  97. def as_execution_options(self) -> Mapping:
  98. limit_hard, limit_soft = self.timelimit or (None, None)
  99. return {
  100. 'task_id': self.id,
  101. 'root_id': self.root_id,
  102. 'parent_id': self.parent_id,
  103. 'group_id': self.group,
  104. 'chord': self.chord,
  105. 'chain': self.chain,
  106. 'link': self.callbacks,
  107. 'link_error': self.errbacks,
  108. 'expires': self.expires,
  109. 'soft_time_limit': limit_soft,
  110. 'time_limit': limit_hard,
  111. 'headers': self.headers,
  112. 'retries': self.retries,
  113. 'reply_to': self.reply_to,
  114. 'origin': self.origin,
  115. }
  116. @property
  117. def children(self) -> Sequence[ResultT]:
  118. # children must be an empy list for every thread
  119. if self._children is None:
  120. self._children = []
  121. return self._children
  122. @abstract.CallableTask.register
  123. class Task:
  124. """Task base class.
  125. Note:
  126. When called tasks apply the :meth:`run` method. This method must
  127. be defined by all tasks (that is unless the :meth:`__call__` method
  128. is overridden).
  129. """
  130. __trace__: TracerT = None
  131. __v2_compat__ = False # set by old base in celery.task.base
  132. MaxRetriesExceededError = MaxRetriesExceededError
  133. OperationalError = OperationalError
  134. #: Execution strategy used, or the qualified name of one.
  135. Strategy = 'celery.worker.strategy:default'
  136. #: This is the instance bound to if the task is a method of a class.
  137. __self__: Any = None
  138. #: The application instance associated with this task class.
  139. _app: AppT = None
  140. #: Name of the task.
  141. name: str = None
  142. #: Enable argument checking.
  143. #: You can set this to false if you don't want the signature to be
  144. #: checked when calling the task.
  145. #: Defaults to :attr:`app.strict_typing <@Celery.strict_typing>`.
  146. typing: bool = None
  147. #: Maximum number of retries before giving up. If set to :const:`None`,
  148. #: it will **never** stop retrying.
  149. max_retries: int = 3
  150. #: Default time in seconds before a retry of the task should be
  151. #: executed. 3 minutes by default.
  152. default_retry_delay = 180.0
  153. #: Rate limit for this task type. Examples: :const:`None` (no rate
  154. #: limit), `'100/s'` (hundred tasks a second), `'100/m'` (hundred tasks
  155. #: a minute),`'100/h'` (hundred tasks an hour)
  156. rate_limit: Union[int, str] = None
  157. #: If enabled the worker won't store task state and return values
  158. #: for this task. Defaults to the :setting:`task_ignore_result`
  159. #: setting.
  160. ignore_result: bool = None
  161. #: If enabled the request will keep track of subtasks started by
  162. #: this task, and this information will be sent with the result
  163. #: (``result.children``).
  164. trail: bool = True
  165. #: If enabled the worker will send monitoring events related to
  166. #: this task (but only if the worker is configured to send
  167. #: task related events).
  168. #: Note that this has no effect on the task-failure event case
  169. #: where a task is not registered (as it will have no task class
  170. #: to check this flag).
  171. send_events: bool = True
  172. #: When enabled errors will be stored even if the task is otherwise
  173. #: configured to ignore results.
  174. store_errors_even_if_ignored: bool = None
  175. #: The name of a serializer that are registered with
  176. #: :mod:`kombu.serialization.registry`. Default is `'pickle'`.
  177. serializer: str = None
  178. #: Hard time limit.
  179. #: Defaults to the :setting:`task_time_limit` setting.
  180. time_limit: float = None
  181. #: Soft time limit.
  182. #: Defaults to the :setting:`task_soft_time_limit` setting.
  183. soft_time_limit: float = None
  184. #: The result store backend used for this task.
  185. backend: BackendT = None
  186. #: If disabled this task won't be registered automatically.
  187. autoregister: bool = True
  188. #: If enabled the task will report its status as 'started' when the task
  189. #: is executed by a worker. Disabled by default as the normal behavior
  190. #: is to not report that level of granularity. Tasks are either pending,
  191. #: finished, or waiting to be retried.
  192. #:
  193. #: Having a 'started' status can be useful for when there are long
  194. #: running tasks and there's a need to report what task is currently
  195. #: running.
  196. #:
  197. #: The application default can be overridden using the
  198. #: :setting:`task_track_started` setting.
  199. track_started: bool = None
  200. #: When enabled messages for this task will be acknowledged **after**
  201. #: the task has been executed, and not *just before* (the
  202. #: default behavior).
  203. #:
  204. #: Please note that this means the task may be executed twice if the
  205. #: worker crashes mid execution.
  206. #:
  207. #: The application default can be overridden with the
  208. #: :setting:`task_acks_late` setting.
  209. acks_late: bool = None
  210. #: Even if :attr:`acks_late` is enabled, the worker will
  211. #: acknowledge tasks when the worker process executing them abruptly
  212. #: exits or is signaled (e.g., :sig:`KILL`/:sig:`INT`, etc).
  213. #:
  214. #: Setting this to true allows the message to be re-queued instead,
  215. #: so that the task will execute again by the same worker, or another
  216. #: worker.
  217. #:
  218. #: Warning: Enabling this can cause message loops; make sure you know
  219. #: what you're doing.
  220. reject_on_worker_lost: bool = None
  221. #: Tuple of expected exceptions.
  222. #:
  223. #: These are errors that are expected in normal operation
  224. #: and that shouldn't be regarded as a real error by the worker.
  225. #: Currently this means that the state will be updated to an error
  226. #: state, but the worker won't log the event as an error.
  227. throws: Tuple[type] = ()
  228. #: Default task expiry time.
  229. expires: float = None
  230. #: Max length of result representation used in logs and events.
  231. resultrepr_maxsize: int = 1024
  232. #: Task request stack, the current request will be the topmost.
  233. request_stack: LocalStack = None
  234. #: Some may expect a request to exist even if the task hasn't been
  235. #: called. This should probably be deprecated.
  236. _default_request: Context = None
  237. #: Deprecated attribute ``abstract`` here for compatibility.
  238. abstract: bool = True
  239. _exec_options: Mapping = None
  240. __bound__: bool = False
  241. from_config: Tuple[Tuple[str, str], ...] = (
  242. ('serializer', 'task_serializer'),
  243. ('rate_limit', 'task_default_rate_limit'),
  244. ('track_started', 'task_track_started'),
  245. ('acks_late', 'task_acks_late'),
  246. ('reject_on_worker_lost', 'task_reject_on_worker_lost'),
  247. ('ignore_result', 'task_ignore_result'),
  248. ('store_errors_even_if_ignored', 'task_store_errors_even_if_ignored'),
  249. )
  250. # set by backend property.
  251. _backend: BackendT = None
  252. # - Tasks are lazily bound, so that configuration is not set
  253. # - until the task is actually used
  254. @classmethod
  255. def bind(cls, app: AppT) -> AppT:
  256. was_bound, cls.__bound__ = cls.__bound__, True
  257. cls._app = app
  258. conf = app.conf
  259. cls._exec_options = None # clear option cache
  260. if cls.typing is None:
  261. cls.typing = app.strict_typing
  262. for attr_name, config_name in cls.from_config:
  263. if getattr(cls, attr_name, None) is None:
  264. setattr(cls, attr_name, conf[config_name])
  265. # decorate with annotations from config.
  266. if not was_bound:
  267. cls.annotate()
  268. from celery.utils.threads import LocalStack
  269. cls.request_stack = LocalStack()
  270. # PeriodicTask uses this to add itself to the PeriodicTask schedule.
  271. cls.on_bound(app)
  272. return app
  273. @classmethod
  274. def on_bound(cls, app: AppT) -> None:
  275. """Called when the task is bound to an app.
  276. Note:
  277. This class method can be defined to do additional actions when
  278. the task class is bound to an app.
  279. """
  280. ...
  281. @classmethod
  282. def _get_app(cls) -> AppT:
  283. if cls._app is None:
  284. cls._app = current_app
  285. if not cls.__bound__:
  286. # The app property's __set__ method is not called
  287. # if Task.app is set (on the class), so must bind on use.
  288. cls.bind(cls._app)
  289. return cls._app
  290. app = class_property(_get_app, bind)
  291. @classmethod
  292. def annotate(cls) -> None:
  293. for d in resolve_all_annotations(cls.app.annotations, cls):
  294. for key, value in d.items():
  295. if key.startswith('@'):
  296. cls.add_around(key[1:], value)
  297. else:
  298. setattr(cls, key, value)
  299. @classmethod
  300. def add_around(cls, attr: str, around: Callable) -> None:
  301. orig = getattr(cls, attr)
  302. if getattr(orig, '__wrapped__', None):
  303. orig = orig.__wrapped__
  304. meth = around(orig)
  305. meth.__wrapped__ = orig
  306. setattr(cls, attr, meth)
  307. def __call__(self, *args, **kwargs) -> Any:
  308. _task_stack.push(self)
  309. self.push_request(args=args, kwargs=kwargs)
  310. try:
  311. # add self if this is a bound task
  312. if self.__self__ is not None:
  313. return self.run(self.__self__, *args, **kwargs)
  314. return self.run(*args, **kwargs)
  315. finally:
  316. self.pop_request()
  317. _task_stack.pop()
  318. def __reduce__(self) -> Tuple:
  319. # - tasks are pickled into the name of the task only, and the reciever
  320. # - simply grabs it from the local registry.
  321. # - in later versions the module of the task is also included,
  322. # - and the receiving side tries to import that module so that
  323. # - it will work even if the task hasn't been registered.
  324. mod = type(self).__module__
  325. mod = mod if mod and mod in sys.modules else None
  326. return (_unpickle_task_v2, (self.name, mod), None)
  327. def run(self, *args, **kwargs) -> Any:
  328. """The body of the task executed by workers."""
  329. raise NotImplementedError('Tasks must define the run method.')
  330. def start_strategy(self, app: AppT, consumer: WorkerConsumerT,
  331. **kwargs) -> Callable:
  332. return instantiate(self.Strategy, self, app, consumer, **kwargs)
  333. def delay(self, *args, **kwargs) -> ResultT:
  334. """Star argument version of :meth:`apply_async`.
  335. Does not support the extra options enabled by :meth:`apply_async`.
  336. Arguments:
  337. *args (Any): Positional arguments passed on to the task.
  338. **kwargs (Any): Keyword arguments passed on to the task.
  339. Returns:
  340. celery.result.AsyncResult: Future promise.
  341. """
  342. return self.apply_async(args, kwargs)
  343. def apply_async(self,
  344. args: Sequence = None,
  345. kwargs: Mapping = None,
  346. task_id: str = None,
  347. producer: ProducerT = None,
  348. link: Sequence[SignatureT] = None,
  349. link_error: Sequence[SignatureT] = None,
  350. shadow: str = None,
  351. **options) -> ResultT:
  352. """Apply tasks asynchronously by sending a message.
  353. Arguments:
  354. args (Tuple): The positional arguments to pass on to the task.
  355. kwargs (Dict): The keyword arguments to pass on to the task.
  356. countdown (float): Number of seconds into the future that the
  357. task should execute. Defaults to immediate execution.
  358. eta (~datetime.datetime): Absolute time and date of when the task
  359. should be executed. May not be specified if `countdown`
  360. is also supplied.
  361. expires (float, ~datetime.datetime): Datetime or
  362. seconds in the future for the task should expire.
  363. The task won't be executed after the expiration time.
  364. shadow (str): Override task name used in logs/monitoring.
  365. Default is retrieved from :meth:`shadow_name`.
  366. connection (kombu.Connection): Re-use existing broker connection
  367. instead of acquiring one from the connection pool.
  368. retry (bool): If enabled sending of the task message will be
  369. retried in the event of connection loss or failure.
  370. Default is taken from the :setting:`task_publish_retry`
  371. setting. Note that you need to handle the
  372. producer/connection manually for this to work.
  373. retry_policy (Mapping): Override the retry policy used.
  374. See the :setting:`task_publish_retry_policy` setting.
  375. queue (str, kombu.Queue): The queue to route the task to.
  376. This must be a key present in :setting:`task_queues`, or
  377. :setting:`task_create_missing_queues` must be
  378. enabled. See :ref:`guide-routing` for more
  379. information.
  380. exchange (str, kombu.Exchange): Named custom exchange to send the
  381. task to. Usually not used in combination with the ``queue``
  382. argument.
  383. routing_key (str): Custom routing key used to route the task to a
  384. worker server. If in combination with a ``queue`` argument
  385. only used to specify custom routing keys to topic exchanges.
  386. priority (int): The task priority, a number between 0 and 9.
  387. Defaults to the :attr:`priority` attribute.
  388. serializer (str): Serialization method to use.
  389. Can be `pickle`, `json`, `yaml`, `msgpack` or any custom
  390. serialization method that's been registered
  391. with :mod:`kombu.serialization.registry`.
  392. Defaults to the :attr:`serializer` attribute.
  393. compression (str): Optional compression method
  394. to use. Can be one of ``zlib``, ``bzip2``,
  395. or any custom compression methods registered with
  396. :func:`kombu.compression.register`.
  397. Defaults to the :setting:`task_compression` setting.
  398. link (~@Signature): A single, or a list of tasks signatures
  399. to apply if the task returns successfully.
  400. link_error (~@Signature): A single, or a list of task signatures
  401. to apply if an error occurs while executing the task.
  402. producer (kombu.Producer): custom producer to use when publishing
  403. the task.
  404. add_to_parent (bool): If set to True (default) and the task
  405. is applied while executing another task, then the result
  406. will be appended to the parent tasks ``request.children``
  407. attribute. Trailing can also be disabled by default using the
  408. :attr:`trail` attribute
  409. headers (Dict): Message headers to be included in the message.
  410. Returns:
  411. ~@AsyncResult: Promise of future evaluation.
  412. Raises:
  413. TypeError: If not enough arguments are passed, or too many
  414. arguments are passed. Note that signature checks may
  415. be disabled by specifying ``@task(typing=False)``.
  416. kombu.exceptions.OperationalError: If a connection to the
  417. transport cannot be made, or if the connection is lost.
  418. Note:
  419. Also supports all keyword arguments supported by
  420. :meth:`kombu.Producer.publish`.
  421. """
  422. if self.typing:
  423. try:
  424. check_arguments = self.__header__
  425. except AttributeError: # pragma: no cover
  426. pass
  427. else:
  428. check_arguments(*(args or ()), **(kwargs or {}))
  429. app = self._get_app()
  430. if app.conf.task_always_eager:
  431. return self.apply(args, kwargs, task_id=task_id or uuid(),
  432. link=link, link_error=link_error, **options)
  433. # add 'self' if this is a "task_method".
  434. if self.__self__ is not None:
  435. args = args if isinstance(args, tuple) else tuple(args or ())
  436. args = (self.__self__,) + args
  437. shadow = shadow or self.shadow_name(args, kwargs, options)
  438. preopts = self._get_exec_options()
  439. options = dict(preopts, **options) if options else preopts
  440. return app.send_task(
  441. self.name, args, kwargs, task_id=task_id, producer=producer,
  442. link=link, link_error=link_error, result_cls=self.AsyncResult,
  443. shadow=shadow, task_type=self,
  444. **options
  445. )
  446. def shadow_name(self,
  447. args: Sequence,
  448. kwargs: Mapping,
  449. options: Mapping) -> str:
  450. """Override for custom task name in worker logs/monitoring.
  451. Example:
  452. .. code-block:: python
  453. from celery.utils.imports import qualname
  454. def shadow_name(task, args, kwargs, options):
  455. return qualname(args[0])
  456. @app.task(shadow_name=shadow_name, serializer='pickle')
  457. def apply_function_async(fun, *args, **kwargs):
  458. return fun(*args, **kwargs)
  459. Arguments:
  460. args (Tuple): Task positional arguments.
  461. kwargs (Dict): Task keyword arguments.
  462. options (Dict): Task execution options.
  463. """
  464. pass
  465. def signature_from_request(self,
  466. request: Context = None,
  467. args: Sequence = None,
  468. kwargs: Mapping = None,
  469. queue: str = None,
  470. **extra_options) -> SignatureT:
  471. request = self.request if request is None else request
  472. args = request.args if args is None else args
  473. kwargs = request.kwargs if kwargs is None else kwargs
  474. options = request.as_execution_options()
  475. if queue:
  476. options['queue'] = queue
  477. else:
  478. delivery_info = request.delivery_info or {}
  479. exchange = delivery_info.get('exchange')
  480. routing_key = delivery_info.get('routing_key')
  481. if exchange == '' and routing_key:
  482. # sent to anon-exchange
  483. options['queue'] = routing_key
  484. else:
  485. options.update(delivery_info)
  486. return self.signature(
  487. args, kwargs, options, type=self, **extra_options
  488. )
  489. subtask_from_request = signature_from_request # XXX compat
  490. def retry(self,
  491. args: Sequence = None,
  492. kwargs: Mapping = None,
  493. exc: Exception = None,
  494. throw: bool = True,
  495. eta: datetime = None,
  496. countdown: float = None,
  497. max_retries: int = None,
  498. **options) -> None:
  499. """Retry the task.
  500. Example:
  501. >>> from imaginary_twitter_lib import Twitter
  502. >>> from proj.celery import app
  503. >>> @app.task(bind=True)
  504. ... def tweet(self, auth, message):
  505. ... twitter = Twitter(oauth=auth)
  506. ... try:
  507. ... twitter.post_status_update(message)
  508. ... except twitter.FailWhale as exc:
  509. ... # Retry in 5 minutes.
  510. ... raise self.retry(countdown=60 * 5, exc=exc)
  511. Note:
  512. Although the task will never return above as `retry` raises an
  513. exception to notify the worker, we use `raise` in front of the
  514. retry to convey that the rest of the block won't be executed.
  515. Arguments:
  516. args (Tuple): Positional arguments to retry with.
  517. kwargs (Dict): Keyword arguments to retry with.
  518. exc (Exception): Custom exception to report when the max restart
  519. limit has been exceeded (default:
  520. :exc:`~@MaxRetriesExceededError`).
  521. If this argument is set and retry is called while
  522. an exception was raised (``sys.exc_info()`` is set)
  523. it will attempt to re-raise the current exception.
  524. If no exception was raised it will raise the ``exc``
  525. argument provided.
  526. countdown (float): Time in seconds to delay the retry for.
  527. eta (~datetime.dateime): Explicit time and date to run the
  528. retry at.
  529. max_retries (int): If set, overrides the default retry limit for
  530. this execution. Changes to this parameter don't propagate to
  531. subsequent task retry attempts. A value of :const:`None`,
  532. means "use the default", so if you want infinite retries you'd
  533. have to set the :attr:`max_retries` attribute of the task to
  534. :const:`None` first.
  535. time_limit (int): If set, overrides the default time limit.
  536. soft_time_limit (int): If set, overrides the default soft
  537. time limit.
  538. throw (bool): If this is :const:`False`, don't raise the
  539. :exc:`~@Retry` exception, that tells the worker to mark
  540. the task as being retried. Note that this means the task
  541. will be marked as failed if the task raises an exception,
  542. or successful if it returns after the retry call.
  543. **options (Any): Extra options to pass on to :meth:`apply_async`.
  544. Raises:
  545. celery.exceptions.Retry:
  546. To tell the worker that the task has been re-sent for retry.
  547. This always happens, unless the `throw` keyword argument
  548. has been explicitly set to :const:`False`, and is considered
  549. normal operation.
  550. """
  551. request = self.request
  552. retries = request.retries + 1
  553. max_retries = self.max_retries if max_retries is None else max_retries
  554. # Not in worker or emulated by (apply/always_eager),
  555. # so just raise the original exception.
  556. if request.called_directly:
  557. # raises orig stack if PyErr_Occurred,
  558. # and augments with exc' if that argument is defined.
  559. exc = exc or Retry('Task can be retried', None)
  560. raise exc from sys.exc_info()[2]
  561. if not eta and countdown is None:
  562. countdown = self.default_retry_delay
  563. is_eager = request.is_eager
  564. S = self.signature_from_request(
  565. request, args, kwargs,
  566. countdown=countdown, eta=eta, retries=retries,
  567. **options
  568. )
  569. if max_retries is not None and retries > max_retries:
  570. if exc:
  571. # On Py3: will augment any current exception with
  572. # the exc' argument provided (raise exc from orig)
  573. raise exc from sys.exc_info()[2]
  574. raise self.MaxRetriesExceededError(
  575. "Can't retry {0}[{1}] args:{2} kwargs:{3}".format(
  576. self.name, request.id, S.args, S.kwargs))
  577. ret = Retry(exc=exc, when=eta or countdown)
  578. if is_eager:
  579. # if task was executed eagerly using apply(),
  580. # then the retry must also be executed eagerly.
  581. S.apply().get()
  582. if throw:
  583. raise ret
  584. return ret
  585. try:
  586. S.apply_async()
  587. except Exception as exc:
  588. raise Reject(exc, requeue=False)
  589. if throw:
  590. raise ret
  591. return ret
  592. def apply(self,
  593. args: Sequence = None,
  594. kwargs: Mapping = None,
  595. link: Sequence[SignatureT] = None,
  596. link_error: Sequence[SignatureT] = None,
  597. task_id: str = None,
  598. retries: int = None,
  599. throw: bool = None,
  600. logfile: str = None,
  601. loglevel: int = None,
  602. headers: Mapping = None,
  603. **options) -> ResultT:
  604. """Execute this task locally, by blocking until the task returns.
  605. Arguments:
  606. args (Tuple): positional arguments passed on to the task.
  607. kwargs (Dict): keyword arguments passed on to the task.
  608. throw (bool): Re-raise task exceptions.
  609. Defaults to the :setting:`task_eager_propagates` setting.
  610. Returns:
  611. celery.result.EagerResult: pre-evaluated result.
  612. """
  613. # trace imports Task, so need to import inline.
  614. from celery.app.trace import build_tracer
  615. app = self._get_app()
  616. args = args or ()
  617. # add 'self' if this is a bound method.
  618. if self.__self__ is not None:
  619. args = (self.__self__,) + tuple(args)
  620. kwargs = kwargs or {}
  621. task_id = task_id or uuid()
  622. retries = retries or 0
  623. if throw is None:
  624. throw = app.conf.task_eager_propagates
  625. # Make sure we get the task instance, not class.
  626. task = app._tasks[self.name]
  627. request = {
  628. 'id': task_id,
  629. 'retries': retries,
  630. 'is_eager': True,
  631. 'logfile': logfile,
  632. 'loglevel': loglevel or 0,
  633. 'callbacks': maybe_list(link),
  634. 'errbacks': maybe_list(link_error),
  635. 'headers': headers,
  636. 'delivery_info': {'is_eager': True},
  637. }
  638. tb = None
  639. tracer = build_tracer(
  640. task.name, task, eager=True,
  641. propagate=throw, app=self._get_app(),
  642. )
  643. ret = tracer(task_id, args, kwargs, request)
  644. retval = ret.retval
  645. if isinstance(retval, ExceptionInfo):
  646. retval, tb = retval.exception, retval.traceback
  647. state = states.SUCCESS if ret.info is None else ret.info.state
  648. return EagerResult(task_id, retval, state, traceback=tb)
  649. def AsyncResult(self, task_id: str, **kwargs) -> ResultT:
  650. """Get AsyncResult instance for this kind of task.
  651. Arguments:
  652. task_id (str): Task id to get result for.
  653. """
  654. return self._get_app().AsyncResult(
  655. task_id, backend=self.backend, **kwargs)
  656. def signature(self, args: Sequence = None,
  657. *starargs, **starkwargs) -> SignatureT:
  658. """Create signature.
  659. Returns:
  660. :class:`~celery.signature`: object for
  661. this task, wrapping arguments and execution options
  662. for a single task invocation.
  663. """
  664. starkwargs.setdefault('app', self.app)
  665. return signature(self, args, *starargs, **starkwargs)
  666. subtask = signature
  667. def s(self, *args, **kwargs) -> SignatureT:
  668. """Create signature.
  669. Shortcut for ``.s(*a, **k) -> .signature(a, k)``.
  670. """
  671. return self.signature(args, kwargs)
  672. def si(self, *args, **kwargs) -> SignatureT:
  673. """Create immutable signature.
  674. Shortcut for ``.si(*a, **k) -> .signature(a, k, immutable=True)``.
  675. """
  676. return self.signature(args, kwargs, immutable=True)
  677. def chunks(self, it: Iterable, n: int) -> SignatureT:
  678. """Create a :class:`~celery.canvas.chunks` task for this task."""
  679. from celery import chunks
  680. return chunks(self.s(), it, n, app=self.app)
  681. def map(self, it: Iterable) -> SignatureT:
  682. """Create a :class:`~celery.canvas.xmap` task from ``it``."""
  683. from celery import xmap
  684. return xmap(self.s(), it, app=self.app)
  685. def starmap(self, it: Iterable) -> SignatureT:
  686. """Create a :class:`~celery.canvas.xstarmap` task from ``it``."""
  687. from celery import xstarmap
  688. return xstarmap(self.s(), it, app=self.app)
  689. def send_event(self, type_: str,
  690. retry: bool = True,
  691. retry_policy: Mapping = None,
  692. **fields) -> Awaitable:
  693. """Send monitoring event message.
  694. This can be used to add custom event types in :pypi:`Flower`
  695. and other monitors.
  696. Arguments:
  697. type_ (str): Type of event, e.g. ``"task-failed"``.
  698. Keyword Arguments:
  699. retry (bool): Retry sending the message
  700. if the connection is lost. Default is taken from the
  701. :setting:`task_publish_retry` setting.
  702. retry_policy (Mapping): Retry settings. Default is taken
  703. from the :setting:`task_publish_retry_policy` setting.
  704. **fields (Any): Map containing information about the event.
  705. Must be JSON serializable.
  706. """
  707. req = self.request
  708. if retry_policy is None:
  709. retry_policy = self.app.conf.task_publish_retry_policy
  710. with self.app.events.default_dispatcher(hostname=req.hostname) as d:
  711. return d.send(
  712. type_,
  713. uuid=req.id, retry=retry, retry_policy=retry_policy, **fields)
  714. def replace(self, sig: SignatureT) -> None:
  715. """Replace this task, with a new task inheriting the task id.
  716. .. versionadded:: 4.0
  717. Arguments:
  718. sig (~@Signature): signature to replace with.
  719. Raises:
  720. ~@Ignore: This is always raised, so the best practice
  721. is to always use ``raise self.replace(...)`` to convey
  722. to the reader that the task won't continue after being replaced.
  723. """
  724. chord = self.request.chord
  725. if 'chord' in sig.options:
  726. if chord:
  727. chord = sig.options['chord'] | chord
  728. else:
  729. chord = sig.options['chord']
  730. if isinstance(sig, group):
  731. sig |= self.app.tasks['celery.accumulate'].s(index=0).set(
  732. chord=chord,
  733. link=self.request.callbacks,
  734. link_error=self.request.errbacks,
  735. )
  736. chord = None
  737. if self.request.chain:
  738. for t in self.request.chain:
  739. sig |= signature(t, app=self.app)
  740. sig.freeze(self.request.id,
  741. group_id=self.request.group,
  742. chord=chord,
  743. root_id=self.request.root_id)
  744. sig.delay()
  745. raise Ignore('Replaced by new task')
  746. def add_to_chord(self, sig: SignatureT,
  747. lazy: bool = False) -> Union[ResultT, SignatureT]:
  748. """Add signature to the chord the current task is a member of.
  749. .. versionadded:: 4.0
  750. Currently only supported by the Redis result backend.
  751. Arguments:
  752. sig (~@Signature): Signature to extend chord with.
  753. lazy (bool): If enabled the new task won't actually be called,
  754. and ``sig.delay()`` must be called manually.
  755. """
  756. if not self.request.chord:
  757. raise ValueError('Current task is not member of any chord')
  758. result = sig.freeze(group_id=self.request.group,
  759. chord=self.request.chord,
  760. root_id=self.request.root_id)
  761. self.backend.add_to_chord(self.request.group, result)
  762. return sig.delay() if not lazy else sig
  763. def update_state(self,
  764. task_id: str = None,
  765. state: str = None,
  766. meta: Mapping = None) -> None:
  767. """Update task state.
  768. Arguments:
  769. task_id (str): Id of the task to update.
  770. Defaults to the id of the current task.
  771. state (str): New state.
  772. meta (Dict): State meta-data.
  773. """
  774. if task_id is None:
  775. task_id = self.request.id
  776. self.backend.store_result(task_id, meta, state)
  777. def on_success(self,
  778. retval: Any,
  779. task_id: str,
  780. args: Sequence,
  781. kwargs: Mapping) -> None:
  782. """Success handler.
  783. Run by the worker if the task executes successfully.
  784. Arguments:
  785. retval (Any): The return value of the task.
  786. task_id (str): Unique id of the executed task.
  787. args (Tuple): Original arguments for the executed task.
  788. kwargs (Dict): Original keyword arguments for the executed task.
  789. Returns:
  790. None: The return value of this handler is ignored.
  791. """
  792. ...
  793. def on_retry(self,
  794. exc: Exception,
  795. task_id: str,
  796. args: Sequence,
  797. kwargs: Mapping,
  798. einfo: ExceptionInfo) -> None:
  799. """Retry handler.
  800. This is run by the worker when the task is to be retried.
  801. Arguments:
  802. exc (Exception): The exception sent to :meth:`retry`.
  803. task_id (str): Unique id of the retried task.
  804. args (Tuple): Original arguments for the retried task.
  805. kwargs (Dict): Original keyword arguments for the retried task.
  806. einfo (~billiard.einfo.ExceptionInfo): Exception information.
  807. Returns:
  808. None: The return value of this handler is ignored.
  809. """
  810. ...
  811. def on_failure(self,
  812. exc: Exception,
  813. task_id: str,
  814. args: Sequence,
  815. kwargs: Mapping,
  816. einfo: ExceptionInfo) -> None:
  817. """Error handler.
  818. This is run by the worker when the task fails.
  819. Arguments:
  820. exc (Exception): The exception raised by the task.
  821. task_id (str): Unique id of the failed task.
  822. args (Tuple): Original arguments for the task that failed.
  823. kwargs (Dict): Original keyword arguments for the task that failed.
  824. einfo (~billiard.einfo.ExceptionInfo): Exception information.
  825. Returns:
  826. None: The return value of this handler is ignored.
  827. """
  828. ...
  829. def after_return(self, status: str, retval: Any, task_id: str,
  830. args: Sequence, kwargs: Mapping,
  831. einfo: ExceptionInfo) -> None:
  832. """Handler called after the task returns.
  833. Arguments:
  834. status (str): Current task state.
  835. retval (Any): Task return value/exception.
  836. task_id (str): Unique id of the task.
  837. args (Tuple): Original arguments for the task.
  838. kwargs (Dict): Original keyword arguments for the task.
  839. einfo (~billiard.einfo.ExceptionInfo): Exception information.
  840. Returns:
  841. None: The return value of this handler is ignored.
  842. """
  843. ...
  844. def add_trail(self, result: ResultT) -> ResultT:
  845. if self.trail:
  846. self.request.children.append(result)
  847. return result
  848. def push_request(self, *args, **kwargs) -> None:
  849. self.request_stack.push(Context(*args, **kwargs))
  850. def pop_request(self) -> None:
  851. self.request_stack.pop()
  852. def __repr__(self) -> str:
  853. """``repr(task)``."""
  854. return _reprtask(self, R_SELF_TASK if self.__self__ else R_INSTANCE)
  855. def _get_request(self) -> Context:
  856. """Get current request object."""
  857. req = self.request_stack.top
  858. if req is None:
  859. # task was not called, but some may still expect a request
  860. # to be there, perhaps that should be deprecated.
  861. if self._default_request is None:
  862. self._default_request = Context()
  863. return self._default_request
  864. return req
  865. request = property(_get_request)
  866. def _get_exec_options(self) -> Mapping:
  867. if self._exec_options is None:
  868. self._exec_options = extract_exec_options(self)
  869. return self._exec_options
  870. @property
  871. def backend(self) -> BackendT:
  872. backend = self._backend
  873. if backend is None:
  874. return self.app.backend
  875. return backend
  876. @backend.setter
  877. def backend(self, value: BackendT) -> None: # noqa
  878. self._backend = value
  879. @property
  880. def __name__(self) -> str:
  881. return self.__class__.__name__
  882. BaseTask = Task # noqa: E305 XXX compat alias