worker.py

"""celery.worker"""
from carrot.connection import DjangoAMQPConnection
from celery.messaging import TaskConsumer
from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
from celery.log import setup_logger
from celery.registry import tasks
from celery.pool import TaskPool
from celery.datastructures import ExceptionInfo
from celery.backends import default_backend, default_periodic_status_backend
from celery.timer import EventTimer
from django.core.mail import mail_admins
import multiprocessing
import traceback
import threading
import logging
import socket
import time
import sys

# pep8.py borks on an inline signature separator and
# says "trailing whitespace" ;)
EMAIL_SIGNATURE_SEP = "-- "

# The ``%%`` placeholders survive the interpolation below, so the body can
# be formatted again later with the actual task context.
TASK_FAIL_EMAIL_BODY = """
Task %%(name)s with id %%(id)s raised exception: %%(exc)s

The contents of the full traceback was:

%%(traceback)s

%(EMAIL_SIGNATURE_SEP)s
Just thought I'd let you know!
celeryd at %%(hostname)s.
""" % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}

class EmptyQueue(Exception):
    """The message queue is currently empty."""


class UnknownTask(Exception):
    """Got an unknown task in the queue. The message is requeued and
    ignored."""

def jail(task_id, func, args, kwargs):
    """Wraps the task in a jail, which catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"DONE"``.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task status to ``"FAILURE"``.

    :param task_id: The id of the task.
    :param func: Callable object to execute.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        the exception instance on failure.

    """
    # See: http://groups.google.com/group/django-users/browse_thread/
    #       thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&
    #       q=multiprocessing#38402e76cf3233e8
    from django.db import connection
    connection.close()

    # Reset cache connection only if using memcached/libmemcached
    from django.core import cache
    # XXX At Opera we use a custom memcached backend that uses libmemcached
    # instead of libmemcache (cmemcache). Should find a better solution for
    # this, but for now "memcached" should probably be unique enough of a
    # string to not make problems.
    cache_backend = cache.settings.CACHE_BACKEND
    if hasattr(cache, "parse_backend_uri"):
        cache_scheme = cache.parse_backend_uri(cache_backend)[0]
    else:
        # Django <= 1.0.2
        cache_scheme = cache_backend.split(":", 1)[0]
    if "memcached" in cache_scheme:
        cache.cache.close()

    # Backend process cleanup
    default_backend.process_cleanup()

    # Convert any unicode keys in the keyword arguments to ascii.
    kwargs = dict([(k.encode("utf-8"), v)
                        for k, v in kwargs.items()])

    try:
        result = func(*args, **kwargs)
    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception, exc:
        default_backend.mark_as_failure(task_id, exc)
        return ExceptionInfo(sys.exc_info())
    else:
        default_backend.mark_as_done(task_id, result)
        return result

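# A minimal usage sketch of :func:`jail`; the ``add`` function and the task
# id below are hypothetical (normally :class:`TaskWrapper` builds this call
# from a broker message, with Django settings already configured):
#
#   >>> def add(x, y):
#   ...     return x + y
#   >>> jail("hypothetical-task-id", add, [2, 2], {})
#   4
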
class TaskWrapper(object):
    """Class wrapping a task to be run.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param task_func: see :attr:`task_func`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name

        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id

        UUID of the task.

    .. attribute:: task_func

        The task's callable object.

    .. attribute:: args

        List of positional arguments to apply to the task.

    .. attribute:: kwargs

        Mapping of keyword arguments to apply to the task.

    """
    success_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"
    fail_msg = """
        Task %(name)s[%(id)s] raised exception: %(exc)s\n%(traceback)s
    """
    fail_email_subject = """
        [celery@%(hostname)s] Error: Task %(name)s (%(id)s): %(exc)s
    """
    fail_email_body = TASK_FAIL_EMAIL_BODY

    def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.task_func = task_func
        self.args = args
        self.kwargs = kwargs
        self.logger = kwargs.get("logger")
        for opt in ("success_msg", "fail_msg", "fail_email_subject",
                "fail_email_body"):
            setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
        if not self.logger:
            self.logger = multiprocessing.get_logger()

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    @classmethod
    def from_message(cls, message, logger):
        """Create a :class:`TaskWrapper` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises UnknownTask: if the message does not describe a task,
            the message is also rejected.

        :returns: :class:`TaskWrapper` instance.

        """
        message_data = message.decode()
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]
        if task_name not in tasks:
            raise UnknownTask(task_name)
        task_func = tasks[task_name]
        return cls(task_name, task_id, task_func, args, kwargs, logger=logger)
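
    # For reference, a sketch of the decoded message body this expects, as
    # sent by :class:`celery.messaging.TaskPublisher` (the task name, id and
    # arguments shown are hypothetical):
    #
    #   {"task": "tasks.add",
    #    "id": "a4b7fe82-...",
    #    "args": [2, 2],
    #    "kwargs": {}}
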
    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.

        """
        task_func_kwargs = {"logfile": logfile,
                            "loglevel": loglevel,
                            "task_id": self.task_id,
                            "task_name": self.task_name}
        task_func_kwargs.update(self.kwargs)
        return task_func_kwargs
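
    # A sketch of the resulting mapping, assuming hypothetical values
    # (``20`` is the numeric value of ``logging.INFO``):
    #
    #   >>> tw.extend_with_default_kwargs(logging.INFO, "celery.log")
    #   {"logfile": "celery.log", "loglevel": 20,
    #    "task_id": "a4b7fe82-...", "task_name": "tasks.add"}
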
    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`jail` and store return value
        and status in the task meta backend.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return jail(self.task_id, self.task_func,
                    self.args, task_func_kwargs)

    def on_success(self, ret_value, meta):
        """The handler used if the task was successfully processed
        (without raising an exception)."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.success_msg.strip() % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.info(msg)

    def on_failure(self, exc_info, meta):
        """The handler used if the task raised an exception."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        context = {
            "hostname": socket.gethostname(),
            "id": task_id,
            "name": task_name,
            "exc": exc_info.exception,
            "traceback": exc_info.traceback,
        }
        self.logger.error(self.fail_msg.strip() % context)

        if SEND_CELERY_TASK_ERROR_EMAILS:
            subject = self.fail_email_subject.strip() % context
            body = self.fail_email_body.strip() % context
            mail_admins(subject, body, fail_silently=True)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the multiprocessing-based
        worker pool.

        :param pool: A :class:`celery.pool.TaskPool` instance.
        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        :returns: :class:`multiprocessing.AsyncResult` instance.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        jail_args = [self.task_id, self.task_func,
                     self.args, task_func_kwargs]
        return pool.apply_async(jail, args=jail_args,
                callbacks=[self.on_success], errbacks=[self.on_failure],
                meta={"task_id": self.task_id, "task_name": self.task_name})

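# A minimal sketch of the wrapper life cycle, assuming a task registered
# under the hypothetical name "tasks.add":
#
#   >>> tw = TaskWrapper.from_message(message, logger)  # message off the broker
#   >>> tw.execute()                    # run inline in this process, or:
#   >>> tw.execute_using_pool(pool)     # hand it off to a celery.pool.TaskPool
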
class PeriodicWorkController(threading.Thread):
    """A thread that continuously checks if there are
    :class:`celery.task.PeriodicTask`s waiting for execution, and executes
    them.

    Example

        >>> PeriodicWorkController().start()

    """

    def run(self):
        """Don't use :meth:`run`. Use :meth:`start` instead."""
        while True:
            default_periodic_status_backend.run_periodic_tasks()
            time.sleep(1)

class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.
    :param queue_wakeup_after: see :attr:`queue_wakeup_after`.

    .. attribute:: concurrency

        The number of simultaneous processes doing work (default:
        :const:`celery.conf.DAEMON_CONCURRENCY`).

    .. attribute:: loglevel

        The loglevel used (default: :const:`logging.ERROR`).

    .. attribute:: logfile

        The logfile used. If no logfile is specified, ``stderr`` is used
        (default: :const:`celery.conf.DAEMON_LOG_FILE`).

    .. attribute:: queue_wakeup_after

        The time it takes for the daemon to wake up after the queue is empty,
        so it can check for more work
        (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).

    .. attribute:: empty_msg_emit_every

        How often the daemon emits the ``"Waiting for queue..."`` message.
        If this is ``None``, the message will never be logged
        (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`).

    .. attribute:: logger

        The :class:`logging.Logger` instance used for logging.

    .. attribute:: pool

        The :class:`celery.pool.TaskPool` instance used.

    .. attribute:: task_consumer

        The :class:`celery.messaging.TaskConsumer` instance used.

    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    queue_wakeup_after = QUEUE_WAKEUP_AFTER
    empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            queue_wakeup_after=None, is_detached=False):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.queue_wakeup_after = queue_wakeup_after or \
                                    self.queue_wakeup_after
        # Use the resolved instance values so the class defaults apply.
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = TaskPool(self.concurrency, logger=self.logger)
        self.task_consumer = None
        self.task_consumer_it = None
        self.is_detached = is_detached
        self.reset_connection()

    def reset_connection(self):
        """Reset the AMQP connection, and reinitialize the
        :class:`celery.messaging.TaskConsumer` instance.

        Resets the task consumer in :attr:`task_consumer`.

        """
        if self.task_consumer:
            self.task_consumer.connection.close()
        amqp_connection = DjangoAMQPConnection()
        self.task_consumer = TaskConsumer(connection=amqp_connection)
        self.task_consumer_it = self.task_consumer.iterqueue(infinite=True)

    def connection_diagnostics(self):
        """Diagnose the AMQP connection, and reset the connection
        if necessary."""
        connection = self.task_consumer.backend.channel.connection
        if not connection:
            self.logger.info(
                    "AMQP Connection has died, restoring connection.")
            self.reset_connection()

    def receive_message(self):
        """Receive the next message from the message broker.

        Tries to reset the AMQP connection if not available.

        :raises EmptyQueue: if there is no message waiting on the queue.

        :rtype: :class:`carrot.messaging.Message` instance.

        """
        message = self.task_consumer_it.next()
        if not message:
            raise EmptyQueue()
        return message

    def process_task(self, message):
        """Process task message by passing it to the pool of workers."""
        task = TaskWrapper.from_message(message, logger=self.logger)
        self.logger.info("Got task from broker: %s[%s]" % (
            task.task_name, task.task_id))
        self.logger.debug("Got a task: %s. Trying to execute it..." % task)

        result = task.execute_using_pool(self.pool, self.loglevel,
                                         self.logfile)

        self.logger.debug("Task %s has been executed asynchronously." % task)

        return result

    def execute_next_task(self):
        """Execute the next task on the queue using the multiprocessing pool.

        Catches all exceptions and logs them with level
        :const:`logging.CRITICAL`.

        Raises :exc:`EmptyQueue` exception if there is no message
        waiting on the queue.

        """
        self.process_task(self.receive_message())

    def schedule_retry_tasks(self):
        """Reschedule all requeued tasks waiting for retry."""
        pass

    def run(self):
        """Starts the worker's main loop."""
        log_wait = lambda: self.logger.info("Waiting for queue...")
        ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)

        self.pool.run()
        PeriodicWorkController().start()

        # If not running as daemon, and DEBUG logging level is enabled,
        # print pool PIDs and sleep for a second before we start.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Pool child processes: [%s]" % (
                "|".join(map(str, self.pool.get_worker_pids()))))
            if not self.is_detached:
                time.sleep(1)

        while True:
            try:
                self.execute_next_task()
            except ValueError:
                # execute_next_task didn't return a r/name/id tuple,
                # probably because it got an exception.
                continue
            except EmptyQueue:
                ev_msg_waiting.tick()
                time.sleep(self.queue_wakeup_after)
                continue
            except UnknownTask, exc:
                self.logger.info("Unknown task ignored: %s" % (exc, ))
                continue
            except Exception, exc:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                    exc.__class__, exc, traceback.format_exc()))
                continue
            except:
                self.pool.terminate()
                raise
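
# A minimal sketch of starting a worker programmatically, assuming Django
# settings are already configured (the celeryd command normally handles
# this; the concurrency and logfile values are hypothetical):
#
#   >>> worker = WorkController(concurrency=2, loglevel=logging.INFO,
#   ...                         logfile="celeryd.log")
#   >>> worker.run()    # blocks, consuming tasks from the queue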