# -*- coding: utf-8 -*-
"""
    celery.worker.job
    ~~~~~~~~~~~~~~~~~

    This module defines the :class:`TaskRequest` class,
    which specifies how tasks are executed.

    :copyright: (c) 2009 - 2011 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import os
import sys
import time
import socket
import warnings

from datetime import datetime

from .. import current_app
from .. import exceptions
from .. import platforms
from .. import registry
from ..app import app_or_default
from ..datastructures import ExceptionInfo
from ..execute.trace import TaskTrace
from ..utils import noop, kwdict, fun_takes_kwargs, truncate_text
from ..utils.encoding import safe_repr, safe_str, default_encoding
from ..utils.serialization import get_pickleable_exception
from ..utils.timeutils import maybe_iso8601

from . import state

#: Keys to keep from the message delivery info. The values
#: of these keys must be pickleable.
WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )


class InvalidTaskError(Exception):
    """The task has invalid data or is not properly constructed."""
    pass


if sys.version_info >= (3, 0):

    def default_encode(obj):
        return obj
else:

    def default_encode(obj):  # noqa
        return unicode(obj, default_encoding())


class WorkerTaskTrace(TaskTrace):
    """Wraps the task in a jail, catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to `"SUCCESS"`.

    If the call raises :exc:`~celery.exceptions.RetryTaskError`, it extracts
    the original exception, uses that as the result and sets the task status
    to `"RETRY"`.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task status to `"FAILURE"`.

    :param task_name: The name of the task to execute.
    :param task_id: The unique id of the task.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :keyword loader: Custom loader to use, if not specified the current app
        loader will be used.
    :keyword hostname: Custom hostname to use, if not specified the system
        hostname will be used.

    :returns: the evaluated function's return value on success, or
        the exception instance on failure.

    """

    #: Current loader.
    loader = None

    #: Hostname to report as.
    hostname = None

    def __init__(self, *args, **kwargs):
        self.loader = kwargs.get("loader") or current_app.loader
        self.hostname = kwargs.get("hostname") or socket.gethostname()
        super(WorkerTaskTrace, self).__init__(*args, **kwargs)
        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = self.task.store_errors_even_if_ignored
        self.super = super(WorkerTaskTrace, self)

    def execute_safe(self, *args, **kwargs):
        """Same as :meth:`execute`, but catches errors."""
        try:
            return self.execute(*args, **kwargs)
        except Exception, exc:
            _type, _value, _tb = sys.exc_info()
            _value = self.task.backend.prepare_exception(exc)
            exc_info = ExceptionInfo((_type, _value, _tb))
            warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
                map(str, (exc.__class__, exc, exc_info.traceback))))
            return exc_info

    def execute(self):
        """Execute, trace and store the result of the task."""
        self.loader.on_task_init(self.task_id, self.task)
        if self.task.track_started:
            if not self.task.ignore_result:
                self.task.backend.mark_as_started(self.task_id,
                                                  pid=os.getpid(),
                                                  hostname=self.hostname)
        try:
            return super(WorkerTaskTrace, self).execute()
        finally:
            try:
                self.task.backend.process_cleanup()
                self.loader.on_process_cleanup()
            except (KeyboardInterrupt, SystemExit, MemoryError):
                raise
            except Exception, exc:
                logger = current_app.log.get_default_logger()
                logger.error("Process cleanup failed: %r", exc,
                             exc_info=sys.exc_info())

    def handle_success(self, retval, *args):
        """Handle successful execution."""
        if not self.task.ignore_result:
            self.task.backend.mark_as_done(self.task_id, retval)
        return self.super.handle_success(retval, *args)

    def handle_retry(self, exc, type_, tb, strtb):
        """Handle retry exception."""
        message, orig_exc = exc.args
        if self._store_errors:
            self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
        return self.super.handle_retry(exc, type_, tb, strtb)

    def handle_failure(self, exc, type_, tb, strtb):
        """Handle exception."""
        if self._store_errors:
            self.task.backend.mark_as_failure(self.task_id, exc, strtb)
        exc = get_pickleable_exception(exc)
        return self.super.handle_failure(exc, type_, tb, strtb)
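

# A minimal usage sketch for the class above (illustrative only, not part of
# the original module; assumes a registered task named "tasks.add" and a
# configured result backend):
#
#     trace = WorkerTaskTrace("tasks.add", "some-task-id", [2, 2], {})
#     retval_or_excinfo = trace.execute_safe()
#
# Depending on the outcome, the handlers above report to the backend roughly:
#     success -> backend.mark_as_done(task_id, retval)            # SUCCESS
#     retry   -> backend.mark_as_retry(task_id, orig_exc, strtb)  # RETRY
#     failure -> backend.mark_as_failure(task_id, exc, strtb)     # FAILURE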


def execute_and_trace(task_name, *args, **kwargs):
    """This is a pickleable function used as a target when applying to pools.

    It's the same as::

        >>> WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()

    """
    hostname = kwargs.get("hostname")
    platforms.set_mp_process_title("celeryd", task_name, hostname=hostname)
    try:
        return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
    finally:
        platforms.set_mp_process_title("celeryd", "-idle-", hostname)
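

# Illustrative only: execute_and_trace is the callable that
# TaskRequest.execute_using_pool() submits to the worker pool.  A rough
# equivalent of such a submission, for a hypothetical registered task
# "tasks.add", would look like:
#
#     pool.apply_async(execute_and_trace,
#                      args=("tasks.add", "some-task-id", [2, 2], {}),
#                      kwargs={"hostname": socket.gethostname()})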


class TaskRequest(object):
    """A request for task execution."""

    #: Kind of task. Must be a name registered in the task registry.
    name = None

    #: The task class (set by constructor using :attr:`task_name`).
    task = None

    #: UUID of the task.
    task_id = None

    #: UUID of the taskset that this task belongs to.
    taskset_id = None

    #: List of positional arguments to apply to the task.
    args = None

    #: Mapping of keyword arguments to apply to the task.
    kwargs = None

    #: Number of times the task has been retried.
    retries = 0

    #: The task's ETA (for information only).
    eta = None

    #: When the task expires.
    expires = None

    #: Body of a chord depending on this task.
    chord = None

    #: Callback called when the task should be acknowledged.
    on_ack = None

    #: The message object. Used to acknowledge the message.
    message = None

    #: Additional delivery info, e.g. contains the path from
    #: Producer to consumer.
    delivery_info = None

    #: Flag set when the task has been acknowledged.
    acknowledged = False

    #: Format string used to log task success.
    success_msg = """\
        Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
    """

    #: Format string used to log task failure.
    error_msg = """\
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """

    #: Format string used to log task retry.
    retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""

    #: Timestamp set when the task is started.
    time_start = None

    #: Process id of the worker processing this task (if any).
    worker_pid = None

    _already_revoked = False
    _terminate_on_ack = None

    def __init__(self, task_name, task_id, args, kwargs,
            on_ack=noop, retries=0, delivery_info=None, hostname=None,
            logger=None, eventer=None, eta=None, expires=None, app=None,
            taskset_id=None, chord=None, **opts):
        self.app = app_or_default(app)
        self.task_name = task_name
        self.task_id = task_id
        self.taskset_id = taskset_id
        self.retries = retries
        self.args = args
        self.kwargs = kwargs
        self.eta = eta
        self.expires = expires
        self.chord = chord
        self.on_ack = on_ack
        self.delivery_info = {} if delivery_info is None else delivery_info
        self.hostname = hostname or socket.gethostname()
        self.logger = logger or self.app.log.get_default_logger()
        self.eventer = eventer

        self.task = registry.tasks[self.task_name]
        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = self.task.store_errors_even_if_ignored

    @classmethod
    def from_message(cls, message, body, on_ack=noop, **kw):
        """Create request from a task message.

        :raises InvalidTaskError: if the message does not describe a task,
            the message is also rejected.

        """
        delivery_info = getattr(message, "delivery_info", {})
        delivery_info = dict((key, delivery_info.get(key))
                                for key in WANTED_DELIVERY_INFO)

        kwargs = body.get("kwargs", {})
        if not hasattr(kwargs, "items"):
            raise InvalidTaskError("Task keyword arguments is not a mapping.")
        try:
            task_name = body["task"]
            task_id = body["id"]
        except KeyError, exc:
            raise InvalidTaskError(
                "Task message is missing required field %r" % (exc, ))

        return cls(task_name=task_name,
                   task_id=task_id,
                   taskset_id=body.get("taskset", None),
                   args=body.get("args", []),
                   kwargs=kwdict(kwargs),
                   chord=body.get("chord"),
                   retries=body.get("retries", 0),
                   eta=maybe_iso8601(body.get("eta")),
                   expires=maybe_iso8601(body.get("expires")),
                   on_ack=on_ack, delivery_info=delivery_info, **kw)
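
    # Illustrative only: a decoded task message body is a mapping along these
    # lines ("task" and "id" are required, the rest are optional):
    #
    #     body = {"task": "tasks.add", "id": "some-task-id",
    #             "args": [2, 2], "kwargs": {}, "retries": 0,
    #             "eta": None, "expires": None}
    #     request = TaskRequest.from_message(message, body,
    #                                        on_ack=message.ack)
    #
    # where `message` is the transport message object; its `ack` method is
    # shown here only as an example acknowledgement callback.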

    def get_instance_attrs(self, loglevel, logfile):
        return {"logfile": logfile, "loglevel": loglevel,
                "hostname": self.hostname,
                "id": self.task_id, "taskset": self.taskset_id,
                "retries": self.retries, "is_eager": False,
                "delivery_info": self.delivery_info, "chord": self.chord}

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        Currently these are `logfile`, `loglevel`, `task_id`,
        `task_name`, `task_retries`, and `delivery_info`.

        See :meth:`celery.task.base.Task.run` for more information.

        Magic keyword arguments are deprecated and will be removed
        in version 3.0.

        """
        if not self.task.accept_magic_kwargs:
            return self.kwargs
        kwargs = dict(self.kwargs)
        default_kwargs = {"logfile": logfile,
                          "loglevel": loglevel,
                          "task_id": self.task_id,
                          "task_name": self.task_name,
                          "task_retries": self.retries,
                          "task_is_eager": False,
                          "delivery_info": self.delivery_info}
        fun = self.task.run
        supported_keys = fun_takes_kwargs(fun, default_kwargs)
        extend_with = dict((key, val) for key, val in default_kwargs.items()
                                if key in supported_keys)
        kwargs.update(extend_with)
        return kwargs
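
    # Illustrative only: for a task with ``accept_magic_kwargs`` enabled whose
    # run() is defined roughly as ``def run(self, x, y, loglevel=None,
    # logfile=None)``, fun_takes_kwargs() would report only "loglevel" and
    # "logfile" as supported, so only those two of the defaults above are
    # merged into the kwargs taken from the message.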

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`multiprocessing.Pool` instance.

        :keyword loglevel: The loglevel used by the task.

        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        args = self._get_tracer_args(loglevel, logfile)
        instance_attrs = self.get_instance_attrs(loglevel, logfile)
        result = pool.apply_async(execute_and_trace,
                                  args=args,
                                  kwargs={"hostname": self.hostname,
                                          "request": instance_attrs},
                                  accept_callback=self.on_accepted,
                                  timeout_callback=self.on_timeout,
                                  callback=self.on_success,
                                  errback=self.on_failure,
                                  soft_timeout=self.task.soft_time_limit,
                                  timeout=self.task.time_limit)
        return result
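
    # Illustrative only: the apply_async() call above registers this
    # request's own handlers with the pool, so a running job reports back
    # roughly as:
    #
    #     accepted         -> self.on_accepted(pid, time_accepted)
    #     time limit hit   -> self.on_timeout(soft, timeout)
    #     return value     -> self.on_success(ret_value)
    #     exception        -> self.on_failure(exc_info)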

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :class:`WorkerTaskTrace`.

        :keyword loglevel: The loglevel used by the task.

        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        # acknowledge task as being processed.
        if not self.task.acks_late:
            self.acknowledge()

        instance_attrs = self.get_instance_attrs(loglevel, logfile)
        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
                                 **{"hostname": self.hostname,
                                    "loader": self.app.loader,
                                    "request": instance_attrs})
        retval = tracer.execute()
        self.acknowledge()
        return retval

    def maybe_expire(self):
        """If expired, mark the task as revoked."""
        if self.expires and datetime.now() > self.expires:
            state.revoked.add(self.task_id)
            if self._store_errors:
                self.task.backend.mark_as_revoked(self.task_id)

    def terminate(self, pool, signal=None):
        if self.time_start:
            return pool.terminate_job(self.worker_pid, signal)
        else:
            self._terminate_on_ack = (True, pool, signal)

    def revoked(self):
        """If revoked, skip task and mark state."""
        if self._already_revoked:
            return True
        if self.expires:
            self.maybe_expire()
        if self.task_id in state.revoked:
            self.logger.warn("Skipping revoked task: %s[%s]",
                             self.task_name, self.task_id)
            self.send_event("task-revoked", uuid=self.task_id)
            self.acknowledge()
            self._already_revoked = True
            return True
        return False

    def send_event(self, type, **fields):
        if self.eventer:
            self.eventer.send(type, **fields)

    def on_accepted(self, pid, time_accepted):
        """Handler called when task is accepted by worker pool."""
        self.worker_pid = pid
        self.time_start = time_accepted
        state.task_accepted(self)
        if not self.task.acks_late:
            self.acknowledge()
        self.send_event("task-started", uuid=self.task_id, pid=pid)
        self.logger.debug("Task accepted: %s[%s] pid:%r",
                          self.task_name, self.task_id, pid)
        if self._terminate_on_ack is not None:
            _, pool, signal = self._terminate_on_ack
            self.terminate(pool, signal)

    def on_timeout(self, soft, timeout):
        """Handler called if the task times out."""
        state.task_ready(self)
        if soft:
            self.logger.warning("Soft time limit (%ss) exceeded for %s[%s]",
                                timeout, self.task_name, self.task_id)
            exc = exceptions.SoftTimeLimitExceeded(timeout)
        else:
            self.logger.error("Hard time limit (%ss) exceeded for %s[%s]",
                              timeout, self.task_name, self.task_id)
            exc = exceptions.TimeLimitExceeded(timeout)

        if self._store_errors:
            self.task.backend.mark_as_failure(self.task_id, exc)

    def on_success(self, ret_value):
        """Handler called if the task was successfully processed."""
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        runtime = self.time_start and (time.time() - self.time_start) or 0
        self.send_event("task-succeeded", uuid=self.task_id,
                        result=safe_repr(ret_value), runtime=runtime)

        self.logger.info(self.success_msg.strip(),
                         {"id": self.task_id,
                          "name": self.task_name,
                          "return_value": self.repr_result(ret_value),
                          "runtime": runtime})

    def on_retry(self, exc_info):
        """Handler called if the task should be retried."""
        self.send_event("task-retried", uuid=self.task_id,
                        exception=safe_repr(exc_info.exception.exc),
                        traceback=safe_str(exc_info.traceback))

        self.logger.info(self.retry_msg.strip(),
                         {"id": self.task_id,
                          "name": self.task_name,
                          "exc": safe_repr(exc_info.exception.exc)},
                         exc_info=exc_info)

    def on_failure(self, exc_info):
        """Handler called if the task raised an exception."""
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        if isinstance(exc_info.exception, exceptions.RetryTaskError):
            return self.on_retry(exc_info)

        # This is a special case as the process would not have had
        # time to write the result.
        if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
                self._store_errors:
            self.task.backend.mark_as_failure(self.task_id,
                                              exc_info.exception)

        self.send_event("task-failed", uuid=self.task_id,
                        exception=safe_repr(exc_info.exception),
                        traceback=safe_str(exc_info.traceback))

        context = {"hostname": self.hostname,
                   "id": self.task_id,
                   "name": self.task_name,
                   "exc": safe_repr(exc_info.exception),
                   "traceback": safe_str(exc_info.traceback),
                   "args": safe_repr(self.args),
                   "kwargs": safe_repr(self.kwargs)}

        self.logger.error(self.error_msg.strip(), context,
                          exc_info=exc_info.exc_info,
                          extra={"data": {"id": self.task_id,
                                          "name": self.task_name,
                                          "hostname": self.hostname}})

        task_obj = registry.tasks.get(self.task_name, object)
        task_obj.send_error_email(context, exc_info.exception)

    def acknowledge(self):
        """Acknowledge task."""
        if not self.acknowledged:
            self.on_ack()
            self.acknowledged = True

    def repr_result(self, result, maxlen=46):
        # 46 is the length needed to fit
        # "the quick brown fox jumps over the lazy dog" :)
        return truncate_text(safe_repr(result), maxlen)

    def info(self, safe=False):
        return {"id": self.task_id,
                "name": self.task_name,
                "args": self.args if safe else safe_repr(self.args),
                "kwargs": self.kwargs if safe else safe_repr(self.kwargs),
                "hostname": self.hostname,
                "time_start": self.time_start,
                "acknowledged": self.acknowledged,
                "delivery_info": self.delivery_info,
                "worker_pid": self.worker_pid}

    def shortinfo(self):
        return "%s[%s]%s%s" % (
                    self.task_name,
                    self.task_id,
                    " eta:[%s]" % (self.eta, ) if self.eta else "",
                    " expires:[%s]" % (self.expires, ) if self.expires else "")
    __str__ = shortinfo

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id, self.args, self.kwargs)

    def _get_tracer_args(self, loglevel=None, logfile=None):
        """Get the :class:`WorkerTaskTrace` tracer for this task."""
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return self.task_name, self.task_id, self.args, task_func_kwargs
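

# A minimal end-to-end sketch of how this module is typically driven by the
# worker (illustrative only; `message`, `body`, `pool`, `loglevel` and
# `logfile` are assumed to be supplied by the surrounding worker components):
#
#     request = TaskRequest.from_message(message, body, on_ack=message.ack)
#     request.execute_using_pool(pool, loglevel, logfile)
#
# Success, retry and failure are then reported through the on_success(),
# on_retry() and on_failure() handlers defined above.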