job.py

import logging
import os
import sys
import time
import socket
import warnings

from datetime import datetime

from celery import log
from celery import platforms
from celery.datastructures import ExceptionInfo
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.exceptions import WorkerLostError, RetryTaskError
from celery.execute.trace import TaskTrace
from celery.loaders import current_loader
from celery.registry import tasks
from celery.utils import noop, kwdict, fun_takes_kwargs
from celery.utils import truncate_text, maybe_iso8601
from celery.utils.mail import mail_admins
from celery.utils.compat import log_with_extra
from celery.worker import state

# pep8.py borks on an inline signature separator and
# says "trailing whitespace" ;)
EMAIL_SIGNATURE_SEP = "-- "

TASK_ERROR_EMAIL_BODY = """
Task %%(name)s with id %%(id)s raised exception:\n%%(exc)s
Task was called with args: %%(args)s kwargs: %%(kwargs)s.
The contents of the full traceback was:
%%(traceback)s
%(EMAIL_SIGNATURE_SEP)s
Just to let you know,
celeryd at %%(hostname)s.
""" % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}

WANTED_DELIVERY_INFO = ("exchange", "routing_key", "consumer_tag", )

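# Note: TASK_ERROR_EMAIL_BODY is %-formatted twice. The substitution above only
# injects EMAIL_SIGNATURE_SEP, turning every "%%(field)s" placeholder into
# "%(field)s"; the task-specific fields (name, id, exc, args, kwargs, traceback,
# hostname) are filled in later by TaskRequest.send_error_email().
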
class InvalidTaskError(Exception):
    """The task has invalid data or is not properly constructed."""


class AlreadyExecutedError(Exception):
    """Tasks can only be executed once, as they might change
    world-wide state."""


class WorkerTaskTrace(TaskTrace):
    """Wraps the task in a jail, catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"SUCCESS"``.

    If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
    the original exception, uses that as the result and sets the task status
    to ``"RETRY"``.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task status to ``"FAILURE"``.

    :param task_name: The name of the task to execute.
    :param task_id: The unique id of the task.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the evaluated function's return value on success, or
        the exception instance on failure.

    """

    def __init__(self, *args, **kwargs):
        self.loader = kwargs.get("loader") or current_loader()
        self.hostname = kwargs.get("hostname") or socket.gethostname()
        super(WorkerTaskTrace, self).__init__(*args, **kwargs)

        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = self.task.store_errors_even_if_ignored
        self.super = super(WorkerTaskTrace, self)

    def execute_safe(self, *args, **kwargs):
        """Same as :meth:`execute`, but catches errors."""
        try:
            return self.execute(*args, **kwargs)
        except Exception, exc:
            _type, _value, _tb = sys.exc_info()
            _value = self.task.backend.prepare_exception(exc)
            exc_info = ExceptionInfo((_type, _value, _tb))
            warnings.warn("Exception outside body: %s: %s\n%s" % tuple(
                map(str, (exc.__class__, exc, exc_info.traceback))))
            return exc_info

    def execute(self):
        """Execute, trace and store the result of the task."""
        self.loader.on_task_init(self.task_id, self.task)
        if self.task.track_started:
            if not self.task.ignore_result:
                self.task.backend.mark_as_started(self.task_id,
                                                  pid=os.getpid(),
                                                  hostname=self.hostname)
        try:
            return super(WorkerTaskTrace, self).execute()
        finally:
            self.task.backend.process_cleanup()
            self.loader.on_process_cleanup()

    def handle_success(self, retval, *args):
        """Handle successful execution."""
        if not self.task.ignore_result:
            self.task.backend.mark_as_done(self.task_id, retval)
        return self.super.handle_success(retval, *args)

    def handle_retry(self, exc, type_, tb, strtb):
        """Handle retry exception."""
        message, orig_exc = exc.args
        if self._store_errors:
            self.task.backend.mark_as_retry(self.task_id, orig_exc, strtb)
        return self.super.handle_retry(exc, type_, tb, strtb)

    def handle_failure(self, exc, type_, tb, strtb):
        """Handle exception."""
        if self._store_errors:
            exc = self.task.backend.mark_as_failure(self.task_id, exc, strtb)
        else:
            exc = self.task.backend.prepare_exception(exc)
        return self.super.handle_failure(exc, type_, tb, strtb)

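# Illustrative usage sketch (an assumption, not part of the original module);
# the task name, id and arguments below are made-up values. TaskRequest.execute()
# and execute_and_trace() below build and run a tracer in much the same way:
#
#     tracer = WorkerTaskTrace("tasks.add", "some-unique-id", [2, 2],
#                              {"logfile": None, "loglevel": 0},
#                              hostname=socket.gethostname())
#     result = tracer.execute_safe()   # task result, or ExceptionInfo on error
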
def execute_and_trace(task_name, *args, **kwargs):
    """This is a pickleable function used as a target when applying to pools.

    It's the same as::

        >>> WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()

    """
    hostname = kwargs.get("hostname")
    platforms.set_mp_process_title("celeryd", info=task_name,
                                   hostname=hostname)
    try:
        return WorkerTaskTrace(task_name, *args, **kwargs).execute_safe()
    finally:
        platforms.set_mp_process_title("celeryd", hostname=hostname)

class TaskRequest(object):
    """A request for task execution.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name

        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id

        UUID of the task.

    .. attribute:: args

        List of positional arguments to apply to the task.

    .. attribute:: kwargs

        Mapping of keyword arguments to apply to the task.

    .. attribute:: on_ack

        Callback called when the task should be acknowledged.

    .. attribute:: message

        The original message sent. Used for acknowledging the message.

    .. attribute:: executed

        Set to :const:`True` if the task has been executed.
        A task should only be executed once.

    .. attribute:: delivery_info

        Additional delivery info, e.g. contains the path
        from producer to consumer.

    .. attribute:: acknowledged

        Set to :const:`True` if the task has been acknowledged.

    """
    # Logging output
    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
    error_msg = """
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """
    retry_msg = """
        Task %(name)s[%(id)s] retry: %(exc)s
    """

    # E-mails
    email_subject = """
        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
    """
    email_body = TASK_ERROR_EMAIL_BODY

    # Internal flags
    executed = False
    acknowledged = False
    time_start = None
    _already_revoked = False

    def __init__(self, task_name, task_id, args, kwargs,
            on_ack=noop, retries=0, delivery_info=None, hostname=None,
            email_subject=None, email_body=None, logger=None,
            eventer=None, eta=None, expires=None, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.retries = retries
        self.args = args
        self.kwargs = kwargs
        self.eta = eta
        self.expires = expires
        self.on_ack = on_ack
        self.delivery_info = delivery_info or {}
        self.hostname = hostname or socket.gethostname()
        self.logger = logger or log.get_default_logger()
        self.eventer = eventer
        self.email_subject = email_subject or self.email_subject
        self.email_body = email_body or self.email_body

        self.task = tasks[self.task_name]
        self._store_errors = True
        if self.task.ignore_result:
            self._store_errors = self.task.store_errors_even_if_ignored

    def maybe_expire(self):
        if self.expires and datetime.now() > self.expires:
            state.revoked.add(self.task_id)
            if self._store_errors:
                self.task.backend.mark_as_revoked(self.task_id)

    def revoked(self):
        if self._already_revoked:
            return True
        if self.expires:
            self.maybe_expire()
        if self.task_id in state.revoked:
            self.logger.warn("Skipping revoked task: %s[%s]" % (
                self.task_name, self.task_id))
            self.send_event("task-revoked", uuid=self.task_id)
            self.acknowledge()
            self._already_revoked = True
            return True
        return False

    @classmethod
    def from_message(cls, message, message_data, logger=None, eventer=None,
            hostname=None):
        """Create a :class:`TaskRequest` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises InvalidTaskError: if the message does not describe a task,
            the message is also rejected.

        :returns: :class:`TaskRequest` instance.

        """
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]
        retries = message_data.get("retries", 0)
        eta = maybe_iso8601(message_data.get("eta"))
        expires = maybe_iso8601(message_data.get("expires"))

        _delivery_info = getattr(message, "delivery_info", {})
        delivery_info = dict((key, _delivery_info.get(key))
                                for key in WANTED_DELIVERY_INFO)

        if not hasattr(kwargs, "items"):
            raise InvalidTaskError("Task kwargs must be a dictionary.")

        return cls(task_name, task_id, args, kwdict(kwargs),
                   retries=retries, on_ack=message.ack,
                   delivery_info=delivery_info, logger=logger,
                   eventer=eventer, hostname=hostname,
                   eta=eta, expires=expires)

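    # Illustrative example (an assumption, not part of the original module):
    # the message body handed to from_message() is a mapping of roughly this
    # shape, matching the keys read above; values here are made up:
    #
    #     {"task": "tasks.add", "id": "some-unique-id", "args": [2, 2],
    #      "kwargs": {}, "retries": 0, "eta": "2010-06-08T16:30:00",
    #      "expires": None}
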
    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        Currently these are ``logfile``, ``loglevel``, ``task_id``,
        ``task_name``, ``task_retries``, ``task_is_eager``,
        and ``delivery_info``.

        See :meth:`celery.task.base.Task.run` for more information.

        """
        kwargs = dict(self.kwargs)
        default_kwargs = {"logfile": logfile,
                          "loglevel": loglevel,
                          "task_id": self.task_id,
                          "task_name": self.task_name,
                          "task_retries": self.retries,
                          "task_is_eager": False,
                          "delivery_info": self.delivery_info}
        fun = self.task.run
        supported_keys = fun_takes_kwargs(fun, default_kwargs)
        extend_with = dict((key, val) for key, val in default_kwargs.items()
                                if key in supported_keys)
        kwargs.update(extend_with)
        return kwargs

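    # Illustrative example (an assumption, not part of the original module):
    # only the standard kwargs that the task's run() signature accepts are
    # injected. Given self.kwargs == {"x": 2} and a hypothetical task defined as
    #
    #     def run(self, x, loglevel=None, task_id=None):
    #         ...
    #
    # extend_with_default_kwargs() would return {"x": 2, "loglevel": loglevel,
    # "task_id": self.task_id}; the other defaults are dropped because run()
    # does not accept them.
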
    def _get_tracer_args(self, loglevel=None, logfile=None):
        """Get the arguments to pass to the :class:`WorkerTaskTrace`
        tracer for this task."""
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return self.task_name, self.task_id, self.args, task_func_kwargs

    def _set_executed_bit(self):
        """Set task as executed to make sure it's not executed again."""
        if self.executed:
            raise AlreadyExecutedError(
                    "Task %s[%s] has already been executed" % (
                        self.task_name, self.task_id))
        self.executed = True

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :class:`WorkerTaskTrace`.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        # Make sure task has not already been executed.
        self._set_executed_bit()

        # acknowledge task as being processed.
        if not self.task.acks_late:
            self.acknowledge()

        tracer = WorkerTaskTrace(*self._get_tracer_args(loglevel, logfile),
                                 **{"hostname": self.hostname})
        retval = tracer.execute()
        self.acknowledge()
        return retval

    def send_event(self, type, **fields):
        if self.eventer:
            self.eventer.send(type, **fields)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`multiprocessing.Pool` instance.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        if self.revoked():
            return

        # Make sure task has not already been executed.
        self._set_executed_bit()

        args = self._get_tracer_args(loglevel, logfile)
        self.time_start = time.time()
        result = pool.apply_async(execute_and_trace,
                                  args=args,
                                  kwargs={"hostname": self.hostname},
                                  accept_callback=self.on_accepted,
                                  timeout_callback=self.on_timeout,
                                  callbacks=[self.on_success],
                                  errbacks=[self.on_failure])
        return result

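    # The pool callbacks passed to apply_async() above map onto the handlers
    # below: accept_callback -> on_accepted(), timeout_callback -> on_timeout(),
    # callbacks -> on_success() with the return value, and errbacks ->
    # on_failure() with an ExceptionInfo.
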
    def on_accepted(self):
        state.task_accepted(self)
        if not self.task.acks_late:
            self.acknowledge()
        self.send_event("task-started", uuid=self.task_id)
        self.logger.debug("Task accepted: %s[%s]" % (
            self.task_name, self.task_id))

    def on_timeout(self, soft):
        state.task_ready(self)
        if soft:
            self.logger.warning("Soft time limit exceeded for %s[%s]" % (
                self.task_name, self.task_id))
            exc = SoftTimeLimitExceeded()
        else:
            self.logger.error("Hard time limit exceeded for %s[%s]" % (
                self.task_name, self.task_id))
            exc = TimeLimitExceeded()

        if self._store_errors:
            self.task.backend.mark_as_failure(self.task_id, exc)

    def acknowledge(self):
        if not self.acknowledged:
            self.on_ack()
            self.acknowledged = True

    def on_success(self, ret_value):
        """The handler used if the task was successfully processed (
        without raising an exception)."""
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        runtime = time.time() - self.time_start
        self.send_event("task-succeeded", uuid=self.task_id,
                        result=repr(ret_value), runtime=runtime)

        msg = self.success_msg.strip() % {
                "id": self.task_id,
                "name": self.task_name,
                "return_value": self.repr_result(ret_value)}
        self.logger.info(msg)

    def repr_result(self, result, maxlen=46):
        # 46 is the length needed to fit
        # "the quick brown fox jumps over the lazy dog" :)
        return truncate_text(repr(result), maxlen)

    def on_retry(self, exc_info):
        self.send_event("task-retried", uuid=self.task_id,
                        exception=repr(exc_info.exception.exc),
                        traceback=repr(exc_info.traceback))
        msg = self.retry_msg.strip() % {
                "id": self.task_id,
                "name": self.task_name,
                "exc": repr(exc_info.exception.exc)}
        self.logger.info(msg)

    def on_failure(self, exc_info):
        """The handler used if the task raised an exception."""
        state.task_ready(self)

        if self.task.acks_late:
            self.acknowledge()

        if isinstance(exc_info.exception, RetryTaskError):
            return self.on_retry(exc_info)

        # This is a special case as the process would not have had
        # time to write the result.
        if isinstance(exc_info.exception, WorkerLostError):
            if self._store_errors:
                self.task.backend.mark_as_failure(self.task_id,
                                                  exc_info.exception)

        self.send_event("task-failed", uuid=self.task_id,
                        exception=repr(exc_info.exception),
                        traceback=exc_info.traceback)

        context = {"hostname": self.hostname,
                   "id": self.task_id,
                   "name": self.task_name,
                   "exc": repr(exc_info.exception),
                   "traceback": unicode(exc_info.traceback, 'utf-8'),
                   "args": self.args,
                   "kwargs": self.kwargs}

        log_with_extra(self.logger, logging.ERROR,
                       self.error_msg.strip() % context,
                       exc_info=exc_info,
                       extra={"data": {"hostname": self.hostname,
                                       "id": self.task_id,
                                       "name": self.task_name}})

        task_obj = tasks.get(self.task_name, object)
        self.send_error_email(task_obj, context, exc_info.exception,
                              enabled=task_obj.send_error_emails,
                              whitelist=task_obj.error_whitelist)

    def send_error_email(self, task, context, exc,
            whitelist=None, enabled=False, fail_silently=True):
        if enabled and not task.disable_error_emails:
            if whitelist:
                if not isinstance(exc, tuple(whitelist)):
                    return
            subject = self.email_subject.strip() % context
            body = self.email_body.strip() % context
            return mail_admins(subject, body, fail_silently=fail_silently)

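    # Note: ``whitelist`` (the task's error_whitelist) is an iterable of
    # exception classes; when it is non-empty, the error e-mail is only sent
    # if the raised exception is an instance of one of them.
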
    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    def info(self, safe=False):
        args = self.args
        kwargs = self.kwargs
        if not safe:
            args = repr(args)
            kwargs = repr(self.kwargs)

        return {"id": self.task_id,
                "name": self.task_name,
                "args": args,
                "kwargs": kwargs,
                "hostname": self.hostname,
                "time_start": self.time_start,
                "acknowledged": self.acknowledged,
                "delivery_info": self.delivery_info}

    def shortinfo(self):
        return "%s[%s]%s%s" % (
                    self.task_name,
                    self.task_id,
                    self.eta and " eta:[%s]" % (self.eta, ) or "",
                    self.expires and " expires:[%s]" % (self.expires, ) or "")
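
# Illustrative lifecycle sketch (an assumption, not part of the original
# module): this is roughly how a worker consumer would be expected to use the
# classes above; ``message``, ``message_data``, ``logger``, ``eventer`` and
# ``pool`` are hypothetical objects supplied by the surrounding worker.
#
#     request = TaskRequest.from_message(message, message_data,
#                                        logger=logger, eventer=eventer)
#     request.execute_using_pool(pool)    # or request.execute() to run inline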