job.py

import logging
import os
import sys
import time
import socket
import warnings

from datetime import datetime

from celery import platforms
from celery.app import app_or_default
from celery.datastructures import ExceptionInfo
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.exceptions import WorkerLostError, RetryTaskError
from celery.execute.trace import TaskTrace
from celery.registry import tasks
from celery.utils import noop, kwdict, fun_takes_kwargs
from celery.utils import truncate_text
from celery.utils.compat import log_with_extra
from celery.utils.timeutils import maybe_iso8601
from celery.worker import state

# pep8.py borks on an inline signature separator and
# says "trailing whitespace" ;)
EMAIL_SIGNATURE_SEP = "-- "

#: format string for the body of an error e-mail.
TASK_ERROR_EMAIL_BODY = """
Task %%(name)s with id %%(id)s raised exception:\n%%(exc)r
Task was called with args: %%(args)s kwargs: %%(kwargs)s.
The contents of the full traceback was:
%%(traceback)s
%(EMAIL_SIGNATURE_SEP)s
Just to let you know,
celeryd at %%(hostname)s.
""" % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}

#: Keys to keep from the message delivery info.  The values
#: of these keys must be pickleable.
WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )
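
# Illustrative example (not part of the original module) of the delivery info
# kept after the filtering above; only the keys come from WANTED_DELIVERY_INFO,
# the values shown here are made up:
#
#     {"exchange": "celery",
#      "routing_key": "celery",
#      "consumer_tag": "amq.ctag-XYZ"}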


class InvalidTaskError(Exception):
    """The task has invalid data or is not properly constructed."""
    pass


class AlreadyExecutedError(Exception):
    """Tasks can only be executed once, as they might change
    world-wide state."""
    pass


class WorkerTaskTrace(TaskTrace):
    """Wraps the task in a jail, catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to `"SUCCESS"`.

    If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
    the original exception, uses that as the result and sets the task status
    to `"RETRY"`.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task status to `"FAILURE"`.

    :param task_name: The name of the task to execute.
    :param task_id: The unique id of the task.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the evaluated function's return value on success, or
        the exception instance on failure.

    """

    #: Current loader.
    loader = None

    #: Hostname to report as.
    hostname = None

    def __init__(self, *args, **kwargs):
        self.loader = kwargs.get("loader") or app_or_default().loader
        self.hostname = kwargs.get("hostname") or socket.gethostname()
        super(WorkerTaskTrace, self).__init__(*args, **kwargs)
        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = self.task.store_errors_even_if_ignored
        self.super = super(WorkerTaskTrace, self)

    def execute_safe(self, *args, **kwargs):
        """Same as :meth:`execute`, but catches errors."""
        try:
            return self.execute(*args, **kwargs)
        except Exception, exc:
            _type, _value, _tb = sys.exc_info()
            _value = self.task.backend.prepare_exception(exc)
            exc_info = ExceptionInfo((_type, _value, _tb))
            warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
                map(str, (exc.__class__, exc, exc_info.traceback))))
            return exc_info

    def execute(self):
        """Execute, trace and store the result of the task."""
        self.loader.on_task_init(self.task_id, self.task)
        if self.task.track_started:
            if not self.task.ignore_result:
                self.task.backend.mark_as_started(self.task_id,
                                                  pid=os.getpid(),
                                                  hostname=self.hostname)
        try:
            return super(WorkerTaskTrace, self).execute()
        finally:
            self.task.backend.process_cleanup()
            self.loader.on_process_cleanup()

    def handle_success(self, retval, *args):
        """Handle successful execution."""
        if not self.task.ignore_result:
            self.task.backend.mark_as_done(self.task_id, retval)
        return self.super.handle_success(retval, *args)

    def handle_retry(self, exc, type_, tb, strtb):
        """Handle retry exception."""
        message, orig_exc = exc.args
        if self._store_errors:
            self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
        return self.super.handle_retry(exc, type_, tb, strtb)

    def handle_failure(self, exc, type_, tb, strtb):
        """Handle exception."""
        if self._store_errors:
            exc = self.task.backend.mark_as_failure(self.task_id, exc, strtb)
        else:
            exc = self.task.backend.prepare_exception(exc)
        return self.super.handle_failure(exc, type_, tb, strtb)
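

# Usage sketch (illustrative, not part of the original module): a tracer is
# normally constructed the same way TaskRequest.execute() below does it.
# "tasks.add", the uuid and the argument tuple are hypothetical values for a
# task that is already registered in the task registry.
#
#     >>> tracer = WorkerTaskTrace("tasks.add", "2f2b1f80-6bc6-4be9",
#     ...                          (2, 2), {},
#     ...                          hostname=socket.gethostname())
#     >>> retval = tracer.execute_safe()
#     # retval is the task's return value, or an ExceptionInfo instance
#     # if the task body (or the tracer itself) raised an exception.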


def execute_and_trace(task_name, *args, **kwargs):
    """This is a pickleable method used as a target when applying to pools.

    It's the same as::

        >>> WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()

    """
    hostname = kwargs.get("hostname")
    platforms.set_mp_process_title("celeryd", info=task_name,
                                   hostname=hostname)
    try:
        return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
    finally:
        platforms.set_mp_process_title("celeryd", hostname=hostname)


class TaskRequest(object):
    """A request for task execution."""

    #: Kind of task.  Must be a name registered in the task registry.
    name = None

    #: The task class (set by constructor using :attr:`task_name`).
    task = None

    #: UUID of the task.
    task_id = None

    #: List of positional arguments to apply to the task.
    args = None

    #: Mapping of keyword arguments to apply to the task.
    kwargs = None

    #: Number of times the task has been retried.
    retries = 0

    #: The task's eta (for information only).
    eta = None

    #: When the task expires.
    expires = None

    #: Callback called when the task should be acknowledged.
    on_ack = None

    #: The message object.  Used to acknowledge the message.
    message = None

    #: Flag set when the task has been executed.
    executed = False

    #: Additional delivery info, e.g. contains the path from
    #: Producer to consumer.
    delivery_info = None

    #: Flag set when the task has been acknowledged.
    acknowledged = False

    #: Format string used to log task success.
    success_msg = """\
        Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s
    """

    #: Format string used to log task failure.
    error_msg = """\
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """

    #: Format string used to log task retry.
    retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""

    #: Format string used to generate error e-mail subjects.
    email_subject = """\
        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
    """

    #: Format string used to generate error e-mail content.
    email_body = TASK_ERROR_EMAIL_BODY

    #: Timestamp set when the task is started.
    time_start = None
    _already_revoked = False

    def __init__(self, task_name, task_id, args, kwargs,
            on_ack=noop, retries=0, delivery_info=None, hostname=None,
            email_subject=None, email_body=None, logger=None,
            eventer=None, eta=None, expires=None, app=None, **opts):
        self.app = app_or_default(app)
        self.task_name = task_name
        self.task_id = task_id
        self.retries = retries
        self.args = args
        self.kwargs = kwargs
        self.eta = eta
        self.expires = expires
        self.on_ack = on_ack
        self.delivery_info = delivery_info or {}
        self.hostname = hostname or socket.gethostname()
        self.logger = logger or self.app.log.get_default_logger()
        self.eventer = eventer
        self.email_subject = email_subject or self.email_subject
        self.email_body = email_body or self.email_body

        self.task = tasks[self.task_name]
        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = self.task.store_errors_even_if_ignored

    @classmethod
    def from_message(cls, message, message_data, on_ack=noop, **kw):
        """Create request from a task message.

        :raises UnknownTaskError: if the message does not describe a task,
            the message is also rejected.

        """
        _delivery_info = getattr(message, "delivery_info", {})
        delivery_info = dict((key, _delivery_info.get(key))
                                for key in WANTED_DELIVERY_INFO)

        kwargs = message_data["kwargs"]
        if not hasattr(kwargs, "items"):
            raise InvalidTaskError("Task keyword arguments is not a mapping.")

        return cls(task_name=message_data["task"],
                   task_id=message_data["id"],
                   args=message_data["args"],
                   kwargs=kwdict(kwargs),
                   retries=message_data.get("retries", 0),
                   eta=maybe_iso8601(message_data.get("eta")),
                   expires=maybe_iso8601(message_data.get("expires")),
                   on_ack=on_ack,
                   delivery_info=delivery_info,
                   **kw)
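
    # Illustrative shape of the ``message_data`` mapping that from_message()
    # expects; only the keys are taken from the code above, the values are
    # made up:
    #
    #     {"task": "tasks.add",
    #      "id": "2f2b1f80-6bc6-4be9-a6b0-59c7efd1498d",
    #      "args": [2, 2],
    #      "kwargs": {},
    #      "retries": 0,
    #      "eta": "2011-01-15T10:00:00",   # ISO-8601, parsed by maybe_iso8601
    #      "expires": None}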

    def get_instance_attrs(self, loglevel, logfile):
        return {"logfile": logfile,
                "loglevel": loglevel,
                "id": self.task_id,
                "retries": self.retries,
                "is_eager": False,
                "delivery_info": self.delivery_info}

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        Currently these are `logfile`, `loglevel`, `task_id`,
        `task_name`, `task_retries`, and `delivery_info`.

        See :meth:`celery.task.base.Task.run` for more information.

        """
        if not self.task.accept_magic_kwargs:
            return self.kwargs
        kwargs = dict(self.kwargs)
        default_kwargs = {"logfile": logfile,
                          "loglevel": loglevel,
                          "task_id": self.task_id,
                          "task_name": self.task_name,
                          "task_retries": self.retries,
                          "task_is_eager": False,
                          "delivery_info": self.delivery_info}

        fun = self.task.run
        supported_keys = fun_takes_kwargs(fun, default_kwargs)
        extend_with = dict((key, val) for key, val in default_kwargs.items()
                                if key in supported_keys)
        kwargs.update(extend_with)
        return kwargs
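
    # Rough sketch of the "magic kwargs" behaviour above, assuming a task
    # whose (hypothetical) run method is defined like:
    #
    #     def run(self, x, y, loglevel=None, logfile=None, **kwargs):
    #         ...
    #
    # fun_takes_kwargs() is expected to report which of the default kwargs the
    # run method accepts (all of them when it declares **kwargs), and only
    # those are merged into the kwargs the task is finally called with.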

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`multiprocessing.Pool` instance.

        :keyword loglevel: The loglevel used by the task.

        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        # Make sure task has not already been executed.
        self._set_executed_bit()

        args = self._get_tracer_args(loglevel, logfile)
        instance_attrs = self.get_instance_attrs(loglevel, logfile)
        result = pool.apply_async(execute_and_trace,
                                  args=args,
                                  kwargs={"hostname": self.hostname,
                                          "request": instance_attrs},
                                  accept_callback=self.on_accepted,
                                  timeout_callback=self.on_timeout,
                                  callbacks=[self.on_success],
                                  errbacks=[self.on_failure])
        return result

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :class:`WorkerTaskTrace`.

        :keyword loglevel: The loglevel used by the task.

        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        # Make sure task has not already been executed.
        self._set_executed_bit()

        # acknowledge task as being processed.
        if not self.task.acks_late:
            self.acknowledge()

        instance_attrs = self.get_instance_attrs(loglevel, logfile)
        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
                                 **{"hostname": self.hostname,
                                    "loader": self.app.loader,
                                    "request": instance_attrs})
        retval = tracer.execute()
        self.acknowledge()
        return retval

    def maybe_expire(self):
        """If expired, mark the task as revoked."""
        if self.expires and datetime.now() > self.expires:
            state.revoked.add(self.task_id)
            if self._store_errors:
                self.task.backend.mark_as_revoked(self.task_id)

    def revoked(self):
        """If revoked, skip task and mark state."""
        if self._already_revoked:
            return True
        if self.expires:
            self.maybe_expire()
        if self.task_id in state.revoked:
            self.logger.warn("Skipping revoked task: %s[%s]" % (
                self.task_name, self.task_id))
            self.send_event("task-revoked", uuid=self.task_id)
            self.acknowledge()
            self._already_revoked = True
            return True
        return False

    def send_event(self, type, **fields):
        if self.eventer:
            self.eventer.send(type, **fields)

    def on_accepted(self):
        """Handler called when task is accepted by worker pool."""
        self.time_start = time.time()
        state.task_accepted(self)
        if not self.task.acks_late:
            self.acknowledge()
        self.send_event("task-started", uuid=self.task_id)
        self.logger.debug("Task accepted: %s[%s]" % (
            self.task_name, self.task_id))

    def on_timeout(self, soft):
        """Handler called if the task times out."""
        state.task_ready(self)
        if soft:
            self.logger.warning("Soft time limit exceeded for %s[%s]" % (
                self.task_name, self.task_id))
            exc = SoftTimeLimitExceeded()
        else:
            self.logger.error("Hard time limit exceeded for %s[%s]" % (
                self.task_name, self.task_id))
            exc = TimeLimitExceeded()

        if self._store_errors:
            self.task.backend.mark_as_failure(self.task_id, exc)

    def on_success(self, ret_value):
        """Handler called if the task was successfully processed."""
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        runtime = self.time_start and (time.time() - self.time_start) or 0
        self.send_event("task-succeeded", uuid=self.task_id,
                        result=repr(ret_value), runtime=runtime)

        self.logger.info(self.success_msg.strip() % {
            "id": self.task_id,
            "name": self.task_name,
            "return_value": self.repr_result(ret_value),
            "runtime": runtime})

    def on_retry(self, exc_info):
        """Handler called if the task should be retried."""
        self.send_event("task-retried", uuid=self.task_id,
                        exception=repr(exc_info.exception.exc),
                        traceback=repr(exc_info.traceback))

        self.logger.info(self.retry_msg.strip() % {
            "id": self.task_id,
            "name": self.task_name,
            "exc": repr(exc_info.exception.exc)})

    def on_failure(self, exc_info):
        """Handler called if the task raised an exception."""
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        if isinstance(exc_info.exception, RetryTaskError):
            return self.on_retry(exc_info)

        # This is a special case as the process would not have had
        # time to write the result.
        if isinstance(exc_info.exception, WorkerLostError):
            if self._store_errors:
                self.task.backend.mark_as_failure(self.task_id,
                                                  exc_info.exception)

        self.send_event("task-failed", uuid=self.task_id,
                        exception=repr(exc_info.exception),
                        traceback=exc_info.traceback)

        context = {"hostname": self.hostname,
                   "id": self.task_id,
                   "name": self.task_name,
                   "exc": repr(exc_info.exception),
                   "traceback": unicode(exc_info.traceback, 'utf-8'),
                   "args": self.args,
                   "kwargs": self.kwargs}

        log_with_extra(self.logger, logging.ERROR,
                       self.error_msg.strip() % context,
                       exc_info=exc_info,
                       extra={"data": {"hostname": self.hostname,
                                       "id": self.task_id,
                                       "name": self.task_name}})

        task_obj = tasks.get(self.task_name, object)
        self.send_error_email(task_obj, context, exc_info.exception,
                              enabled=task_obj.send_error_emails,
                              whitelist=task_obj.error_whitelist)

    def acknowledge(self):
        """Acknowledge task."""
        if not self.acknowledged:
            self.on_ack()
            self.acknowledged = True

    def send_error_email(self, task, context, exc,
            whitelist=None, enabled=False, fail_silently=True):
        if enabled and not task.disable_error_emails:
            if whitelist:
                if not isinstance(exc, tuple(whitelist)):
                    return
            subject = self.email_subject.strip() % context
            body = self.email_body.strip() % context
            self.app.mail_admins(subject, body, fail_silently=fail_silently)

    def repr_result(self, result, maxlen=46):
        # 46 is the length needed to fit
        # "the quick brown fox jumps over the lazy dog" :)
        return truncate_text(repr(result), maxlen)

    def info(self, safe=False):
        args = self.args
        kwargs = self.kwargs
        if not safe:
            args = repr(args)
            kwargs = repr(self.kwargs)

        return {"id": self.task_id,
                "name": self.task_name,
                "args": args,
                "kwargs": kwargs,
                "hostname": self.hostname,
                "time_start": self.time_start,
                "acknowledged": self.acknowledged,
                "delivery_info": self.delivery_info}

    def shortinfo(self):
        return "%s[%s]%s%s" % (
                    self.task_name,
                    self.task_id,
                    self.eta and " eta:[%s]" % (self.eta, ) or "",
                    self.expires and " expires:[%s]" % (self.expires, ) or "")

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id, self.args, self.kwargs)

    def _get_tracer_args(self, loglevel=None, logfile=None):
        """Get the :class:`WorkerTaskTrace` tracer for this task."""
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return self.task_name, self.task_id, self.args, task_func_kwargs

    def _set_executed_bit(self):
        """Set task as executed to make sure it's not executed again."""
        if self.executed:
            raise AlreadyExecutedError(
                   "Task %s[%s] has already been executed" % (
                       self.task_name, self.task_id))
        self.executed = True
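

# Rough end-to-end sketch (illustrative only) of how the worker consumer is
# expected to use this module; ``message``/``message_data`` come off the
# broker and ``pool`` is the worker's task pool:
#
#     >>> request = TaskRequest.from_message(message, message_data,
#     ...                                    on_ack=message.ack)
#     >>> request.execute_using_pool(pool, loglevel=logging.INFO)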