job.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.job
  4. ~~~~~~~~~~~~~~~~~
  5. This module defines the :class:`Request` class,
  6. which specifies how tasks are executed.
  7. """
  8. from __future__ import absolute_import
  9. import logging
  10. import time
  11. import socket
  12. import sys
  13. from datetime import datetime
  14. from kombu.utils import kwdict, reprcall
  15. from kombu.utils.encoding import safe_repr, safe_str
  16. from celery import exceptions
  17. from celery import signals
  18. from celery.app import app_or_default
  19. from celery.datastructures import ExceptionInfo
  20. from celery.exceptions import Ignore, TaskRevokedError
  21. from celery.platforms import signals as _signals
  22. from celery.task.trace import (
  23. trace_task,
  24. trace_task_ret,
  25. )
  26. from celery.utils import fun_takes_kwargs
  27. from celery.utils.functional import noop
  28. from celery.utils.log import get_logger
  29. from celery.utils.serialization import get_pickled_exception
  30. from celery.utils.text import truncate
  31. from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
  32. from . import state
  33. IS_PYPY = hasattr(sys, 'pypy_version_info')
  34. logger = get_logger(__name__)
  35. debug, info, warn, error = (logger.debug, logger.info,
  36. logger.warning, logger.error)
  37. _does_info = False
  38. _does_debug = False
  39. def __optimize__():
  40. global _does_debug
  41. global _does_info
  42. _does_debug = logger.isEnabledFor(logging.DEBUG)
  43. _does_info = logger.isEnabledFor(logging.INFO)
  44. __optimize__()
  45. # Localize
  46. tz_utc = timezone.utc
  47. tz_or_local = timezone.tz_or_local
  48. send_revoked = signals.task_revoked.send
  49. task_accepted = state.task_accepted
  50. task_ready = state.task_ready
  51. revoked_tasks = state.revoked
  52. NEEDS_KWDICT = sys.version_info <= (2, 6)
  53. class Request(object):
  54. """A request for task execution."""
  55. if not IS_PYPY:
  56. __slots__ = (
  57. 'app', 'name', 'id', 'args', 'kwargs', 'on_ack', 'delivery_info',
  58. 'hostname', 'eventer', 'connection_errors', 'task', 'eta',
  59. 'expires', 'request_dict', 'acknowledged', 'success_msg',
  60. 'error_msg', 'retry_msg', 'ignore_msg', 'utc', 'time_start',
  61. 'worker_pid', '_already_revoked', '_terminate_on_ack',
  62. '_tzlocal', '__weakref__',
  63. )
  64. #: Format string used to log task success.
  65. success_msg = """\
  66. Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
  67. """
  68. #: Format string used to log task failure.
  69. error_msg = """\
  70. Task %(name)s[%(id)s] raised exception: %(exc)s
  71. """
  72. #: Format string used to log internal error.
  73. internal_error_msg = """\
  74. Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
  75. """
  76. ignored_msg = """\
  77. Task %(name)s[%(id)s] ignored
  78. """
  79. #: Format string used to log task retry.
  80. retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
  81. def __init__(self, body, on_ack=noop,
  82. hostname=None, eventer=None, app=None,
  83. connection_errors=None, request_dict=None,
  84. delivery_info=None, task=None, **opts):
  85. self.app = app or app_or_default(app)
  86. name = self.name = body['task']
  87. self.id = body['id']
  88. self.args = body.get('args', [])
  89. self.kwargs = body.get('kwargs', {})
  90. try:
  91. self.kwargs.items
  92. except AttributeError:
  93. raise exceptions.InvalidTaskError(
  94. 'Task keyword arguments is not a mapping')
  95. if NEEDS_KWDICT:
  96. self.kwargs = kwdict(self.kwargs)
  97. eta = body.get('eta')
  98. expires = body.get('expires')
  99. utc = self.utc = body.get('utc', False)
  100. self.on_ack = on_ack
  101. self.hostname = hostname or socket.gethostname()
  102. self.eventer = eventer
  103. self.connection_errors = connection_errors or ()
  104. self.task = task or self.app.tasks[name]
  105. self.acknowledged = self._already_revoked = False
  106. self.time_start = self.worker_pid = self._terminate_on_ack = None
  107. self._tzlocal = None
  108. # timezone means the message is timezone-aware, and the only timezone
  109. # supported at this point is UTC.
  110. if eta is not None:
  111. self.eta = maybe_iso8601(eta)
  112. if utc:
  113. self.eta = maybe_make_aware(self.eta, self.tzlocal)
  114. else:
  115. self.eta = None
  116. if expires is not None:
  117. self.expires = maybe_iso8601(expires)
  118. if utc:
  119. self.expires = maybe_make_aware(self.expires, self.tzlocal)
  120. else:
  121. self.expires = None
  122. delivery_info = {} if delivery_info is None else delivery_info
  123. self.delivery_info = {
  124. 'exchange': delivery_info.get('exchange'),
  125. 'routing_key': delivery_info.get('routing_key'),
  126. 'priority': delivery_info.get('priority'),
  127. }
  128. # amqplib transport adds the channel here for some reason, so need
  129. # to remove it.
  130. self.delivery_info.pop('channel', None)
  131. self.request_dict = body
  132. @classmethod
  133. def from_message(cls, message, body, **kwargs):
  134. # should be deprecated
  135. return Request(
  136. body,
  137. delivery_info=getattr(message, 'delivery_info', None), **kwargs
  138. )
  139. def extend_with_default_kwargs(self):
  140. """Extend the tasks keyword arguments with standard task arguments.
  141. Currently these are `logfile`, `loglevel`, `task_id`,
  142. `task_name`, `task_retries`, and `delivery_info`.
  143. See :meth:`celery.task.base.Task.run` for more information.
  144. Magic keyword arguments are deprecated and will be removed
  145. in version 4.0.
  146. """
  147. kwargs = dict(self.kwargs)
  148. default_kwargs = {'logfile': None, # deprecated
  149. 'loglevel': None, # deprecated
  150. 'task_id': self.id,
  151. 'task_name': self.name,
  152. 'task_retries': self.request_dict.get('retries', 0),
  153. 'task_is_eager': False,
  154. 'delivery_info': self.delivery_info}
  155. fun = self.task.run
  156. supported_keys = fun_takes_kwargs(fun, default_kwargs)
  157. extend_with = dict((key, val) for key, val in default_kwargs.items()
  158. if key in supported_keys)
  159. kwargs.update(extend_with)
  160. return kwargs
  161. def execute_using_pool(self, pool, **kwargs):
  162. """Like :meth:`execute`, but using a worker pool.
  163. :param pool: A :class:`celery.concurrency.base.TaskPool` instance.
  164. :raises celery.exceptions.TaskRevokedError: if the task was revoked
  165. and ignored.
  166. """
  167. task = self.task
  168. if self.revoked():
  169. raise TaskRevokedError(self.id)
  170. hostname = self.hostname
  171. kwargs = self.kwargs
  172. if task.accept_magic_kwargs:
  173. kwargs = self.extend_with_default_kwargs()
  174. request = self.request_dict
  175. request.update({'hostname': hostname, 'is_eager': False,
  176. 'delivery_info': self.delivery_info,
  177. 'group': self.request_dict.get('taskset')})
  178. result = pool.apply_async(trace_task_ret,
  179. args=(self.name, self.id,
  180. self.args, kwargs, request),
  181. accept_callback=self.on_accepted,
  182. timeout_callback=self.on_timeout,
  183. callback=self.on_success,
  184. error_callback=self.on_failure,
  185. soft_timeout=task.soft_time_limit,
  186. timeout=task.time_limit)
  187. return result
  188. def execute(self, loglevel=None, logfile=None):
  189. """Execute the task in a :func:`~celery.task.trace.trace_task`.
  190. :keyword loglevel: The loglevel used by the task.
  191. :keyword logfile: The logfile used by the task.
  192. """
  193. if self.revoked():
  194. return
  195. # acknowledge task as being processed.
  196. if not self.task.acks_late:
  197. self.acknowledge()
  198. kwargs = self.kwargs
  199. if self.task.accept_magic_kwargs:
  200. kwargs = self.extend_with_default_kwargs()
  201. request = self.request_dict
  202. request.update({'loglevel': loglevel, 'logfile': logfile,
  203. 'hostname': self.hostname, 'is_eager': False,
  204. 'delivery_info': self.delivery_info})
  205. retval = trace_task(self.task, self.id, self.args, kwargs, request,
  206. **{'hostname': self.hostname,
  207. 'loader': self.app.loader})
  208. self.acknowledge()
  209. return retval
  210. def maybe_expire(self):
  211. """If expired, mark the task as revoked."""
  212. if self.expires:
  213. now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
  214. if now > self.expires:
  215. revoked_tasks.add(self.id)
  216. return True
  217. def terminate(self, pool, signal=None):
  218. if self.time_start:
  219. signal = _signals.signum(signal or 'TERM')
  220. pool.terminate_job(self.worker_pid, signal)
  221. self._announce_revoked('terminated', True, signal, False)
  222. else:
  223. self._terminate_on_ack = pool, signal
  224. def _announce_revoked(self, reason, terminated, signum, expired):
  225. task_ready(self)
  226. self.send_event('task-revoked',
  227. terminated=terminated, signum=signum, expired=expired)
  228. if self.store_errors:
  229. self.task.backend.mark_as_revoked(self.id, reason)
  230. self.acknowledge()
  231. self._already_revoked = True
  232. send_revoked(self.task, terminated=terminated,
  233. signum=signum, expired=expired)
  234. def revoked(self):
  235. """If revoked, skip task and mark state."""
  236. expired = False
  237. if self._already_revoked:
  238. return True
  239. if self.expires:
  240. expired = self.maybe_expire()
  241. if self.id in revoked_tasks:
  242. warn('Skipping revoked task: %s[%s]', self.name, self.id)
  243. self._announce_revoked(
  244. 'expired' if expired else 'revoked', False, None, expired,
  245. )
  246. return True
  247. return False
  248. def send_event(self, type, **fields):
  249. if self.eventer and self.eventer.enabled:
  250. self.eventer.send(type, uuid=self.id, **fields)
  251. def on_accepted(self, pid, time_accepted):
  252. """Handler called when task is accepted by worker pool."""
  253. self.worker_pid = pid
  254. self.time_start = time_accepted
  255. task_accepted(self)
  256. if not self.task.acks_late:
  257. self.acknowledge()
  258. self.send_event('task-started', pid=pid)
  259. if _does_debug:
  260. debug('Task accepted: %s[%s] pid:%r', self.name, self.id, pid)
  261. if self._terminate_on_ack is not None:
  262. self.terminate(*self._terminate_on_ack)
  263. def on_timeout(self, soft, timeout):
  264. """Handler called if the task times out."""
  265. task_ready(self)
  266. if soft:
  267. warn('Soft time limit (%ss) exceeded for %s[%s]',
  268. timeout, self.name, self.id)
  269. exc = exceptions.SoftTimeLimitExceeded(timeout)
  270. else:
  271. error('Hard time limit (%ss) exceeded for %s[%s]',
  272. timeout, self.name, self.id)
  273. exc = exceptions.TimeLimitExceeded(timeout)
  274. if self.store_errors:
  275. self.task.backend.mark_as_failure(self.id, exc)
  276. def on_success(self, ret_value, now=None):
  277. """Handler called if the task was successfully processed."""
  278. if isinstance(ret_value, ExceptionInfo):
  279. if isinstance(ret_value.exception, (
  280. SystemExit, KeyboardInterrupt)):
  281. raise ret_value.exception
  282. return self.on_failure(ret_value)
  283. task_ready(self)
  284. if self.task.acks_late:
  285. self.acknowledge()
  286. if self.eventer and self.eventer.enabled:
  287. now = time.time()
  288. runtime = self.time_start and (time.time() - self.time_start) or 0
  289. self.send_event('task-succeeded',
  290. result=safe_repr(ret_value), runtime=runtime)
  291. if _does_info:
  292. now = now or time.time()
  293. runtime = self.time_start and (time.time() - self.time_start) or 0
  294. info(self.success_msg.strip(), {
  295. 'id': self.id, 'name': self.name,
  296. 'return_value': self.repr_result(ret_value),
  297. 'runtime': runtime})
  298. def on_retry(self, exc_info):
  299. """Handler called if the task should be retried."""
  300. if self.task.acks_late:
  301. self.acknowledge()
  302. self.send_event('task-retried',
  303. exception=safe_repr(exc_info.exception.exc),
  304. traceback=safe_str(exc_info.traceback))
  305. if _does_info:
  306. info(self.retry_msg.strip(), {
  307. 'id': self.id, 'name': self.name,
  308. 'exc': exc_info.exception})
  309. def on_failure(self, exc_info):
  310. """Handler called if the task raised an exception."""
  311. task_ready(self)
  312. if not exc_info.internal:
  313. exc = exc_info.exception
  314. if isinstance(exc, exceptions.RetryTaskError):
  315. return self.on_retry(exc_info)
  316. # These are special cases where the process would not have had
  317. # time to write the result.
  318. if self.store_errors:
  319. if isinstance(exc, exceptions.WorkerLostError):
  320. self.task.backend.mark_as_failure(self.id, exc)
  321. elif isinstance(exc, exceptions.Terminated):
  322. self._announce_revoked('terminated', True, str(exc), False)
  323. # (acks_late) acknowledge after result stored.
  324. if self.task.acks_late:
  325. self.acknowledge()
  326. self._log_error(exc_info)
  327. def _log_error(self, einfo):
  328. einfo.exception = get_pickled_exception(einfo.exception)
  329. exception, traceback, exc_info, internal, sargs, skwargs = (
  330. safe_repr(einfo.exception),
  331. safe_str(einfo.traceback),
  332. einfo.exc_info,
  333. einfo.internal,
  334. safe_repr(self.args),
  335. safe_repr(self.kwargs),
  336. )
  337. format = self.error_msg
  338. description = 'raised exception'
  339. severity = logging.ERROR
  340. self.send_event(
  341. 'task-failed', exception=exception, traceback=traceback,
  342. )
  343. if internal:
  344. if isinstance(einfo.exception, Ignore):
  345. format = self.ignored_msg
  346. description = 'ignored'
  347. severity = logging.INFO
  348. exc_info = None
  349. self.acknowledge()
  350. else:
  351. format = self.internal_error_msg
  352. description = 'INTERNAL ERROR'
  353. severity = logging.CRITICAL
  354. context = {
  355. 'hostname': self.hostname,
  356. 'id': self.id,
  357. 'name': self.name,
  358. 'exc': exception,
  359. 'traceback': traceback,
  360. 'args': sargs,
  361. 'kwargs': skwargs,
  362. 'description': description,
  363. }
  364. logger.log(severity, format.strip(), context,
  365. exc_info=exc_info,
  366. extra={'data': {'id': self.id,
  367. 'name': self.name,
  368. 'args': sargs,
  369. 'kwargs': skwargs,
  370. 'hostname': self.hostname,
  371. 'internal': internal}})
  372. self.task.send_error_email(context, einfo.exception)
  373. def acknowledge(self):
  374. """Acknowledge task."""
  375. if not self.acknowledged:
  376. self.on_ack(logger, self.connection_errors)
  377. self.acknowledged = True
  378. def repr_result(self, result, maxlen=46):
  379. # 46 is the length needed to fit
  380. # 'the quick brown fox jumps over the lazy dog' :)
  381. return truncate(safe_repr(result), maxlen)
  382. def info(self, safe=False):
  383. return {'id': self.id,
  384. 'name': self.name,
  385. 'args': self.args if safe else safe_repr(self.args),
  386. 'kwargs': self.kwargs if safe else safe_repr(self.kwargs),
  387. 'hostname': self.hostname,
  388. 'time_start': self.time_start,
  389. 'acknowledged': self.acknowledged,
  390. 'delivery_info': self.delivery_info,
  391. 'worker_pid': self.worker_pid}
  392. def __str__(self):
  393. return '%s[%s]%s%s' % (
  394. self.name, self.id,
  395. ' eta:[%s]' % (self.eta, ) if self.eta else '',
  396. ' expires:[%s]' % (self.expires, ) if self.expires else '')
  397. shortinfo = __str__
  398. def __repr__(self):
  399. return '<%s %s: %s>' % (
  400. type(self).__name__, self.id,
  401. reprcall(self.name, self.args, self.kwargs),
  402. )
  403. @property
  404. def tzlocal(self):
  405. if self._tzlocal is None:
  406. self._tzlocal = self.app.conf.CELERY_TIMEZONE
  407. return self._tzlocal
  408. @property
  409. def store_errors(self):
  410. return (not self.task.ignore_result
  411. or self.task.store_errors_even_if_ignored)
  412. def _compat_get_task_id(self):
  413. return self.id
  414. def _compat_set_task_id(self, value):
  415. self.id = value
  416. task_id = property(_compat_get_task_id, _compat_set_task_id)
  417. def _compat_get_task_name(self):
  418. return self.name
  419. def _compat_set_task_name(self, value):
  420. self.name = value
  421. task_name = property(_compat_get_task_name, _compat_set_task_name)
  422. class TaskRequest(Request):
  423. def __init__(self, name, id, args=(), kwargs={},
  424. eta=None, expires=None, **options):
  425. """Compatibility class."""
  426. super(TaskRequest, self).__init__({
  427. 'task': name, 'id': id, 'args': args,
  428. 'kwargs': kwargs, 'eta': eta,
  429. 'expires': expires}, **options)