request.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. """This module defines the :class:`Request` class, which specifies
  3. how tasks are executed."""
  4. import logging
  5. import sys
  6. from datetime import datetime
  7. from weakref import ref
  8. from billiard.common import TERM_SIGNAME
  9. from kombu.utils import cached_property
  10. from kombu.utils.encoding import safe_repr, safe_str
  11. from celery import signals
  12. from celery.app.trace import trace_task, trace_task_ret
  13. from celery.exceptions import (
  14. Ignore, TaskRevokedError, InvalidTaskError,
  15. SoftTimeLimitExceeded, TimeLimitExceeded,
  16. WorkerLostError, Terminated, Retry, Reject,
  17. )
  18. from celery.platforms import signals as _signals
  19. from celery.utils.functional import noop
  20. from celery.utils.log import get_logger
  21. from celery.utils.nodenames import gethostname
  22. from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
  23. from celery.utils.serialization import get_pickled_exception
  24. from . import state
  25. __all__ = ['Request']
  26. IS_PYPY = hasattr(sys, 'pypy_version_info')
  27. logger = get_logger(__name__)
  28. debug, info, warn, error = (logger.debug, logger.info,
  29. logger.warning, logger.error)
  30. _does_info = False
  31. _does_debug = False
  32. def __optimize__():
  33. # this is also called by celery.app.trace.setup_worker_optimizations
  34. global _does_debug
  35. global _does_info
  36. _does_debug = logger.isEnabledFor(logging.DEBUG)
  37. _does_info = logger.isEnabledFor(logging.INFO)
  38. __optimize__()
  39. # Localize
  40. tz_or_local = timezone.tz_or_local
  41. send_revoked = signals.task_revoked.send
  42. task_accepted = state.task_accepted
  43. task_ready = state.task_ready
  44. revoked_tasks = state.revoked
  45. class Request:
  46. """A request for task execution."""
  47. acknowledged = False
  48. time_start = None
  49. worker_pid = None
  50. time_limits = (None, None)
  51. _already_revoked = False
  52. _terminate_on_ack = None
  53. _apply_result = None
  54. _tzlocal = None
  55. if not IS_PYPY: # pragma: no cover
  56. __slots__ = (
  57. 'app', 'type', 'name', 'id', 'root_id', 'parent_id',
  58. 'on_ack', 'body', 'hostname', 'eventer', 'connection_errors',
  59. 'task', 'eta', 'expires', 'request_dict', 'on_reject', 'utc',
  60. 'content_type', 'content_encoding', 'argsrepr', 'kwargsrepr',
  61. '_decoded',
  62. '__weakref__', '__dict__',
  63. )
  64. def __init__(self, message, on_ack=noop,
  65. hostname=None, eventer=None, app=None,
  66. connection_errors=None, request_dict=None,
  67. task=None, on_reject=noop, body=None,
  68. headers=None, decoded=False, utc=True,
  69. maybe_make_aware=maybe_make_aware,
  70. maybe_iso8601=maybe_iso8601, **opts):
  71. if headers is None:
  72. headers = message.headers
  73. if body is None:
  74. body = message.body
  75. self.app = app
  76. self.message = message
  77. self.body = body
  78. self.utc = utc
  79. self._decoded = decoded
  80. if decoded:
  81. self.content_type = self.content_encoding = None
  82. else:
  83. self.content_type, self.content_encoding = (
  84. message.content_type, message.content_encoding,
  85. )
  86. self.id = headers['id']
  87. type = self.type = self.name = headers['task']
  88. self.root_id = headers.get('root_id')
  89. self.parent_id = headers.get('parent_id')
  90. if 'shadow' in headers:
  91. self.name = headers['shadow'] or self.name
  92. if 'timelimit' in headers:
  93. self.time_limits = headers['timelimit']
  94. self.argsrepr = headers.get('argsrepr', '')
  95. self.kwargsrepr = headers.get('kwargsrepr', '')
  96. self.on_ack = on_ack
  97. self.on_reject = on_reject
  98. self.hostname = hostname or gethostname()
  99. self.eventer = eventer
  100. self.connection_errors = connection_errors or ()
  101. self.task = task or self.app.tasks[type]
  102. # timezone means the message is timezone-aware, and the only timezone
  103. # supported at this point is UTC.
  104. eta = headers.get('eta')
  105. if eta is not None:
  106. try:
  107. eta = maybe_iso8601(eta)
  108. except (AttributeError, ValueError, TypeError) as exc:
  109. raise InvalidTaskError(
  110. 'invalid eta value {0!r}: {1}'.format(eta, exc))
  111. self.eta = maybe_make_aware(eta, self.tzlocal)
  112. else:
  113. self.eta = None
  114. expires = headers.get('expires')
  115. if expires is not None:
  116. try:
  117. expires = maybe_iso8601(expires)
  118. except (AttributeError, ValueError, TypeError) as exc:
  119. raise InvalidTaskError(
  120. 'invalid expires value {0!r}: {1}'.format(expires, exc))
  121. self.expires = maybe_make_aware(expires, self.tzlocal)
  122. else:
  123. self.expires = None
  124. delivery_info = message.delivery_info or {}
  125. properties = message.properties or {}
  126. headers.update({
  127. 'reply_to': properties.get('reply_to'),
  128. 'correlation_id': properties.get('correlation_id'),
  129. 'delivery_info': {
  130. 'exchange': delivery_info.get('exchange'),
  131. 'routing_key': delivery_info.get('routing_key'),
  132. 'priority': properties.get('priority'),
  133. 'redelivered': delivery_info.get('redelivered'),
  134. }
  135. })
  136. self.request_dict = headers
  137. @property
  138. def delivery_info(self):
  139. return self.request_dict['delivery_info']
  140. def execute_using_pool(self, pool, **kwargs):
  141. """Used by the worker to send this task to the pool.
  142. Arguments:
  143. pool (~celery.concurrency.base.TaskPool): The execution pool
  144. used to execute this request.
  145. Raises:
  146. celery.exceptions.TaskRevokedError: if the task was revoked.
  147. """
  148. task_id = self.id
  149. task = self.task
  150. if self.revoked():
  151. raise TaskRevokedError(task_id)
  152. time_limit, soft_time_limit = self.time_limits
  153. result = pool.apply_async(
  154. trace_task_ret,
  155. args=(self.type, task_id, self.request_dict, self.body,
  156. self.content_type, self.content_encoding),
  157. accept_callback=self.on_accepted,
  158. timeout_callback=self.on_timeout,
  159. callback=self.on_success,
  160. error_callback=self.on_failure,
  161. soft_timeout=soft_time_limit or task.soft_time_limit,
  162. timeout=time_limit or task.time_limit,
  163. correlation_id=task_id,
  164. )
  165. # cannot create weakref to None
  166. self._apply_result = ref(result) if result is not None else result
  167. return result
  168. def execute(self, loglevel=None, logfile=None):
  169. """Execute the task in a :func:`~celery.app.trace.trace_task`.
  170. Arguments:
  171. loglevel (int): The loglevel used by the task.
  172. logfile (str): The logfile used by the task.
  173. """
  174. if self.revoked():
  175. return
  176. # acknowledge task as being processed.
  177. if not self.task.acks_late:
  178. self.acknowledge()
  179. request = self.request_dict
  180. args, kwargs, embed = self._payload
  181. request.update({'loglevel': loglevel, 'logfile': logfile,
  182. 'hostname': self.hostname, 'is_eager': False,
  183. 'args': args, 'kwargs': kwargs}, **embed or {})
  184. retval = trace_task(self.task, self.id, args, kwargs, request,
  185. hostname=self.hostname, loader=self.app.loader,
  186. app=self.app)[0]
  187. self.acknowledge()
  188. return retval
  189. def maybe_expire(self):
  190. """If expired, mark the task as revoked."""
  191. if self.expires:
  192. now = datetime.now(self.expires.tzinfo)
  193. if now > self.expires:
  194. revoked_tasks.add(self.id)
  195. return True
  196. def terminate(self, pool, signal=None):
  197. signal = _signals.signum(signal or TERM_SIGNAME)
  198. if self.time_start:
  199. pool.terminate_job(self.worker_pid, signal)
  200. self._announce_revoked('terminated', True, signal, False)
  201. else:
  202. self._terminate_on_ack = pool, signal
  203. if self._apply_result is not None:
  204. obj = self._apply_result() # is a weakref
  205. if obj is not None:
  206. obj.terminate(signal)
  207. def _announce_revoked(self, reason, terminated, signum, expired):
  208. task_ready(self)
  209. self.send_event('task-revoked',
  210. terminated=terminated, signum=signum, expired=expired)
  211. self.task.backend.mark_as_revoked(
  212. self.id, reason, request=self, store_result=self.store_errors,
  213. )
  214. self.acknowledge()
  215. self._already_revoked = True
  216. send_revoked(self.task, request=self,
  217. terminated=terminated, signum=signum, expired=expired)
  218. def revoked(self):
  219. """If revoked, skip task and mark state."""
  220. expired = False
  221. if self._already_revoked:
  222. return True
  223. if self.expires:
  224. expired = self.maybe_expire()
  225. if self.id in revoked_tasks:
  226. info('Discarding revoked task: %s[%s]', self.name, self.id)
  227. self._announce_revoked(
  228. 'expired' if expired else 'revoked', False, None, expired,
  229. )
  230. return True
  231. return False
  232. def send_event(self, type, **fields):
  233. if self.eventer and self.eventer.enabled:
  234. self.eventer.send(type, uuid=self.id, **fields)
  235. def on_accepted(self, pid, time_accepted):
  236. """Handler called when task is accepted by worker pool."""
  237. self.worker_pid = pid
  238. self.time_start = time_accepted
  239. task_accepted(self)
  240. if not self.task.acks_late:
  241. self.acknowledge()
  242. self.send_event('task-started')
  243. if _does_debug:
  244. debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
  245. if self._terminate_on_ack is not None:
  246. self.terminate(*self._terminate_on_ack)
  247. def on_timeout(self, soft, timeout):
  248. """Handler called if the task times out."""
  249. task_ready(self)
  250. if soft:
  251. warn('Soft time limit (%ss) exceeded for %s[%s]',
  252. soft, self.name, self.id)
  253. exc = SoftTimeLimitExceeded(soft)
  254. else:
  255. error('Hard time limit (%ss) exceeded for %s[%s]',
  256. timeout, self.name, self.id)
  257. exc = TimeLimitExceeded(timeout)
  258. self.task.backend.mark_as_failure(
  259. self.id, exc, request=self, store_result=self.store_errors,
  260. )
  261. if self.task.acks_late:
  262. self.acknowledge()
  263. def on_success(self, failed__retval__runtime, **kwargs):
  264. """Handler called if the task was successfully processed."""
  265. failed, retval, runtime = failed__retval__runtime
  266. if failed:
  267. if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
  268. raise retval.exception
  269. return self.on_failure(retval, return_ok=True)
  270. task_ready(self)
  271. if self.task.acks_late:
  272. self.acknowledge()
  273. self.send_event('task-succeeded', result=retval, runtime=runtime)
  274. def on_retry(self, exc_info):
  275. """Handler called if the task should be retried."""
  276. if self.task.acks_late:
  277. self.acknowledge()
  278. self.send_event('task-retried',
  279. exception=safe_repr(exc_info.exception.exc),
  280. traceback=safe_str(exc_info.traceback))
  281. def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
  282. """Handler called if the task raised an exception."""
  283. task_ready(self)
  284. if isinstance(exc_info.exception, MemoryError):
  285. raise MemoryError('Process got: %s' % (exc_info.exception,))
  286. elif isinstance(exc_info.exception, Reject):
  287. return self.reject(requeue=exc_info.exception.requeue)
  288. elif isinstance(exc_info.exception, Ignore):
  289. return self.acknowledge()
  290. exc = exc_info.exception
  291. if isinstance(exc, Retry):
  292. return self.on_retry(exc_info)
  293. # These are special cases where the process would not have had
  294. # time to write the result.
  295. if isinstance(exc, Terminated):
  296. self._announce_revoked(
  297. 'terminated', True, str(exc), False)
  298. send_failed_event = False # already sent revoked event
  299. elif isinstance(exc, WorkerLostError) or not return_ok:
  300. self.task.backend.mark_as_failure(
  301. self.id, exc, request=self, store_result=self.store_errors,
  302. )
  303. # (acks_late) acknowledge after result stored.
  304. if self.task.acks_late:
  305. requeue = self.delivery_info.get('redelivered', None) is False
  306. reject = (
  307. self.task.reject_on_worker_lost and
  308. isinstance(exc, WorkerLostError)
  309. )
  310. if reject:
  311. self.reject(requeue=requeue)
  312. send_failed_event = False
  313. else:
  314. self.acknowledge()
  315. if send_failed_event:
  316. self.send_event(
  317. 'task-failed',
  318. exception=safe_repr(get_pickled_exception(exc_info.exception)),
  319. traceback=exc_info.traceback,
  320. )
  321. if not return_ok:
  322. error('Task handler raised error: %r', exc,
  323. exc_info=exc_info.exc_info)
  324. def acknowledge(self):
  325. """Acknowledge task."""
  326. if not self.acknowledged:
  327. self.on_ack(logger, self.connection_errors)
  328. self.acknowledged = True
  329. def reject(self, requeue=False):
  330. if not self.acknowledged:
  331. self.on_reject(logger, self.connection_errors, requeue)
  332. self.acknowledged = True
  333. self.send_event('task-rejected', requeue=requeue)
  334. def info(self, safe=False):
  335. return {
  336. 'id': self.id,
  337. 'name': self.name,
  338. 'args': self.argsrepr,
  339. 'kwargs': self.kwargsrepr,
  340. 'type': self.type,
  341. 'body': self.body,
  342. 'hostname': self.hostname,
  343. 'time_start': self.time_start,
  344. 'acknowledged': self.acknowledged,
  345. 'delivery_info': self.delivery_info,
  346. 'worker_pid': self.worker_pid,
  347. }
  348. def __str__(self):
  349. return ' '.join([
  350. self.humaninfo(),
  351. ' eta:[{0}]'.format(self.eta) if self.eta else '',
  352. ' expires:[{0}]'.format(self.expires) if self.expires else '',
  353. ])
  354. def humaninfo(self):
  355. return '{0.name}[{0.id}]'.format(self)
  356. def __repr__(self):
  357. return '<{0}: {1} {2} {3}>'.format(
  358. type(self).__name__, self.humaninfo(),
  359. self.argsrepr, self.kwargsrepr,
  360. )
  361. @property
  362. def tzlocal(self):
  363. if self._tzlocal is None:
  364. self._tzlocal = self.app.conf.timezone
  365. return self._tzlocal
  366. @property
  367. def store_errors(self):
  368. return (not self.task.ignore_result or
  369. self.task.store_errors_even_if_ignored)
  370. @property
  371. def reply_to(self):
  372. # used by rpc backend when failures reported by parent process
  373. return self.request_dict['reply_to']
  374. @property
  375. def correlation_id(self):
  376. # used similarly to reply_to
  377. return self.request_dict['correlation_id']
  378. @cached_property
  379. def _payload(self):
  380. return self.body if self._decoded else self.message.payload
  381. @cached_property
  382. def chord(self):
  383. # used by backend.mark_as_failure when failure is reported
  384. # by parent process
  385. _, _, embed = self._payload
  386. return embed.get('chord')
  387. @cached_property
  388. def errbacks(self):
  389. # used by backend.mark_as_failure when failure is reported
  390. # by parent process
  391. _, _, embed = self._payload
  392. return embed.get('errbacks')
  393. @cached_property
  394. def group(self):
  395. # used by backend.on_chord_part_return when failures reported
  396. # by parent process
  397. return self.request_dict['group']
  398. def create_request_cls(base, task, pool, hostname, eventer,
  399. ref=ref, revoked_tasks=revoked_tasks,
  400. task_ready=task_ready):
  401. from celery.app.trace import trace_task_ret as trace
  402. default_time_limit = task.time_limit
  403. default_soft_time_limit = task.soft_time_limit
  404. apply_async = pool.apply_async
  405. acks_late = task.acks_late
  406. events = eventer and eventer.enabled
  407. class Request(base):
  408. def execute_using_pool(self, pool, **kwargs):
  409. task_id = self.id
  410. if (self.expires or task_id in revoked_tasks) and self.revoked():
  411. raise TaskRevokedError(task_id)
  412. time_limit, soft_time_limit = self.time_limits
  413. result = apply_async(
  414. trace,
  415. args=(self.type, task_id, self.request_dict, self.body,
  416. self.content_type, self.content_encoding),
  417. accept_callback=self.on_accepted,
  418. timeout_callback=self.on_timeout,
  419. callback=self.on_success,
  420. error_callback=self.on_failure,
  421. soft_timeout=soft_time_limit or default_soft_time_limit,
  422. timeout=time_limit or default_time_limit,
  423. correlation_id=task_id,
  424. )
  425. # cannot create weakref to None
  426. self._apply_result = ref(result) if result is not None else result
  427. return result
  428. def on_success(self, failed__retval__runtime, **kwargs):
  429. failed, retval, runtime = failed__retval__runtime
  430. if failed:
  431. if isinstance(retval.exception, (
  432. SystemExit, KeyboardInterrupt)):
  433. raise retval.exception
  434. return self.on_failure(retval, return_ok=True)
  435. task_ready(self)
  436. if acks_late:
  437. self.acknowledge()
  438. if events:
  439. self.send_event(
  440. 'task-succeeded', result=retval, runtime=runtime,
  441. )
  442. return Request