request.py 19 KB


  1. # -*- coding: utf-8 -*-
  2. """Task request.
  3. This module defines the :class:`Request` class, that specifies
  4. how tasks are executed.
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. import logging
  8. import sys
  9. from datetime import datetime
  10. from weakref import ref
  11. from billiard.common import TERM_SIGNAME
  12. from kombu.utils.encoding import safe_repr, safe_str
  13. from kombu.utils.objects import cached_property
  14. from celery import signals
  15. from celery.app.trace import trace_task, trace_task_ret
  16. from celery.exceptions import (
  17. Ignore, TaskRevokedError, InvalidTaskError,
  18. SoftTimeLimitExceeded, TimeLimitExceeded,
  19. WorkerLostError, Terminated, Retry, Reject,
  20. )
  21. from celery.five import python_2_unicode_compatible, string
  22. from celery.platforms import signals as _signals
  23. from celery.utils.functional import maybe, noop
  24. from celery.utils.log import get_logger
  25. from celery.utils.nodenames import gethostname
  26. from celery.utils.time import maybe_iso8601, timezone, maybe_make_aware
  27. from celery.utils.serialization import get_pickled_exception
  28. from . import state
  29. __all__ = ('Request',)
  30. # pylint: disable=redefined-outer-name
  31. # We cache globals and attribute lookups, so disable this warning.
  32. IS_PYPY = hasattr(sys, 'pypy_version_info')
  33. logger = get_logger(__name__)
  34. debug, info, warn, error = (logger.debug, logger.info,
  35. logger.warning, logger.error)
  36. _does_info = False
  37. _does_debug = False
  38. def __optimize__():
  39. # this is also called by celery.app.trace.setup_worker_optimizations
  40. global _does_debug
  41. global _does_info
  42. _does_debug = logger.isEnabledFor(logging.DEBUG)
  43. _does_info = logger.isEnabledFor(logging.INFO)
  44. __optimize__() # noqa: E305
# Localize
# Bind frequently used attributes to module-level names so the code
# below can reference them directly instead of repeating the attribute
# lookup on every use.
tz_or_local = timezone.tz_or_local
send_revoked = signals.task_revoked.send  # sends the task_revoked signal
task_accepted = state.task_accepted
task_ready = state.task_ready
revoked_tasks = state.revoked  # container of revoked task ids
  51. @python_2_unicode_compatible
  52. class Request(object):
  53. """A request for task execution."""
  54. acknowledged = False
  55. time_start = None
  56. worker_pid = None
  57. time_limits = (None, None)
  58. _already_revoked = False
  59. _terminate_on_ack = None
  60. _apply_result = None
  61. _tzlocal = None
  62. if not IS_PYPY: # pragma: no cover
  63. __slots__ = (
  64. 'app', 'type', 'name', 'id', 'root_id', 'parent_id',
  65. 'on_ack', 'body', 'hostname', 'eventer', 'connection_errors',
  66. 'task', 'eta', 'expires', 'request_dict', 'on_reject', 'utc',
  67. 'content_type', 'content_encoding', 'argsrepr', 'kwargsrepr',
  68. '_decoded',
  69. '__weakref__', '__dict__',
  70. )
  71. def __init__(self, message, on_ack=noop,
  72. hostname=None, eventer=None, app=None,
  73. connection_errors=None, request_dict=None,
  74. task=None, on_reject=noop, body=None,
  75. headers=None, decoded=False, utc=True,
  76. maybe_make_aware=maybe_make_aware,
  77. maybe_iso8601=maybe_iso8601, **opts):
  78. if headers is None:
  79. headers = message.headers
  80. if body is None:
  81. body = message.body
  82. self.app = app
  83. self.message = message
  84. self.body = body
  85. self.utc = utc
  86. self._decoded = decoded
  87. if decoded:
  88. self.content_type = self.content_encoding = None
  89. else:
  90. self.content_type, self.content_encoding = (
  91. message.content_type, message.content_encoding,
  92. )
  93. self.id = headers['id']
  94. type = self.type = self.name = headers['task']
  95. self.root_id = headers.get('root_id')
  96. self.parent_id = headers.get('parent_id')
  97. if 'shadow' in headers:
  98. self.name = headers['shadow'] or self.name
  99. if 'timelimit' in headers:
  100. self.time_limits = headers['timelimit']
  101. self.argsrepr = headers.get('argsrepr', '')
  102. self.kwargsrepr = headers.get('kwargsrepr', '')
  103. self.on_ack = on_ack
  104. self.on_reject = on_reject
  105. self.hostname = hostname or gethostname()
  106. self.eventer = eventer
  107. self.connection_errors = connection_errors or ()
  108. self.task = task or self.app.tasks[type]
  109. # timezone means the message is timezone-aware, and the only timezone
  110. # supported at this point is UTC.
  111. eta = headers.get('eta')
  112. if eta is not None:
  113. try:
  114. eta = maybe_iso8601(eta)
  115. except (AttributeError, ValueError, TypeError) as exc:
  116. raise InvalidTaskError(
  117. 'invalid ETA value {0!r}: {1}'.format(eta, exc))
  118. self.eta = maybe_make_aware(eta, self.tzlocal)
  119. else:
  120. self.eta = None
  121. expires = headers.get('expires')
  122. if expires is not None:
  123. try:
  124. expires = maybe_iso8601(expires)
  125. except (AttributeError, ValueError, TypeError) as exc:
  126. raise InvalidTaskError(
  127. 'invalid expires value {0!r}: {1}'.format(expires, exc))
  128. self.expires = maybe_make_aware(expires, self.tzlocal)
  129. else:
  130. self.expires = None
  131. delivery_info = message.delivery_info or {}
  132. properties = message.properties or {}
  133. headers.update({
  134. 'reply_to': properties.get('reply_to'),
  135. 'correlation_id': properties.get('correlation_id'),
  136. 'delivery_info': {
  137. 'exchange': delivery_info.get('exchange'),
  138. 'routing_key': delivery_info.get('routing_key'),
  139. 'priority': properties.get('priority'),
  140. 'redelivered': delivery_info.get('redelivered'),
  141. }
  142. })
  143. self.request_dict = headers
  144. @property
  145. def delivery_info(self):
  146. return self.request_dict['delivery_info']
  147. def execute_using_pool(self, pool, **kwargs):
  148. """Used by the worker to send this task to the pool.
  149. Arguments:
  150. pool (~celery.concurrency.base.TaskPool): The execution pool
  151. used to execute this request.
  152. Raises:
  153. celery.exceptions.TaskRevokedError: if the task was revoked.
  154. """
  155. task_id = self.id
  156. task = self.task
  157. if self.revoked():
  158. raise TaskRevokedError(task_id)
  159. time_limit, soft_time_limit = self.time_limits
  160. result = pool.apply_async(
  161. trace_task_ret,
  162. args=(self.type, task_id, self.request_dict, self.body,
  163. self.content_type, self.content_encoding),
  164. accept_callback=self.on_accepted,
  165. timeout_callback=self.on_timeout,
  166. callback=self.on_success,
  167. error_callback=self.on_failure,
  168. soft_timeout=soft_time_limit or task.soft_time_limit,
  169. timeout=time_limit or task.time_limit,
  170. correlation_id=task_id,
  171. )
  172. # cannot create weakref to None
  173. self._apply_result = maybe(ref, result)
  174. return result
  175. def execute(self, loglevel=None, logfile=None):
  176. """Execute the task in a :func:`~celery.app.trace.trace_task`.
  177. Arguments:
  178. loglevel (int): The loglevel used by the task.
  179. logfile (str): The logfile used by the task.
  180. """
  181. if self.revoked():
  182. return
  183. # acknowledge task as being processed.
  184. if not self.task.acks_late:
  185. self.acknowledge()
  186. request = self.request_dict
  187. # pylint: disable=unpacking-non-sequence
  188. # payload is a property, so pylint doesn't think it's a tuple.
  189. args, kwargs, embed = self._payload
  190. request.update({
  191. 'loglevel': loglevel,
  192. 'logfile': logfile,
  193. 'hostname': self.hostname,
  194. 'is_eager': False,
  195. 'args': args,
  196. 'kwargs': kwargs
  197. }, **embed or {})
  198. retval = trace_task(self.task, self.id, args, kwargs, request,
  199. hostname=self.hostname, loader=self.app.loader,
  200. app=self.app)[0]
  201. self.acknowledge()
  202. return retval
  203. def maybe_expire(self):
  204. """If expired, mark the task as revoked."""
  205. if self.expires:
  206. now = datetime.now(self.expires.tzinfo)
  207. if now > self.expires:
  208. revoked_tasks.add(self.id)
  209. return True
  210. def terminate(self, pool, signal=None):
  211. signal = _signals.signum(signal or TERM_SIGNAME)
  212. if self.time_start:
  213. pool.terminate_job(self.worker_pid, signal)
  214. self._announce_revoked('terminated', True, signal, False)
  215. else:
  216. self._terminate_on_ack = pool, signal
  217. if self._apply_result is not None:
  218. obj = self._apply_result() # is a weakref
  219. if obj is not None:
  220. obj.terminate(signal)
  221. def _announce_revoked(self, reason, terminated, signum, expired):
  222. task_ready(self)
  223. self.send_event('task-revoked',
  224. terminated=terminated, signum=signum, expired=expired)
  225. self.task.backend.mark_as_revoked(
  226. self.id, reason, request=self, store_result=self.store_errors,
  227. )
  228. self.acknowledge()
  229. self._already_revoked = True
  230. send_revoked(self.task, request=self,
  231. terminated=terminated, signum=signum, expired=expired)
  232. def revoked(self):
  233. """If revoked, skip task and mark state."""
  234. expired = False
  235. if self._already_revoked:
  236. return True
  237. if self.expires:
  238. expired = self.maybe_expire()
  239. if self.id in revoked_tasks:
  240. info('Discarding revoked task: %s[%s]', self.name, self.id)
  241. self._announce_revoked(
  242. 'expired' if expired else 'revoked', False, None, expired,
  243. )
  244. return True
  245. return False
  246. def send_event(self, type, **fields):
  247. if self.eventer and self.eventer.enabled and self.task.send_events:
  248. self.eventer.send(type, uuid=self.id, **fields)
  249. def on_accepted(self, pid, time_accepted):
  250. """Handler called when task is accepted by worker pool."""
  251. self.worker_pid = pid
  252. self.time_start = time_accepted
  253. task_accepted(self)
  254. if not self.task.acks_late:
  255. self.acknowledge()
  256. self.send_event('task-started')
  257. if _does_debug:
  258. debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
  259. if self._terminate_on_ack is not None:
  260. self.terminate(*self._terminate_on_ack)
  261. def on_timeout(self, soft, timeout):
  262. """Handler called if the task times out."""
  263. task_ready(self)
  264. if soft:
  265. warn('Soft time limit (%ss) exceeded for %s[%s]',
  266. timeout, self.name, self.id)
  267. exc = SoftTimeLimitExceeded(soft)
  268. else:
  269. error('Hard time limit (%ss) exceeded for %s[%s]',
  270. timeout, self.name, self.id)
  271. exc = TimeLimitExceeded(timeout)
  272. self.task.backend.mark_as_failure(
  273. self.id, exc, request=self, store_result=self.store_errors,
  274. )
  275. if self.task.acks_late:
  276. self.acknowledge()
  277. def on_success(self, failed__retval__runtime, **kwargs):
  278. """Handler called if the task was successfully processed."""
  279. failed, retval, runtime = failed__retval__runtime
  280. if failed:
  281. if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
  282. raise retval.exception
  283. return self.on_failure(retval, return_ok=True)
  284. task_ready(self)
  285. if self.task.acks_late:
  286. self.acknowledge()
  287. self.send_event('task-succeeded', result=retval, runtime=runtime)
  288. def on_retry(self, exc_info):
  289. """Handler called if the task should be retried."""
  290. if self.task.acks_late:
  291. self.acknowledge()
  292. self.send_event('task-retried',
  293. exception=safe_repr(exc_info.exception.exc),
  294. traceback=safe_str(exc_info.traceback))
  295. def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
  296. """Handler called if the task raised an exception."""
  297. task_ready(self)
  298. if isinstance(exc_info.exception, MemoryError):
  299. raise MemoryError('Process got: %s' % (exc_info.exception,))
  300. elif isinstance(exc_info.exception, Reject):
  301. return self.reject(requeue=exc_info.exception.requeue)
  302. elif isinstance(exc_info.exception, Ignore):
  303. return self.acknowledge()
  304. exc = exc_info.exception
  305. if isinstance(exc, Retry):
  306. return self.on_retry(exc_info)
  307. # These are special cases where the process wouldn't've had
  308. # time to write the result.
  309. if isinstance(exc, Terminated):
  310. self._announce_revoked(
  311. 'terminated', True, string(exc), False)
  312. send_failed_event = False # already sent revoked event
  313. elif isinstance(exc, WorkerLostError) or not return_ok:
  314. self.task.backend.mark_as_failure(
  315. self.id, exc, request=self, store_result=self.store_errors,
  316. )
  317. # (acks_late) acknowledge after result stored.
  318. if self.task.acks_late:
  319. requeue = not self.delivery_info.get('redelivered')
  320. reject = (
  321. self.task.reject_on_worker_lost and
  322. isinstance(exc, WorkerLostError)
  323. )
  324. if reject:
  325. self.reject(requeue=requeue)
  326. send_failed_event = False
  327. else:
  328. self.acknowledge()
  329. if send_failed_event:
  330. self.send_event(
  331. 'task-failed',
  332. exception=safe_repr(get_pickled_exception(exc_info.exception)),
  333. traceback=exc_info.traceback,
  334. )
  335. if not return_ok:
  336. error('Task handler raised error: %r', exc,
  337. exc_info=exc_info.exc_info)
  338. def acknowledge(self):
  339. """Acknowledge task."""
  340. if not self.acknowledged:
  341. self.on_ack(logger, self.connection_errors)
  342. self.acknowledged = True
  343. def reject(self, requeue=False):
  344. if not self.acknowledged:
  345. self.on_reject(logger, self.connection_errors, requeue)
  346. self.acknowledged = True
  347. self.send_event('task-rejected', requeue=requeue)
  348. def info(self, safe=False):
  349. return {
  350. 'id': self.id,
  351. 'name': self.name,
  352. 'args': self.argsrepr,
  353. 'kwargs': self.kwargsrepr,
  354. 'type': self.type,
  355. 'hostname': self.hostname,
  356. 'time_start': self.time_start,
  357. 'acknowledged': self.acknowledged,
  358. 'delivery_info': self.delivery_info,
  359. 'worker_pid': self.worker_pid,
  360. }
  361. def humaninfo(self):
  362. return '{0.name}[{0.id}]'.format(self)
  363. def __str__(self):
  364. """``str(self)``."""
  365. return ' '.join([
  366. self.humaninfo(),
  367. ' ETA:[{0}]'.format(self.eta) if self.eta else '',
  368. ' expires:[{0}]'.format(self.expires) if self.expires else '',
  369. ])
  370. def __repr__(self):
  371. """``repr(self)``."""
  372. return '<{0}: {1} {2} {3}>'.format(
  373. type(self).__name__, self.humaninfo(),
  374. self.argsrepr, self.kwargsrepr,
  375. )
  376. @property
  377. def tzlocal(self):
  378. if self._tzlocal is None:
  379. self._tzlocal = self.app.conf.timezone
  380. return self._tzlocal
  381. @property
  382. def store_errors(self):
  383. return (not self.task.ignore_result or
  384. self.task.store_errors_even_if_ignored)
  385. @property
  386. def task_id(self):
  387. # XXX compat
  388. return self.id
  389. @task_id.setter # noqa
  390. def task_id(self, value):
  391. self.id = value
  392. @property
  393. def task_name(self):
  394. # XXX compat
  395. return self.name
  396. @task_name.setter # noqa
  397. def task_name(self, value):
  398. self.name = value
  399. @property
  400. def reply_to(self):
  401. # used by rpc backend when failures reported by parent process
  402. return self.request_dict['reply_to']
  403. @property
  404. def correlation_id(self):
  405. # used similarly to reply_to
  406. return self.request_dict['correlation_id']
  407. @cached_property
  408. def _payload(self):
  409. return self.body if self._decoded else self.message.payload
  410. @cached_property
  411. def chord(self):
  412. # used by backend.mark_as_failure when failure is reported
  413. # by parent process
  414. # pylint: disable=unpacking-non-sequence
  415. # payload is a property, so pylint doesn't think it's a tuple.
  416. _, _, embed = self._payload
  417. return embed.get('chord')
  418. @cached_property
  419. def errbacks(self):
  420. # used by backend.mark_as_failure when failure is reported
  421. # by parent process
  422. # pylint: disable=unpacking-non-sequence
  423. # payload is a property, so pylint doesn't think it's a tuple.
  424. _, _, embed = self._payload
  425. return embed.get('errbacks')
  426. @cached_property
  427. def group(self):
  428. # used by backend.on_chord_part_return when failures reported
  429. # by parent process
  430. return self.request_dict['group']
  431. def create_request_cls(base, task, pool, hostname, eventer,
  432. ref=ref, revoked_tasks=revoked_tasks,
  433. task_ready=task_ready, trace=trace_task_ret):
  434. default_time_limit = task.time_limit
  435. default_soft_time_limit = task.soft_time_limit
  436. apply_async = pool.apply_async
  437. acks_late = task.acks_late
  438. events = eventer and eventer.enabled
  439. class Request(base):
  440. def execute_using_pool(self, pool, **kwargs):
  441. task_id = self.id
  442. if (self.expires or task_id in revoked_tasks) and self.revoked():
  443. raise TaskRevokedError(task_id)
  444. time_limit, soft_time_limit = self.time_limits
  445. result = apply_async(
  446. trace,
  447. args=(self.type, task_id, self.request_dict, self.body,
  448. self.content_type, self.content_encoding),
  449. accept_callback=self.on_accepted,
  450. timeout_callback=self.on_timeout,
  451. callback=self.on_success,
  452. error_callback=self.on_failure,
  453. soft_timeout=soft_time_limit or default_soft_time_limit,
  454. timeout=time_limit or default_time_limit,
  455. correlation_id=task_id,
  456. )
  457. # cannot create weakref to None
  458. # pylint: disable=attribute-defined-outside-init
  459. self._apply_result = maybe(ref, result)
  460. return result
  461. def on_success(self, failed__retval__runtime, **kwargs):
  462. failed, retval, runtime = failed__retval__runtime
  463. if failed:
  464. if isinstance(retval.exception, (
  465. SystemExit, KeyboardInterrupt)):
  466. raise retval.exception
  467. return self.on_failure(retval, return_ok=True)
  468. task_ready(self)
  469. if acks_late:
  470. self.acknowledge()
  471. if events:
  472. self.send_event(
  473. 'task-succeeded', result=retval, runtime=runtime,
  474. )
  475. return Request