# request.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.request
  4. ~~~~~~~~~~~~~~~~~~~~~
  5. This module defines the :class:`Request` class,
  6. which specifies how tasks are executed.
  7. """
  8. from __future__ import absolute_import, unicode_literals
  9. import logging
  10. import socket
  11. import sys
  12. from datetime import datetime
  13. from weakref import ref
  14. from kombu.utils.encoding import safe_repr, safe_str
  15. from celery import signals
  16. from celery.app.trace import trace_task, trace_task_ret
  17. from celery.exceptions import (
  18. Ignore, TaskRevokedError, InvalidTaskError,
  19. SoftTimeLimitExceeded, TimeLimitExceeded,
  20. WorkerLostError, Terminated, Retry, Reject,
  21. )
  22. from celery.five import string
  23. from celery.platforms import signals as _signals
  24. from celery.utils.functional import noop
  25. from celery.utils.log import get_logger
  26. from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
  27. from celery.utils.serialization import get_pickled_exception
  28. from . import state
  29. __all__ = ['Request']
  30. IS_PYPY = hasattr(sys, 'pypy_version_info')
  31. logger = get_logger(__name__)
  32. debug, info, warn, error = (logger.debug, logger.info,
  33. logger.warning, logger.error)
  34. _does_info = False
  35. _does_debug = False
  36. def __optimize__():
  37. # this is also called by celery.app.trace.setup_worker_optimizations
  38. global _does_debug
  39. global _does_info
  40. _does_debug = logger.isEnabledFor(logging.DEBUG)
  41. _does_info = logger.isEnabledFor(logging.INFO)
  42. __optimize__()
  43. # Localize
  44. tz_utc = timezone.utc
  45. tz_or_local = timezone.tz_or_local
  46. send_revoked = signals.task_revoked.send
  47. task_accepted = state.task_accepted
  48. task_ready = state.task_ready
  49. revoked_tasks = state.revoked
  50. class Request(object):
  51. """A request for task execution."""
  52. acknowledged = False
  53. time_start = None
  54. worker_pid = None
  55. timeouts = (None, None)
  56. _already_revoked = False
  57. _terminate_on_ack = None
  58. _apply_result = None
  59. _tzlocal = None
  60. if not IS_PYPY: # pragma: no cover
  61. __slots__ = (
  62. 'app', 'name', 'id', 'on_ack', 'body',
  63. 'hostname', 'eventer', 'connection_errors', 'task', 'eta',
  64. 'expires', 'request_dict', 'on_reject', 'utc',
  65. 'content_type', 'content_encoding',
  66. '__weakref__', '__dict__',
  67. )
  68. def __init__(self, message, on_ack=noop,
  69. hostname=None, eventer=None, app=None,
  70. connection_errors=None, request_dict=None,
  71. task=None, on_reject=noop, body=None,
  72. headers=None, decoded=False, utc=True,
  73. maybe_make_aware=maybe_make_aware,
  74. maybe_iso8601=maybe_iso8601, **opts):
  75. if headers is None:
  76. headers = message.headers
  77. if body is None:
  78. body = message.body
  79. self.app = app
  80. self.message = message
  81. self.body = body
  82. self.utc = utc
  83. if decoded:
  84. self.content_type = self.content_encoding = None
  85. else:
  86. self.content_type, self.content_encoding = (
  87. message.content_type, message.content_encoding,
  88. )
  89. name = self.name = headers['task']
  90. self.id = headers['id']
  91. if 'timeouts' in headers:
  92. self.timeouts = headers['timeouts']
  93. self.on_ack = on_ack
  94. self.on_reject = on_reject
  95. self.hostname = hostname or socket.gethostname()
  96. self.eventer = eventer
  97. self.connection_errors = connection_errors or ()
  98. self.task = task or self.app.tasks[name]
  99. # timezone means the message is timezone-aware, and the only timezone
  100. # supported at this point is UTC.
  101. eta = headers.get('eta')
  102. if eta is not None:
  103. try:
  104. eta = maybe_iso8601(eta)
  105. except (AttributeError, ValueError, TypeError) as exc:
  106. raise InvalidTaskError(
  107. 'invalid eta value {0!r}: {1}'.format(eta, exc))
  108. self.eta = maybe_make_aware(eta, self.tzlocal)
  109. else:
  110. self.eta = None
  111. expires = headers.get('expires')
  112. if expires is not None:
  113. try:
  114. expires = maybe_iso8601(expires)
  115. except (AttributeError, ValueError, TypeError) as exc:
  116. raise InvalidTaskError(
  117. 'invalid expires value {0!r}: {1}'.format(expires, exc))
  118. self.expires = maybe_make_aware(expires, self.tzlocal)
  119. else:
  120. self.expires = None
  121. delivery_info = message.delivery_info or {}
  122. properties = message.properties or {}
  123. headers.update({
  124. 'reply_to': properties.get('reply_to'),
  125. 'correlation_id': properties.get('correlation_id'),
  126. 'delivery_info': {
  127. 'exchange': delivery_info.get('exchange'),
  128. 'routing_key': delivery_info.get('routing_key'),
  129. 'priority': delivery_info.get('priority'),
  130. 'redelivered': delivery_info.get('redelivered'),
  131. }
  132. })
  133. self.request_dict = headers
  134. @property
  135. def delivery_info(self):
  136. return self.request_dict['delivery_info']
  137. def execute_using_pool(self, pool, **kwargs):
  138. """Used by the worker to send this task to the pool.
  139. :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
  140. :raises celery.exceptions.TaskRevokedError: if the task was revoked
  141. and ignored.
  142. """
  143. task_id = self.id
  144. task = self.task
  145. if self.revoked():
  146. raise TaskRevokedError(task_id)
  147. timeout, soft_timeout = self.timeouts
  148. timeout = timeout or task.time_limit
  149. soft_timeout = soft_timeout or task.soft_time_limit
  150. result = pool.apply_async(
  151. trace_task_ret,
  152. args=(self.name, task_id, self.request_dict, self.body,
  153. self.content_type, self.content_encoding, self.hostname),
  154. accept_callback=self.on_accepted,
  155. timeout_callback=self.on_timeout,
  156. callback=self.on_success,
  157. error_callback=self.on_failure,
  158. soft_timeout=soft_timeout or task.soft_time_limit,
  159. timeout=timeout or task.time_limit,
  160. correlation_id=task_id,
  161. )
  162. # cannot create weakref to None
  163. self._apply_result = ref(result) if result is not None else result
  164. return result
  165. def execute(self, loglevel=None, logfile=None):
  166. """Execute the task in a :func:`~celery.app.trace.trace_task`.
  167. :keyword loglevel: The loglevel used by the task.
  168. :keyword logfile: The logfile used by the task.
  169. """
  170. if self.revoked():
  171. return
  172. # acknowledge task as being processed.
  173. if not self.task.acks_late:
  174. self.acknowledge()
  175. request = self.request_dict
  176. args, kwargs = self.message.payload
  177. request.update({'loglevel': loglevel, 'logfile': logfile,
  178. 'hostname': self.hostname, 'is_eager': False,
  179. 'args': args, 'kwargs': kwargs})
  180. retval = trace_task(self.task, self.id, args, kwargs, request,
  181. hostname=self.hostname, loader=self.app.loader,
  182. app=self.app)[0]
  183. self.acknowledge()
  184. return retval
  185. def maybe_expire(self):
  186. """If expired, mark the task as revoked."""
  187. if self.expires:
  188. now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
  189. if now > self.expires:
  190. revoked_tasks.add(self.id)
  191. return True
  192. def terminate(self, pool, signal=None):
  193. signal = _signals.signum(signal or 'TERM')
  194. if self.time_start:
  195. pool.terminate_job(self.worker_pid, signal)
  196. self._announce_revoked('terminated', True, signal, False)
  197. else:
  198. self._terminate_on_ack = pool, signal
  199. if self._apply_result is not None:
  200. obj = self._apply_result() # is a weakref
  201. if obj is not None:
  202. obj.terminate(signal)
  203. def _announce_revoked(self, reason, terminated, signum, expired):
  204. task_ready(self)
  205. self.send_event('task-revoked',
  206. terminated=terminated, signum=signum, expired=expired)
  207. if self.store_errors:
  208. self.task.backend.mark_as_revoked(self.id, reason, request=self)
  209. self.acknowledge()
  210. self._already_revoked = True
  211. send_revoked(self.task, request=self,
  212. terminated=terminated, signum=signum, expired=expired)
  213. def revoked(self):
  214. """If revoked, skip task and mark state."""
  215. expired = False
  216. if self._already_revoked:
  217. return True
  218. if self.expires:
  219. expired = self.maybe_expire()
  220. if self.id in revoked_tasks:
  221. info('Discarding revoked task: %s[%s]', self.name, self.id)
  222. self._announce_revoked(
  223. 'expired' if expired else 'revoked', False, None, expired,
  224. )
  225. return True
  226. return False
  227. def send_event(self, type, **fields):
  228. if self.eventer and self.eventer.enabled:
  229. self.eventer.send(type, uuid=self.id, **fields)
  230. def on_accepted(self, pid, time_accepted):
  231. """Handler called when task is accepted by worker pool."""
  232. self.worker_pid = pid
  233. self.time_start = time_accepted
  234. task_accepted(self)
  235. if not self.task.acks_late:
  236. self.acknowledge()
  237. self.send_event('task-started')
  238. if _does_debug:
  239. debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
  240. if self._terminate_on_ack is not None:
  241. self.terminate(*self._terminate_on_ack)
  242. def on_timeout(self, soft, timeout):
  243. """Handler called if the task times out."""
  244. task_ready(self)
  245. if soft:
  246. warn('Soft time limit (%ss) exceeded for %s[%s]',
  247. timeout, self.name, self.id)
  248. exc = SoftTimeLimitExceeded(timeout)
  249. else:
  250. error('Hard time limit (%ss) exceeded for %s[%s]',
  251. timeout, self.name, self.id)
  252. exc = TimeLimitExceeded(timeout)
  253. if self.store_errors:
  254. self.task.backend.mark_as_failure(self.id, exc, request=self)
  255. if self.task.acks_late:
  256. self.acknowledge()
  257. def on_success(self, failed__retval__runtime, **kwargs):
  258. """Handler called if the task was successfully processed."""
  259. failed, retval, runtime = failed__retval__runtime
  260. if failed:
  261. if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
  262. raise retval.exception
  263. return self.on_failure(retval, return_ok=True)
  264. task_ready(self)
  265. if self.task.acks_late:
  266. self.acknowledge()
  267. if self.eventer and self.eventer.enabled:
  268. self.send_event(
  269. 'task-succeeded', result=retval, runtime=runtime,
  270. )
  271. def on_retry(self, exc_info):
  272. """Handler called if the task should be retried."""
  273. if self.task.acks_late:
  274. self.acknowledge()
  275. self.send_event('task-retried',
  276. exception=safe_repr(exc_info.exception.exc),
  277. traceback=safe_str(exc_info.traceback))
  278. def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
  279. """Handler called if the task raised an exception."""
  280. task_ready(self)
  281. if isinstance(exc_info.exception, MemoryError):
  282. raise MemoryError('Process got: %s' % (exc_info.exception, ))
  283. elif isinstance(exc_info.exception, Reject):
  284. return self.reject(requeue=exc_info.exception.requeue)
  285. elif isinstance(exc_info.exception, Ignore):
  286. return self.acknowledge()
  287. exc = exc_info.exception
  288. if isinstance(exc, Retry):
  289. return self.on_retry(exc_info)
  290. # These are special cases where the process would not have had
  291. # time to write the result.
  292. if self.store_errors:
  293. if isinstance(exc, Terminated):
  294. self._announce_revoked(
  295. 'terminated', True, string(exc), False)
  296. send_failed_event = False # already sent revoked event
  297. elif isinstance(exc, WorkerLostError) or not return_ok:
  298. self.task.backend.mark_as_failure(
  299. self.id, exc, request=self,
  300. )
  301. # (acks_late) acknowledge after result stored.
  302. if self.task.acks_late:
  303. self.acknowledge()
  304. if send_failed_event:
  305. self.send_event(
  306. 'task-failed',
  307. exception=safe_repr(get_pickled_exception(exc_info.exception)),
  308. traceback=exc_info.traceback,
  309. )
  310. if not return_ok:
  311. error('Task handler raised error: %r', exc,
  312. exc_info=exc_info.exc_info)
  313. def acknowledge(self):
  314. """Acknowledge task."""
  315. if not self.acknowledged:
  316. self.on_ack(logger, self.connection_errors)
  317. self.acknowledged = True
  318. def reject(self, requeue=False):
  319. if not self.acknowledged:
  320. self.on_reject(logger, self.connection_errors, requeue)
  321. self.acknowledged = True
  322. def info(self, safe=False):
  323. return {'id': self.id,
  324. 'name': self.name,
  325. 'body': self.body,
  326. 'hostname': self.hostname,
  327. 'time_start': self.time_start,
  328. 'acknowledged': self.acknowledged,
  329. 'delivery_info': self.delivery_info,
  330. 'worker_pid': self.worker_pid}
  331. def __str__(self):
  332. return '{0.name}[{0.id}]{1}{2}'.format(self,
  333. ' eta:[{0}]'.format(self.eta) if self.eta else '',
  334. ' expires:[{0}]'.format(self.expires) if self.expires else '')
  335. shortinfo = __str__
  336. def __repr__(self):
  337. return '<{0} {1}: {2}>'.format(type(self).__name__, self.id, self.name)
  338. @property
  339. def tzlocal(self):
  340. if self._tzlocal is None:
  341. self._tzlocal = self.app.conf.CELERY_TIMEZONE
  342. return self._tzlocal
  343. @property
  344. def store_errors(self):
  345. return (not self.task.ignore_result
  346. or self.task.store_errors_even_if_ignored)
  347. @property
  348. def task_id(self):
  349. # XXX compat
  350. return self.id
  351. @task_id.setter # noqa
  352. def task_id(self, value):
  353. self.id = value
  354. @property
  355. def task_name(self):
  356. # XXX compat
  357. return self.name
  358. @task_name.setter # noqa
  359. def task_name(self, value):
  360. self.name = value
  361. @property
  362. def reply_to(self):
  363. # used by rpc backend when failures reported by parent process
  364. return self.request_dict['reply_to']
  365. @property
  366. def correlation_id(self):
  367. # used similarly to reply_to
  368. return self.request_dict['correlation_id']
  369. def create_request_cls(base, task, pool, hostname, eventer,
  370. ref=ref, revoked_tasks=revoked_tasks,
  371. task_ready=task_ready):
  372. from celery.app.trace import trace_task_ret as trace
  373. default_time_limit = task.time_limit
  374. default_soft_time_limit = task.soft_time_limit
  375. apply_async = pool.apply_async
  376. acks_late = task.acks_late
  377. std_kwargs = {'hostname': hostname, 'is_eager': False}
  378. events = eventer and eventer.enabled
  379. class Request(base):
  380. def execute_using_pool(self, pool, **kwargs):
  381. task_id = self.id
  382. if (self.expires or task_id in revoked_tasks) and self.revoked():
  383. raise TaskRevokedError(task_id)
  384. timeout, soft_timeout = self.timeouts
  385. timeout = timeout or default_time_limit
  386. soft_timeout = soft_timeout or default_soft_time_limit
  387. result = apply_async(
  388. trace,
  389. args=(self.name, task_id, self.request_dict, self.body,
  390. self.content_type, self.content_encoding),
  391. kwargs=std_kwargs,
  392. accept_callback=self.on_accepted,
  393. timeout_callback=self.on_timeout,
  394. callback=self.on_success,
  395. error_callback=self.on_failure,
  396. soft_timeout=soft_timeout,
  397. timeout=timeout,
  398. correlation_id=task_id,
  399. )
  400. # cannot create weakref to None
  401. self._apply_result = ref(result) if result is not None else result
  402. return result
  403. def on_success(self, failed__retval__runtime, **kwargs):
  404. failed, retval, runtime = failed__retval__runtime
  405. if failed:
  406. if isinstance(retval.exception, (
  407. SystemExit, KeyboardInterrupt)):
  408. raise retval.exception
  409. return self.on_failure(retval, return_ok=True)
  410. task_ready(self)
  411. if acks_late:
  412. self.acknowledge()
  413. if events:
  414. self.send_event(
  415. 'task-succeeded', result=retval, runtime=runtime,
  416. )
  417. return Request