worker.py

  1. """celery.worker"""
  2. from carrot.connection import DjangoAMQPConnection
  3. from celery.messaging import TaskConsumerSet
  4. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  5. from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
  6. from celery.log import setup_logger
  7. from celery.registry import tasks
  8. from celery.pool import TaskPool
  9. from celery.datastructures import ExceptionInfo
  10. from celery.backends import default_backend, default_periodic_status_backend
  11. from celery.timer import EventTimer
  12. from django.core.mail import mail_admins
  13. from celery.monitoring import TaskTimerStats
  14. import multiprocessing
  15. import traceback
  16. import threading
  17. import logging
  18. import signal
  19. import socket
  20. import time
  21. import sys

# pep8.py borks on an inline signature separator and
# says "trailing whitespace" ;)
EMAIL_SIGNATURE_SEP = "-- "

TASK_FAIL_EMAIL_BODY = """
Task %%(name)s with id %%(id)s raised exception: %%(exc)s

The contents of the full traceback was:

%%(traceback)s

%(EMAIL_SIGNATURE_SEP)s
Just thought I'd let you know!
celeryd at %%(hostname)s.
""" % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}


class UnknownTask(Exception):
    """Got an unknown task in the queue. The message is requeued and
    ignored."""


def jail(task_id, task_name, func, args, kwargs):
    """Wraps the task in a jail, which catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"DONE"``.

    If the call results in an exception, it saves the exception as the
    task result, and sets the task status to ``"FAILURE"``.

    :param task_id: The id of the task.
    :param task_name: The name of the task.
    :param func: Callable object to execute.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        the exception instance on failure.

    """
    timer_stat = TaskTimerStats.start(task_id, task_name, args, kwargs)

    # See: http://groups.google.com/group/django-users/browse_thread/
    #       thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&
    #       q=multiprocessing#38402e76cf3233e8
    from django.db import connection
    connection.close()

    # Reset cache connection only if using memcached/libmemcached
    from django.core import cache
    # XXX At Opera we use a custom memcached backend that uses libmemcached
    # instead of libmemcache (cmemcache). Should find a better solution for
    # this, but for now "memcached" should probably be unique enough of a
    # string to not make problems.
    cache_backend = cache.settings.CACHE_BACKEND
    if hasattr(cache, "parse_backend_uri"):
        cache_scheme = cache.parse_backend_uri(cache_backend)[0]
    else:
        # Django <= 1.0.2
        cache_scheme = cache_backend.split(":", 1)[0]
    if "memcached" in cache_scheme:
        cache.cache.close()

    # Backend process cleanup
    default_backend.process_cleanup()

    try:
        result = func(*args, **kwargs)
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception, exc:
        default_backend.mark_as_failure(task_id, exc)
        retval = ExceptionInfo(sys.exc_info())
    else:
        default_backend.mark_as_done(task_id, result)
        retval = result
    finally:
        timer_stat.stop()
    return retval
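

# Usage sketch for jail() (hypothetical task and id; assumes a working
# result backend, since jail() stores the outcome before returning):
#
#   >>> def add(x, y, **kwargs):
#   ...     return x + y
#   >>> jail("hypothetical-id", "add", add, [2, 2], {})
#   4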


class TaskWrapper(object):
    """Class wrapping a task to be run.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param task_func: see :attr:`task_func`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name

        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id

        UUID of the task.

    .. attribute:: task_func

        The task's callable object.

    .. attribute:: args

        List of positional arguments to apply to the task.

    .. attribute:: kwargs

        Mapping of keyword arguments to apply to the task.

    .. attribute:: message

        The original message sent. Used for acknowledging the message.

    """
    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
    fail_msg = """
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """
    fail_email_subject = """
        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
    """
    fail_email_body = TASK_FAIL_EMAIL_BODY

    def __init__(self, task_name, task_id, task_func, args, kwargs,
            on_acknowledge=None, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.task_func = task_func
        self.args = args
        self.kwargs = kwargs
        self.logger = kwargs.get("logger")
        self.on_acknowledge = on_acknowledge
        for opt in ("success_msg", "fail_msg", "fail_email_subject",
                "fail_email_body"):
            setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
        if not self.logger:
            self.logger = multiprocessing.get_logger()

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    @classmethod
    def from_message(cls, message, message_data, logger):
        """Create a :class:`TaskWrapper` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises UnknownTask: if the message does not describe a task,
            the message is also rejected.

        :returns: :class:`TaskWrapper` instance.

        """
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]

        # Convert any unicode keys in the keyword arguments to
        # bytestrings, as ``**kwargs`` can't accept unicode keys.
        kwargs = dict([(key.encode("utf-8"), value)
                        for key, value in kwargs.items()])

        if task_name not in tasks:
            raise UnknownTask(task_name)
        task_func = tasks[task_name]
        return cls(task_name, task_id, task_func, args, kwargs,
                   on_acknowledge=message.ack, logger=logger)

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.

        """
        task_func_kwargs = {"logfile": logfile,
                            "loglevel": loglevel,
                            "task_id": self.task_id,
                            "task_name": self.task_name}
        task_func_kwargs.update(self.kwargs)
        return task_func_kwargs
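
    # A sketch of what extend_with_default_kwargs() produces (hypothetical
    # values). Note that self.kwargs is applied last via dict.update, so
    # user-supplied keyword arguments win on key collisions:
    #
    #   >>> tw = TaskWrapper("mytask", "some-uuid", lambda: None, [], {"x": 1})
    #   >>> tw.extend_with_default_kwargs(logging.INFO, "celery.log")
    #   {'logfile': 'celery.log', 'loglevel': 20, 'task_id': 'some-uuid',
    #    'task_name': 'mytask', 'x': 1}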

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`jail` and store return value
        and status in the task meta backend.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        if self.on_acknowledge:
            self.on_acknowledge()
        # jail() takes the function, args and kwargs as separate
        # positional arguments (see its signature above).
        return jail(self.task_id, self.task_name, self.task_func,
                    self.args, task_func_kwargs)

    def on_success(self, ret_value, meta):
        """The handler used if the task was successfully processed
        (without raising an exception)."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.success_msg.strip() % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.info(msg)

    def on_failure(self, exc_info, meta):
        """The handler used if the task raised an exception."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")

        context = {
            "hostname": socket.gethostname(),
            "id": task_id,
            "name": task_name,
            "exc": exc_info.exception,
            "traceback": exc_info.traceback,
        }
        self.logger.error(self.fail_msg.strip() % context)

        if SEND_CELERY_TASK_ERROR_EMAILS:
            subject = self.fail_email_subject.strip() % context
            body = self.fail_email_body.strip() % context
            mail_admins(subject, body, fail_silently=True)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`multiprocessing.Pool` instance.
        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        :returns: :class:`multiprocessing.AsyncResult` instance.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        jail_args = [self.task_id, self.task_name, self.task_func,
                     self.args, task_func_kwargs]
        return pool.apply_async(jail, args=jail_args,
                callbacks=[self.on_success], errbacks=[self.on_failure],
                on_acknowledge=self.on_acknowledge,
                meta={"task_id": self.task_id, "task_name": self.task_name})
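

# How the worker typically creates and runs a TaskWrapper (a minimal
# sketch; ``message`` is the carrot message object the payload arrived
# with, and the payload values are hypothetical):
#
#   >>> message_data = {"task": "mytask", "id": "some-uuid",
#   ...                 "args": [2, 2], "kwargs": {}}
#   >>> task = TaskWrapper.from_message(message, message_data, logger)
#   >>> task.execute_using_pool(pool)   # doctest: +SKIP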


class PeriodicWorkController(threading.Thread):
    """A thread that continuously checks if there are
    :class:`celery.task.PeriodicTask` tasks waiting for execution,
    and executes them.

    Example

        >>> PeriodicWorkController().start()

    """

    def __init__(self):
        super(PeriodicWorkController, self).__init__()
        self._shutdown = threading.Event()
        self._stopped = threading.Event()

    def run(self):
        """Don't use :meth:`run`. Use :meth:`start`."""
        while True:
            if self._shutdown.isSet():
                break
            default_periodic_status_backend.run_periodic_tasks()
            time.sleep(1)
        self._stopped.set()     # indicate that we are stopped

    def stop(self):
        """Shutdown the thread."""
        self._shutdown.set()
        self._stopped.wait()    # block until this thread is done
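

# Shutdown behavior sketch: stop() sets the shutdown event, then blocks
# on the stopped event, which run() sets as its last action, so stop()
# only returns once the loop has actually exited.
#
#   >>> controller = PeriodicWorkController()
#   >>> controller.start()
#   >>> controller.stop()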


class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.

    .. attribute:: concurrency

        The number of simultaneous processes doing work (default:
        :const:`celery.conf.DAEMON_CONCURRENCY`).

    .. attribute:: loglevel

        The loglevel used (default: :const:`logging.ERROR`).

    .. attribute:: logfile

        The logfile used. If no logfile is specified, ``stderr`` is used
        (default: :const:`celery.conf.DAEMON_LOG_FILE`).

    .. attribute:: logger

        The :class:`logging.Logger` instance used for logging.

    .. attribute:: pool

        The :class:`multiprocessing.Pool` instance used.

    .. attribute:: task_consumer

        The :class:`celery.messaging.TaskConsumerSet` instance used.

    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    _state = None

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            is_detached=False):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        # Use the resolved attributes so the class defaults apply when
        # no explicit loglevel/logfile was passed in.
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = TaskPool(self.concurrency, logger=self.logger)
        self.periodicworkcontroller = PeriodicWorkController()
        self.is_detached = is_detached
        self.amqp_connection = None
        self.task_consumer = None

    def close_connection(self):
        """Close the AMQP connection."""
        if self.task_consumer:
            self.task_consumer.close()
        if self.amqp_connection:
            self.amqp_connection.close()

    def reset_connection(self):
        """Reset the AMQP connection, and reinitialize the
        :class:`celery.messaging.TaskConsumerSet` instance.

        Resets the task consumer in :attr:`task_consumer`.

        """
        self.close_connection()
        self.amqp_connection = DjangoAMQPConnection()
        self.task_consumer = TaskConsumerSet(connection=self.amqp_connection)
        self.task_consumer.register_callback(self._message_callback)
        return self.task_consumer

    def connection_diagnostics(self):
        """Diagnose the AMQP connection, and reset the connection if
        necessary."""
        connection = self.task_consumer.backend.channel.connection
        if not connection:
            self.logger.info(
                    "AMQP Connection has died, restoring connection.")
            self.reset_connection()

    def _message_callback(self, message_data, message):
        """The method called when we receive a message."""
        try:
            try:
                self.process_task(message_data, message)
            except ValueError:
                # process_task didn't return a result/name/id tuple,
                # probably because it got an exception.
                pass
            except UnknownTask, exc:
                self.logger.info("Unknown task ignored: %s" % exc)
            except Exception, exc:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                        exc.__class__, exc, traceback.format_exc()))
        except (SystemExit, KeyboardInterrupt):
            self.shutdown()

    def process_task(self, message_data, message):
        """Process task message by passing it to the pool of workers."""
        task = TaskWrapper.from_message(message, message_data,
                                        logger=self.logger)
        self.logger.info("Got task from broker: %s[%s]" % (
            task.task_name, task.task_id))
        self.logger.debug("Got a task: %s. Trying to execute it..." % task)

        result = task.execute_using_pool(self.pool, self.loglevel,
                                         self.logfile)

        self.logger.debug("Task %s has been executed asynchronously." % task)

        return result

    def shutdown(self):
        """Make sure ``celeryd`` exits cleanly."""
        # shut down the periodic work controller thread
        if self._state != "RUN":
            return
        self._state = "TERMINATE"
        self.periodicworkcontroller.stop()
        self.pool.terminate()
        self.close_connection()

    def run(self):
        """Starts the worker's main loop."""
        self._state = "RUN"

        task_consumer = self.reset_connection()
        it = task_consumer.iterconsume(limit=None)

        self.pool.run()
        self.periodicworkcontroller.start()

        # If not running as daemon, and DEBUG logging level is enabled,
        # print pool PIDs and sleep for a second before we start.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Pool child processes: [%s]" % (
                "|".join(map(str, self.pool.get_worker_pids()))))
            if not self.is_detached:
                time.sleep(1)

        try:
            while True:
                it.next()
        except (SystemExit, KeyboardInterrupt):
            self.shutdown()
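

# Typical entry point, roughly as celeryd drives it (a minimal sketch;
# run() blocks consuming messages until SystemExit or KeyboardInterrupt
# triggers shutdown()):
#
#   >>> worker = WorkController(concurrency=DAEMON_CONCURRENCY,
#   ...                         loglevel=logging.INFO)
#   >>> worker.run()    # doctest: +SKIP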