# -*- coding: utf-8 -*-
"""
    celery.worker.job
    ~~~~~~~~~~~~~~~~~

    This module defines the :class:`Request` class,
    which specifies how tasks are executed.

    :copyright: (c) 2009 - 2012 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import logging
import time
import socket
import sys

from datetime import datetime

from kombu.utils import kwdict, reprcall
from kombu.utils.encoding import safe_repr, safe_str

from celery import exceptions
from celery.app import app_or_default
from celery.app.state import _tls
from celery.datastructures import ExceptionInfo
from celery.task.trace import build_tracer, trace_task, report_internal_error
from celery.platforms import set_mp_process_title as setps
from celery.utils import fun_takes_kwargs
from celery.utils.functional import noop
from celery.utils.log import get_logger
from celery.utils.text import truncate
from celery.utils.timeutils import maybe_iso8601, timezone

from . import state

logger = get_logger(__name__)
debug, info, warn, error = (logger.debug, logger.info,
                            logger.warn, logger.error)
_does_debug = logger.isEnabledFor(logging.DEBUG)
_does_info = logger.isEnabledFor(logging.INFO)

# Localize
tz_to_local = timezone.to_local
tz_or_local = timezone.tz_or_local
tz_utc = timezone.utc

NEEDS_KWDICT = sys.version_info <= (2, 6)

def execute_and_trace(name, uuid, args, kwargs, request=None, **opts):
    """This is a pickleable method used as a target when applying to pools.

    It's the same as::

        >>> trace_task(name, *args, **kwargs)[0]

    """
    task = _tls.current_app._tasks[name]
    try:
        hostname = opts.get("hostname")
        setps("celeryd", name, hostname, rate_limit=True)
        try:
            if task.__tracer__ is None:
                task.__tracer__ = build_tracer(name, task, **opts)
            return task.__tracer__(uuid, args, kwargs, request)[0]
        finally:
            setps("celeryd", "-idle-", hostname, rate_limit=True)
    except Exception, exc:
        return report_internal_error(task, exc)

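# Illustrative sketch, not part of the original module: the worker hands
# execute_and_trace to the pool as a plain pickleable callable, roughly
# equivalent to (the task name, id and hostname below are hypothetical):
#
#     >>> pool.apply_async(execute_and_trace,
#     ...     args=("proj.tasks.add", "some-task-id", (2, 2), {}),
#     ...     kwargs={"hostname": "worker1.example.com",
#     ...             "request": {"id": "some-task-id"}})
#
# The real wiring, including the accept/timeout/success/failure callbacks,
# is done by Request.execute_using_pool() below.
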
class Request(object):
    """A request for task execution."""
    # Note: success_msg, error_msg and retry_msg are class-level defaults
    # (defined below), so they must not also be listed in __slots__ --
    # doing so raises a ValueError when the class is created.
    __slots__ = ("app", "name", "id", "args", "kwargs",
                 "on_ack", "delivery_info", "hostname",
                 "callbacks", "errbacks",
                 "eventer", "connection_errors",
                 "task", "eta", "expires",
                 "request_dict", "acknowledged",
                 "time_start", "worker_pid",
                 "_already_revoked", "_terminate_on_ack", "_tzlocal")

    #: Format string used to log task success.
    success_msg = """\
        Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
    """

    #: Format string used to log task failure.
    error_msg = """\
        Task %(name)s[%(id)s] raised exception: %(exc)s
    """

    #: Format string used to log internal error.
    internal_error_msg = """\
        Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
    """

    #: Format string used to log task retry.
    retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""

    def __init__(self, body, on_ack=noop,
            hostname=None, eventer=None, app=None,
            connection_errors=None, request_dict=None,
            delivery_info=None, task=None, **opts):
        self.app = app or app_or_default(app)
        name = self.name = body["task"]
        self.id = body["id"]
        self.args = body.get("args", [])
        self.kwargs = body.get("kwargs", {})
        try:
            self.kwargs.items
        except AttributeError:
            raise exceptions.InvalidTaskError(
                    "Task keyword arguments is not a mapping")
        if NEEDS_KWDICT:
            self.kwargs = kwdict(self.kwargs)
        eta = body.get("eta")
        expires = body.get("expires")
        utc = body.get("utc", False)
        self.on_ack = on_ack
        self.hostname = hostname or socket.gethostname()
        self.eventer = eventer
        self.connection_errors = connection_errors or ()
        self.task = task or self.app.tasks[name]
        self.acknowledged = self._already_revoked = False
        self.time_start = self.worker_pid = self._terminate_on_ack = None
        self._tzlocal = None

        # The "utc" flag means the message is timezone-aware, and the only
        # timezone supported at this point is UTC.
        if eta is not None:
            tz = tz_utc if utc else self.tzlocal
            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
        else:
            self.eta = None
        if expires is not None:
            tz = tz_utc if utc else self.tzlocal
            self.expires = tz_to_local(maybe_iso8601(expires),
                                       self.tzlocal, tz)
        else:
            self.expires = None

        delivery_info = {} if delivery_info is None else delivery_info
        self.delivery_info = {
            "exchange": delivery_info.get("exchange"),
            "routing_key": delivery_info.get("routing_key"),
        }

        self.request_dict = body
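
    # Illustrative only (assumption, not in the original source): a minimal
    # message body accepted by __init__ looks roughly like
    #
    #     >>> body = {"task": "proj.tasks.add", "id": "hypothetical-id",
    #     ...         "args": [2, 2], "kwargs": {}, "retries": 0,
    #     ...         "eta": None, "expires": None, "utc": True}
    #     >>> req = Request(body, hostname="worker1.example.com")
    #
    # where the task name, id and hostname are made-up values.
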
    @classmethod
    def from_message(cls, message, body, **kwargs):
        # should be deprecated
        return Request(body,
            delivery_info=getattr(message, "delivery_info", None), **kwargs)

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        Currently these are `logfile`, `loglevel`, `task_id`,
        `task_name`, `task_retries`, and `delivery_info`.

        See :meth:`celery.task.base.Task.run` for more information.

        Magic keyword arguments are deprecated and will be removed
        in version 3.0.

        """
        kwargs = dict(self.kwargs)
        default_kwargs = {"logfile": logfile,
                          "loglevel": loglevel,
                          "task_id": self.id,
                          "task_name": self.name,
                          "task_retries": self.request_dict.get("retries", 0),
                          "task_is_eager": False,
                          "delivery_info": self.delivery_info}
        fun = self.task.run
        supported_keys = fun_takes_kwargs(fun, default_kwargs)
        extend_with = dict((key, val) for key, val in default_kwargs.items()
                           if key in supported_keys)
        kwargs.update(extend_with)
        return kwargs
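
    # A hedged illustration (not in the original source): for a hypothetical
    # task defined with magic keyword arguments enabled, e.g.
    #
    #     >>> @task(accept_magic_kwargs=True)
    #     ... def add(x, y, task_id=None, loglevel=None):
    #     ...     return x + y
    #
    # fun_takes_kwargs(add.run, default_kwargs) reports that only "task_id"
    # and "loglevel" are accepted, so extend_with_default_kwargs() merges just
    # those two keys into the message kwargs and drops the rest.
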
    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using a worker pool.

        :param pool: A :class:`multiprocessing.Pool` instance.

        :keyword loglevel: The loglevel used by the task.

        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return
        task = self.task
        hostname = self.hostname
        kwargs = self.kwargs
        if self.task.accept_magic_kwargs:
            kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        request = self.request_dict
        request.update({"loglevel": loglevel, "logfile": logfile,
                        "hostname": hostname, "is_eager": False,
                        "delivery_info": self.delivery_info})
        result = pool.apply_async(execute_and_trace,
                args=(self.name, self.id, self.args, kwargs),
                kwargs={"hostname": hostname,
                        "request": request},
                accept_callback=self.on_accepted,
                timeout_callback=self.on_timeout,
                callback=self.on_success,
                error_callback=self.on_failure,
                soft_timeout=task.soft_time_limit,
                timeout=task.time_limit)
        return result

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`~celery.task.trace.trace_task`.

        :keyword loglevel: The loglevel used by the task.

        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        # acknowledge task as being processed.
        if not self.task.acks_late:
            self.acknowledge()

        kwargs = self.kwargs
        if self.task.accept_magic_kwargs:
            kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        request = self.request_dict
        request.update({"loglevel": loglevel, "logfile": logfile,
                        "hostname": self.hostname, "is_eager": False,
                        "delivery_info": self.delivery_info})
        retval, _ = trace_task(self.task, self.id, self.args, kwargs,
                               **{"hostname": self.hostname,
                                  "loader": self.app.loader,
                                  "request": request})
        self.acknowledge()
        return retval

    def maybe_expire(self):
        """If expired, mark the task as revoked."""
        if self.expires and datetime.now(self.tzlocal) > self.expires:
            state.revoked.add(self.id)
            if self.store_errors:
                self.task.backend.mark_as_revoked(self.id)

    def terminate(self, pool, signal=None):
        if self.time_start:
            return pool.terminate_job(self.worker_pid, signal)
        else:
            self._terminate_on_ack = (True, pool, signal)

    def revoked(self):
        """If revoked, skip task and mark state."""
        if self._already_revoked:
            return True
        if self.expires:
            self.maybe_expire()
        if self.id in state.revoked:
            warn("Skipping revoked task: %s[%s]", self.name, self.id)
            self.send_event("task-revoked", uuid=self.id)
            self.acknowledge()
            self._already_revoked = True
            return True
        return False

    def send_event(self, type, **fields):
        if self.eventer and self.eventer.enabled:
            self.eventer.send(type, **fields)

    def on_accepted(self, pid, time_accepted):
        """Handler called when task is accepted by worker pool."""
        self.worker_pid = pid
        self.time_start = time_accepted
        state.task_accepted(self)
        if not self.task.acks_late:
            self.acknowledge()
        self.send_event("task-started", uuid=self.id, pid=pid)
        if _does_debug:
            debug("Task accepted: %s[%s] pid:%r", self.name, self.id, pid)
        if self._terminate_on_ack is not None:
            _, pool, signal = self._terminate_on_ack
            self.terminate(pool, signal)

    def on_timeout(self, soft, timeout):
        """Handler called if the task times out."""
        state.task_ready(self)
        if soft:
            warn("Soft time limit (%ss) exceeded for %s[%s]",
                 timeout, self.name, self.id)
            exc = exceptions.SoftTimeLimitExceeded(timeout)
        else:
            error("Hard time limit (%ss) exceeded for %s[%s]",
                  timeout, self.name, self.id)
            exc = exceptions.TimeLimitExceeded(timeout)

        if self.store_errors:
            self.task.backend.mark_as_failure(self.id, exc)

    def on_success(self, ret_value, now=None):
        """Handler called if the task was successfully processed."""
        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception, (
                    SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            return self.on_failure(ret_value)
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        if self.eventer and self.eventer.enabled:
            now = time.time()
            runtime = self.time_start and (time.time() - self.time_start) or 0
            self.send_event("task-succeeded", uuid=self.id,
                            result=safe_repr(ret_value), runtime=runtime)

        if _does_info:
            now = now or time.time()
            runtime = self.time_start and (time.time() - self.time_start) or 0
            info(self.success_msg.strip(), {
                    "id": self.id, "name": self.name,
                    "return_value": self.repr_result(ret_value),
                    "runtime": runtime})

    def on_retry(self, exc_info):
        """Handler called if the task should be retried."""
        self.send_event("task-retried", uuid=self.id,
                        exception=safe_repr(exc_info.exception.exc),
                        traceback=safe_str(exc_info.traceback))

        if _does_info:
            info(self.retry_msg.strip(), {
                "id": self.id, "name": self.name,
                "exc": safe_repr(exc_info.exception.exc)}, exc_info=exc_info)

    def on_failure(self, exc_info):
        """Handler called if the task raised an exception."""
        state.task_ready(self)

        if not exc_info.internal:

            if isinstance(exc_info.exception, exceptions.RetryTaskError):
                return self.on_retry(exc_info)

            # This is a special case as the process would not have had
            # time to write the result.
            if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
                    self.store_errors:
                self.task.backend.mark_as_failure(self.id, exc_info.exception)

            # (acks_late) acknowledge after result stored.
            if self.task.acks_late:
                self.acknowledge()

        self._log_error(exc_info)

    def _log_error(self, exc_info):
        format = self.error_msg
        description = "raised exception"
        severity = logging.ERROR
        self.send_event("task-failed", uuid=self.id,
                        exception=safe_repr(exc_info.exception),
                        traceback=safe_str(exc_info.traceback))

        if exc_info.internal:
            format = self.internal_error_msg
            description = "INTERNAL ERROR"
            severity = logging.CRITICAL

        context = {"hostname": self.hostname,
                   "id": self.id,
                   "name": self.name,
                   "exc": safe_repr(exc_info.exception),
                   "traceback": safe_str(exc_info.traceback),
                   "args": safe_repr(self.args),
                   "kwargs": safe_repr(self.kwargs),
                   "description": description}

        logger.log(severity, format.strip(), context,
                   exc_info=exc_info.exc_info,
                   extra={"data": {"id": self.id,
                                   "name": self.name,
                                   "hostname": self.hostname}})

        task_obj = self.app.tasks.get(self.name, object)
        task_obj.send_error_email(context, exc_info.exception)

    def acknowledge(self):
        """Acknowledge task."""
        if not self.acknowledged:
            self.on_ack(logger, self.connection_errors)
            self.acknowledged = True

    def repr_result(self, result, maxlen=46):
        # 46 is the length needed to fit
        # "the quick brown fox jumps over the lazy dog" :)
        return truncate(safe_repr(result), maxlen)

    def info(self, safe=False):
        return {"id": self.id,
                "name": self.name,
                "args": self.args if safe else safe_repr(self.args),
                "kwargs": self.kwargs if safe else safe_repr(self.kwargs),
                "hostname": self.hostname,
                "time_start": self.time_start,
                "acknowledged": self.acknowledged,
                "delivery_info": self.delivery_info,
                "worker_pid": self.worker_pid}

    def shortinfo(self):
        return "%s[%s]%s%s" % (
                    self.name, self.id,
                    " eta:[%s]" % (self.eta, ) if self.eta else "",
                    " expires:[%s]" % (self.expires, ) if self.expires else "")
    __str__ = shortinfo

    def __repr__(self):
        return '<%s %s: %s>' % (type(self).__name__, self.id,
                reprcall(self.name, self.args, self.kwargs))

    @property
    def tzlocal(self):
        if self._tzlocal is None:
            self._tzlocal = tz_or_local(self.app.conf.CELERY_TIMEZONE)
        return self._tzlocal

    @property
    def store_errors(self):
        return (not self.task.ignore_result
                or self.task.store_errors_even_if_ignored)

    def _compat_get_task_id(self):
        return self.id

    def _compat_set_task_id(self, value):
        self.id = value

    def _compat_get_task_name(self):
        return self.name

    def _compat_set_task_name(self, value):
        self.name = value

    task_id = property(_compat_get_task_id, _compat_set_task_id)
    task_name = property(_compat_get_task_name, _compat_set_task_name)

class TaskRequest(Request):

    def __init__(self, name, id, args=(), kwargs={},
            eta=None, expires=None, **options):
        """Compatibility class."""
        super(TaskRequest, self).__init__({
            "task": name, "id": id, "args": args,
            "kwargs": kwargs, "eta": eta,
            "expires": expires}, **options)