request.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.request
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. This module defines the :class:`Request` class,
  6. which specifies how tasks are executed.
  7. """
  8. from __future__ import absolute_import, unicode_literals
  9. import logging
  10. import sys
  11. from datetime import datetime
  12. from weakref import ref
  13. from billiard.common import TERM_SIGNAME
  14. from kombu.utils.encoding import safe_repr, safe_str
  15. from celery import signals
  16. from celery.app.trace import trace_task, trace_task_ret
  17. from celery.exceptions import (
  18. Ignore, TaskRevokedError, InvalidTaskError,
  19. SoftTimeLimitExceeded, TimeLimitExceeded,
  20. WorkerLostError, Terminated, Retry, Reject,
  21. )
  22. from celery.five import string
  23. from celery.platforms import signals as _signals
  24. from celery.utils import cached_property, gethostname
  25. from celery.utils.functional import noop
  26. from celery.utils.log import get_logger
  27. from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
  28. from celery.utils.serialization import get_pickled_exception
  29. from . import state
  30. __all__ = ['Request']
  31. IS_PYPY = hasattr(sys, 'pypy_version_info')
  32. logger = get_logger(__name__)
  33. debug, info, warn, error = (logger.debug, logger.info,
  34. logger.warning, logger.error)
  35. _does_info = False
  36. _does_debug = False
  37. def __optimize__():
  38. # this is also called by celery.app.trace.setup_worker_optimizations
  39. global _does_debug
  40. global _does_info
  41. _does_debug = logger.isEnabledFor(logging.DEBUG)
  42. _does_info = logger.isEnabledFor(logging.INFO)
  43. __optimize__()
  44. # Localize
  45. tz_utc = timezone.utc
  46. tz_or_local = timezone.tz_or_local
  47. send_revoked = signals.task_revoked.send
  48. task_accepted = state.task_accepted
  49. task_ready = state.task_ready
  50. revoked_tasks = state.revoked
  51. class Request(object):
  52. """A request for task execution."""
  53. acknowledged = False
  54. time_start = None
  55. worker_pid = None
  56. time_limits = (None, None)
  57. _already_revoked = False
  58. _terminate_on_ack = None
  59. _apply_result = None
  60. _tzlocal = None
  61. if not IS_PYPY: # pragma: no cover
  62. __slots__ = (
  63. 'app', 'type', 'name', 'id', 'root_id', 'parent_id',
  64. 'on_ack', 'body', 'hostname', 'eventer', 'connection_errors',
  65. 'task', 'eta', 'expires', 'request_dict', 'on_reject', 'utc',
  66. 'content_type', 'content_encoding', 'argsrepr', 'kwargsrepr',
  67. '_decoded',
  68. '__weakref__', '__dict__',
  69. )
  70. def __init__(self, message, on_ack=noop,
  71. hostname=None, eventer=None, app=None,
  72. connection_errors=None, request_dict=None,
  73. task=None, on_reject=noop, body=None,
  74. headers=None, decoded=False, utc=True,
  75. maybe_make_aware=maybe_make_aware,
  76. maybe_iso8601=maybe_iso8601, **opts):
  77. if headers is None:
  78. headers = message.headers
  79. if body is None:
  80. body = message.body
  81. self.app = app
  82. self.message = message
  83. self.body = body
  84. self.utc = utc
  85. self._decoded = decoded
  86. if decoded:
  87. self.content_type = self.content_encoding = None
  88. else:
  89. self.content_type, self.content_encoding = (
  90. message.content_type, message.content_encoding,
  91. )
  92. self.id = headers['id']
  93. type = self.type = self.name = headers['task']
  94. self.root_id = headers.get('root_id')
  95. self.parent_id = headers.get('parent_id')
  96. if 'shadow' in headers:
  97. self.name = headers['shadow'] or self.name
  98. if 'timelimit' in headers:
  99. self.time_limits = headers['timelimit']
  100. self.argsrepr = headers.get('argsrepr', '')
  101. self.kwargsrepr = headers.get('kwargsrepr', '')
  102. self.on_ack = on_ack
  103. self.on_reject = on_reject
  104. self.hostname = hostname or gethostname()
  105. self.eventer = eventer
  106. self.connection_errors = connection_errors or ()
  107. self.task = task or self.app.tasks[type]
  108. # timezone means the message is timezone-aware, and the only timezone
  109. # supported at this point is UTC.
  110. eta = headers.get('eta')
  111. if eta is not None:
  112. try:
  113. eta = maybe_iso8601(eta)
  114. except (AttributeError, ValueError, TypeError) as exc:
  115. raise InvalidTaskError(
  116. 'invalid eta value {0!r}: {1}'.format(eta, exc))
  117. self.eta = maybe_make_aware(eta, self.tzlocal)
  118. else:
  119. self.eta = None
  120. expires = headers.get('expires')
  121. if expires is not None:
  122. try:
  123. expires = maybe_iso8601(expires)
  124. except (AttributeError, ValueError, TypeError) as exc:
  125. raise InvalidTaskError(
  126. 'invalid expires value {0!r}: {1}'.format(expires, exc))
  127. self.expires = maybe_make_aware(expires, self.tzlocal)
  128. else:
  129. self.expires = None
  130. delivery_info = message.delivery_info or {}
  131. properties = message.properties or {}
  132. headers.update({
  133. 'reply_to': properties.get('reply_to'),
  134. 'correlation_id': properties.get('correlation_id'),
  135. 'delivery_info': {
  136. 'exchange': delivery_info.get('exchange'),
  137. 'routing_key': delivery_info.get('routing_key'),
  138. 'priority': properties.get('priority'),
  139. 'redelivered': delivery_info.get('redelivered'),
  140. }
  141. })
  142. self.request_dict = headers
  143. @property
  144. def delivery_info(self):
  145. return self.request_dict['delivery_info']
  146. def execute_using_pool(self, pool, **kwargs):
  147. """Used by the worker to send this task to the pool.
  148. :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
  149. :raises celery.exceptions.TaskRevokedError: if the task was revoked
  150. and ignored.
  151. """
  152. task_id = self.id
  153. task = self.task
  154. if self.revoked():
  155. raise TaskRevokedError(task_id)
  156. time_limit, soft_time_limit = self.time_limits
  157. time_limit = time_limit or task.time_limit
  158. soft_time_limit = soft_time_limit or task.soft_time_limit
  159. result = pool.apply_async(
  160. trace_task_ret,
  161. args=(self.type, task_id, self.request_dict, self.body,
  162. self.content_type, self.content_encoding),
  163. accept_callback=self.on_accepted,
  164. timeout_callback=self.on_timeout,
  165. callback=self.on_success,
  166. error_callback=self.on_failure,
  167. soft_timeout=soft_time_limit,
  168. timeout=time_limit,
  169. correlation_id=task_id,
  170. )
  171. # cannot create weakref to None
  172. self._apply_result = ref(result) if result is not None else result
  173. return result
  174. def execute(self, loglevel=None, logfile=None):
  175. """Execute the task in a :func:`~celery.app.trace.trace_task`.
  176. :keyword loglevel: The loglevel used by the task.
  177. :keyword logfile: The logfile used by the task.
  178. """
  179. if self.revoked():
  180. return
  181. # acknowledge task as being processed.
  182. if not self.task.acks_late:
  183. self.acknowledge()
  184. request = self.request_dict
  185. args, kwargs, embed = self._payload
  186. request.update({'loglevel': loglevel, 'logfile': logfile,
  187. 'hostname': self.hostname, 'is_eager': False,
  188. 'args': args, 'kwargs': kwargs}, **embed or {})
  189. retval = trace_task(self.task, self.id, args, kwargs, request,
  190. hostname=self.hostname, loader=self.app.loader,
  191. app=self.app)[0]
  192. self.acknowledge()
  193. return retval
  194. def maybe_expire(self):
  195. """If expired, mark the task as revoked."""
  196. if self.expires:
  197. now = datetime.now(self.expires.tzinfo)
  198. if now > self.expires:
  199. revoked_tasks.add(self.id)
  200. return True
  201. def terminate(self, pool, signal=None):
  202. signal = _signals.signum(signal or TERM_SIGNAME)
  203. if self.time_start:
  204. pool.terminate_job(self.worker_pid, signal)
  205. self._announce_revoked('terminated', True, signal, False)
  206. else:
  207. self._terminate_on_ack = pool, signal
  208. if self._apply_result is not None:
  209. obj = self._apply_result() # is a weakref
  210. if obj is not None:
  211. obj.terminate(signal)
  212. def _announce_revoked(self, reason, terminated, signum, expired):
  213. task_ready(self)
  214. self.send_event('task-revoked',
  215. terminated=terminated, signum=signum, expired=expired)
  216. self.task.backend.mark_as_revoked(
  217. self.id, reason, request=self, store_result=self.store_errors,
  218. )
  219. self.acknowledge()
  220. self._already_revoked = True
  221. send_revoked(self.task, request=self,
  222. terminated=terminated, signum=signum, expired=expired)
  223. def revoked(self):
  224. """If revoked, skip task and mark state."""
  225. expired = False
  226. if self._already_revoked:
  227. return True
  228. if self.expires:
  229. expired = self.maybe_expire()
  230. if self.id in revoked_tasks:
  231. info('Discarding revoked task: %s[%s]', self.name, self.id)
  232. self._announce_revoked(
  233. 'expired' if expired else 'revoked', False, None, expired,
  234. )
  235. return True
  236. return False
  237. def send_event(self, type, **fields):
  238. if self.eventer and self.eventer.enabled:
  239. self.eventer.send(type, uuid=self.id, **fields)
  240. def on_accepted(self, pid, time_accepted):
  241. """Handler called when task is accepted by worker pool."""
  242. self.worker_pid = pid
  243. self.time_start = time_accepted
  244. task_accepted(self)
  245. if not self.task.acks_late:
  246. self.acknowledge()
  247. self.send_event('task-started')
  248. if _does_debug:
  249. debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
  250. if self._terminate_on_ack is not None:
  251. self.terminate(*self._terminate_on_ack)
  252. def on_timeout(self, soft, timeout):
  253. """Handler called if the task times out."""
  254. task_ready(self)
  255. if soft:
  256. warn('Soft time limit (%ss) exceeded for %s[%s]',
  257. soft, self.name, self.id)
  258. exc = SoftTimeLimitExceeded(soft)
  259. else:
  260. error('Hard time limit (%ss) exceeded for %s[%s]',
  261. timeout, self.name, self.id)
  262. exc = TimeLimitExceeded(timeout)
  263. self.task.backend.mark_as_failure(
  264. self.id, exc, request=self, store_result=self.store_errors,
  265. )
  266. if self.task.acks_late:
  267. self.acknowledge()
  268. def on_success(self, failed__retval__runtime, **kwargs):
  269. """Handler called if the task was successfully processed."""
  270. failed, retval, runtime = failed__retval__runtime
  271. if failed:
  272. if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
  273. raise retval.exception
  274. return self.on_failure(retval, return_ok=True)
  275. task_ready(self)
  276. if self.task.acks_late:
  277. self.acknowledge()
  278. self.send_event('task-succeeded', result=retval, runtime=runtime)
  279. def on_retry(self, exc_info):
  280. """Handler called if the task should be retried."""
  281. if self.task.acks_late:
  282. self.acknowledge()
  283. self.send_event('task-retried',
  284. exception=safe_repr(exc_info.exception.exc),
  285. traceback=safe_str(exc_info.traceback))
  286. def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
  287. """Handler called if the task raised an exception."""
  288. task_ready(self)
  289. if isinstance(exc_info.exception, MemoryError):
  290. raise MemoryError('Process got: %s' % (exc_info.exception,))
  291. elif isinstance(exc_info.exception, Reject):
  292. return self.reject(requeue=exc_info.exception.requeue)
  293. elif isinstance(exc_info.exception, Ignore):
  294. return self.acknowledge()
  295. exc = exc_info.exception
  296. if isinstance(exc, Retry):
  297. return self.on_retry(exc_info)
  298. # These are special cases where the process would not have had
  299. # time to write the result.
  300. if isinstance(exc, Terminated):
  301. self._announce_revoked(
  302. 'terminated', True, string(exc), False)
  303. send_failed_event = False # already sent revoked event
  304. elif isinstance(exc, WorkerLostError) or not return_ok:
  305. self.task.backend.mark_as_failure(
  306. self.id, exc, request=self, store_result=self.store_errors,
  307. )
  308. # (acks_late) acknowledge after result stored.
  309. if self.task.acks_late:
  310. requeue = self.delivery_info.get('redelivered', None) is False
  311. reject = (
  312. self.task.reject_on_worker_lost and
  313. isinstance(exc, WorkerLostError)
  314. )
  315. if reject:
  316. self.reject(requeue=requeue)
  317. send_failed_event = False
  318. else:
  319. self.acknowledge()
  320. if send_failed_event:
  321. self.send_event(
  322. 'task-failed',
  323. exception=safe_repr(get_pickled_exception(exc_info.exception)),
  324. traceback=exc_info.traceback,
  325. )
  326. if not return_ok:
  327. error('Task handler raised error: %r', exc,
  328. exc_info=exc_info.exc_info)
  329. def acknowledge(self):
  330. """Acknowledge task."""
  331. if not self.acknowledged:
  332. self.on_ack(logger, self.connection_errors)
  333. self.acknowledged = True
  334. def reject(self, requeue=False):
  335. if not self.acknowledged:
  336. self.on_reject(logger, self.connection_errors, requeue)
  337. self.acknowledged = True
  338. self.send_event('task-rejected', requeue=requeue)
  339. def info(self, safe=False):
  340. return {
  341. 'id': self.id,
  342. 'name': self.name,
  343. 'args': self.argsrepr,
  344. 'kwargs': self.kwargsrepr,
  345. 'type': self.type,
  346. 'body': self.body,
  347. 'hostname': self.hostname,
  348. 'time_start': self.time_start,
  349. 'acknowledged': self.acknowledged,
  350. 'delivery_info': self.delivery_info,
  351. 'worker_pid': self.worker_pid,
  352. }
  353. def __str__(self):
  354. return ' '.join([
  355. self.humaninfo(),
  356. ' eta:[{0}]'.format(self.eta) if self.eta else '',
  357. ' expires:[{0}]'.format(self.expires) if self.expires else '',
  358. ])
  359. shortinfo = __str__
  360. def humaninfo(self):
  361. return '{0.name}[{0.id}]'.format(self)
  362. def __repr__(self):
  363. return '<{0}: {1} {2} {3}>'.format(
  364. type(self).__name__, self.humaninfo(),
  365. self.argsrepr, self.kwargsrepr,
  366. )
  367. @property
  368. def tzlocal(self):
  369. if self._tzlocal is None:
  370. self._tzlocal = self.app.conf.timezone
  371. return self._tzlocal
  372. @property
  373. def store_errors(self):
  374. return (not self.task.ignore_result or
  375. self.task.store_errors_even_if_ignored)
  376. @property
  377. def task_id(self):
  378. # XXX compat
  379. return self.id
  380. @task_id.setter # noqa
  381. def task_id(self, value):
  382. self.id = value
  383. @property
  384. def task_name(self):
  385. # XXX compat
  386. return self.name
  387. @task_name.setter # noqa
  388. def task_name(self, value):
  389. self.name = value
  390. @property
  391. def reply_to(self):
  392. # used by rpc backend when failures reported by parent process
  393. return self.request_dict['reply_to']
  394. @property
  395. def correlation_id(self):
  396. # used similarly to reply_to
  397. return self.request_dict['correlation_id']
  398. @cached_property
  399. def _payload(self):
  400. return self.body if self._decoded else self.message.payload
  401. @cached_property
  402. def chord(self):
  403. # used by backend.mark_as_failure when failure is reported
  404. # by parent process
  405. _, _, embed = self._payload
  406. return embed.get('chord')
  407. @cached_property
  408. def errbacks(self):
  409. # used by backend.mark_as_failure when failure is reported
  410. # by parent process
  411. _, _, embed = self._payload
  412. return embed.get('errbacks')
  413. @cached_property
  414. def group(self):
  415. # used by backend.on_chord_part_return when failures reported
  416. # by parent process
  417. return self.request_dict['group']
  418. def create_request_cls(base, task, pool, hostname, eventer,
  419. ref=ref, revoked_tasks=revoked_tasks,
  420. task_ready=task_ready):
  421. from celery.app.trace import trace_task_ret as trace
  422. default_time_limit = task.time_limit
  423. default_soft_time_limit = task.soft_time_limit
  424. apply_async = pool.apply_async
  425. acks_late = task.acks_late
  426. events = eventer and eventer.enabled
  427. class Request(base):
  428. def execute_using_pool(self, pool, **kwargs):
  429. task_id = self.id
  430. if (self.expires or task_id in revoked_tasks) and self.revoked():
  431. raise TaskRevokedError(task_id)
  432. time_limit, soft_time_limit = self.time_limits
  433. time_limit = time_limit or default_time_limit
  434. soft_time_limit = soft_time_limit or default_soft_time_limit
  435. result = apply_async(
  436. trace,
  437. args=(self.type, task_id, self.request_dict, self.body,
  438. self.content_type, self.content_encoding),
  439. accept_callback=self.on_accepted,
  440. timeout_callback=self.on_timeout,
  441. callback=self.on_success,
  442. error_callback=self.on_failure,
  443. soft_timeout=soft_time_limit,
  444. timeout=time_limit,
  445. correlation_id=task_id,
  446. )
  447. # cannot create weakref to None
  448. self._apply_result = ref(result) if result is not None else result
  449. return result
  450. def on_success(self, failed__retval__runtime, **kwargs):
  451. failed, retval, runtime = failed__retval__runtime
  452. if failed:
  453. if isinstance(retval.exception, (
  454. SystemExit, KeyboardInterrupt)):
  455. raise retval.exception
  456. return self.on_failure(retval, return_ok=True)
  457. task_ready(self)
  458. if acks_late:
  459. self.acknowledge()
  460. if events:
  461. self.send_event(
  462. 'task-succeeded', result=retval, runtime=runtime,
  463. )
  464. return Request