worker.py

  1. """celery.worker"""
  2. from carrot.connection import DjangoAMQPConnection
  3. from celery.messaging import TaskConsumer
  4. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  5. from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  6. from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
  7. from celery.log import setup_logger
  8. from celery.registry import tasks
  9. from celery.pool import TaskPool
  10. from celery.datastructures import ExceptionInfo
  11. from celery.models import PeriodicTaskMeta
  12. from celery.backends import default_backend, default_periodic_status_backend
  13. from celery.timer import EventTimer
  14. from django.core.mail import mail_admins
  15. import multiprocessing
  16. import simplejson
  17. import traceback
  18. import logging
  19. import time
  20. import sys
  21. class EmptyQueue(Exception):
  22. """The message queue is currently empty."""
  23. class UnknownTask(Exception):
  24. """Got an unknown task in the queue. The message is requeued and
  25. ignored."""


def jail(task_id, func, args, kwargs):
    """Wraps the task in a jail, which catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"DONE"``.

    If the call results in an exception, it saves the exception as the
    task result, and sets the task status to ``"FAILURE"``.

    :param task_id: The id of the task.
    :param func: Callable object to execute.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        the exception instance on failure.

    """
    # Convert any unicode keys in the keyword arguments to plain
    # bytestrings (``**kwargs`` does not accept unicode keys in Python 2).
    kwargs = dict([(k.encode("utf-8"), v)
                        for k, v in kwargs.items()])

    try:
        result = func(*args, **kwargs)
    except Exception, exc:
        default_backend.mark_as_failure(task_id, exc)
        return ExceptionInfo(sys.exc_info())
    else:
        default_backend.mark_as_done(task_id, result)
        return result
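

# A minimal usage sketch of ``jail`` (illustrative, not part of the
# original module; the task function and task id are made up):
#
#   def add(x, y, **kwargs):
#       return x + y
#
#   jail("hypothetical-task-id", add, [2, 2], {})
#   # => 4, and the result backend marks the task as "DONE".
#
# Had ``add`` raised, ``jail`` would return an ExceptionInfo instance
# and mark the task as "FAILURE" instead.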


class TaskWrapper(object):
    """Class wrapping a task to be run.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param task_func: see :attr:`task_func`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name
        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id
        UUID of the task.

    .. attribute:: task_func
        The task's callable object.

    .. attribute:: args
        List of positional arguments to apply to the task.

    .. attribute:: kwargs
        Mapping of keyword arguments to apply to the task.

    """
    done_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"

    def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.task_func = task_func
        self.args = args
        self.kwargs = kwargs
        self.done_msg = opts.get("done_msg", self.done_msg)
        self.logger = opts.get("logger", multiprocessing.get_logger())

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    @classmethod
    def from_message(cls, message, logger):
        """Create a :class:`TaskWrapper` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises UnknownTask: if the message does not describe a task,
            the message is also rejected.

        :returns: :class:`TaskWrapper` instance.

        """
        message_data = message.decode()
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]
        if task_name not in tasks:
            raise UnknownTask(task_name)
        task_func = tasks[task_name]
        return cls(task_name, task_id, task_func, args, kwargs,
                   logger=logger)
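
    # Example of the message body ``from_message`` expects after
    # ``message.decode()``. The field names come from the code above;
    # the task name, id and arguments are hypothetical:
    #
    #   {"task": "myapp.tasks.add",
    #    "id": "hypothetical-uuid",
    #    "args": [2, 2],
    #    "kwargs": {"verbose": True}}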

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.

        """
        task_func_kwargs = {"logfile": logfile,
                            "loglevel": loglevel,
                            "task_id": self.task_id,
                            "task_name": self.task_name}
        task_func_kwargs.update(self.kwargs)
        return task_func_kwargs
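
    # For illustration (hypothetical values): a task queued with
    # ``kwargs={"x": 2}`` and run at ``loglevel=logging.INFO`` ends up
    # called with:
    #
    #   {"logfile": None, "loglevel": 20, "task_id": "<uuid>",
    #    "task_name": "myapp.tasks.add", "x": 2}
    #
    # The task's own kwargs win on key collisions, as ``update`` runs last.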

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`jail` and store return value
        and status in the task meta backend.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return jail(self.task_id, self.task_func, self.args,
                    task_func_kwargs)
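
    # Sketch of synchronous execution (assumes a registered task and a
    # configured result backend; ``message`` is a broker message):
    #
    #   wrapper = TaskWrapper.from_message(message, logger)
    #   ret = wrapper.execute(loglevel=logging.INFO)
    #   # ``ret`` is the return value, or an ExceptionInfo on failure.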

    def on_success(self, ret_value, meta):
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.done_msg % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.info(msg)

    def on_failure(self, ret_value, meta):
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.done_msg % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.error(msg)
        if SEND_CELERY_TASK_ERROR_EMAILS:
            mail_admins(msg, ret_value.traceback, fail_silently=True)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`multiprocessing.Pool` instance.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        :returns: :class:`multiprocessing.AsyncResult` instance.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        jail_args = [self.task_id, self.task_func,
                     self.args, task_func_kwargs]
        return pool.apply_async(jail, args=jail_args,
                callbacks=[self.on_success], errbacks=[self.on_failure],
                meta={"task_id": self.task_id, "task_name": self.task_name})


class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.
    :param queue_wakeup_after: see :attr:`queue_wakeup_after`.

    .. attribute:: concurrency
        The number of simultaneous processes doing work (default:
        :const:`celery.conf.DAEMON_CONCURRENCY`).

    .. attribute:: loglevel
        The loglevel used (default: :const:`logging.ERROR`).

    .. attribute:: logfile
        The logfile used. If no logfile is specified it uses ``stderr``
        (default: :const:`celery.conf.DAEMON_LOG_FILE`).

    .. attribute:: queue_wakeup_after
        The time it takes for the daemon to wake up after the queue is empty,
        so it can check for more work
        (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).

    .. attribute:: empty_msg_emit_every
        How often the daemon emits the ``"Waiting for queue..."`` message.
        If this is ``None``, the message will never be logged
        (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`).

    .. attribute:: logger
        The :class:`logging.Logger` instance used for logging.

    .. attribute:: pool
        The :class:`multiprocessing.Pool` instance used.

    .. attribute:: task_consumer
        The :class:`celery.messaging.TaskConsumer` instance used.

    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    queue_wakeup_after = QUEUE_WAKEUP_AFTER
    empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            queue_wakeup_after=None, is_detached=False):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.queue_wakeup_after = queue_wakeup_after or \
                self.queue_wakeup_after
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = TaskPool(self.concurrency, logger=self.logger)
        self.task_consumer = None
        self.is_detached = is_detached
        self.reset_connection()

    def reset_connection(self):
        """Reset the AMQP connection, and reinitialize the
        :class:`celery.messaging.TaskConsumer` instance.

        Resets the task consumer in :attr:`task_consumer`.

        """
        if self.task_consumer:
            self.task_consumer.connection.close()
        amqp_connection = DjangoAMQPConnection()
        self.task_consumer = TaskConsumer(connection=amqp_connection)

    def connection_diagnostics(self):
        """Diagnose the AMQP connection, and reset the connection
        if necessary."""
        if hasattr(self.task_consumer, "backend"):
            connection = self.task_consumer.backend.channel.connection
        else:
            connection = self.task_consumer.channel.connection

        if not connection:
            self.logger.info(
                    "AMQP Connection has died, restoring connection.")
            self.reset_connection()

    def receive_message(self):
        """Receive the next message from the message broker.

        Tries to reset the AMQP connection if not available.
        Returns ``None`` if no message is waiting on the queue.

        :rtype: :class:`carrot.messaging.Message` instance.

        """
        #self.connection_diagnostics()
        self.logger.debug("Trying to fetch message from broker...")
        message = self.task_consumer.fetch()
        if message is not None:
            self.logger.debug("Acknowledging message with delivery tag %s" % (
                message.delivery_tag))
            message.ack()
        return message

    def fetch_next_task(self):
        """Fetch the next task from the AMQP broker.

        Raises :exc:`EmptyQueue` exception if there is no message
        waiting on the queue.

        :returns: a ``(task, message)`` tuple, where ``task`` is a
            :class:`TaskWrapper` instance and ``message`` is the raw
            broker message.

        """
        message = self.receive_message()
        if message is None:  # No messages waiting.
            raise EmptyQueue()

        task = TaskWrapper.from_message(message, logger=self.logger)
        self.logger.info("Got task from broker: %s[%s]" % (
            task.task_name, task.task_id))

        return task, message

    def execute_next_task(self):
        """Execute the next task on the queue using the multiprocessing pool.

        Catches all exceptions and logs them with level
        :const:`logging.CRITICAL`.

        """
        self.logger.debug("Trying to fetch a task.")
        task, message = self.fetch_next_task()
        self.logger.debug("Got a task: %s. Trying to execute it..." % task)

        result = task.execute_using_pool(self.pool, self.loglevel,
                                         self.logfile)
        self.logger.debug("Task %s has been executed asynchronously." % task)

        return result, task.task_name, task.task_id

    def run_periodic_tasks(self):
        """Schedule all waiting periodic tasks for execution."""
        self.logger.debug("Looking for periodic tasks ready for execution...")
        default_periodic_status_backend.run_periodic_tasks()

    def schedule_retry_tasks(self):
        """Reschedule all requeued tasks waiting for retry."""
        pass

    def run(self):
        """Starts the worker's main loop."""
        log_wait = lambda: self.logger.info("Waiting for queue...")
        ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
        events = [
            EventTimer(self.run_periodic_tasks, 1),
            EventTimer(self.schedule_retry_tasks, 2),
        ]

        self.pool.run()

        # If not running as a daemon, and DEBUG logging is enabled,
        # print the pool child PIDs and sleep a second before starting.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Pool child processes: [%s]" % (
                "|".join(map(str, self.pool.get_worker_pids()))))
            if not self.is_detached:
                time.sleep(1)

        while True:
            [event.tick() for event in events]
            try:
                result, task_name, task_id = self.execute_next_task()
            except ValueError:
                # execute_next_task didn't return a result/name/id tuple,
                # probably because it got an exception.
                continue
            except EmptyQueue:
                ev_msg_waiting.tick()
                time.sleep(self.queue_wakeup_after)
                continue
            except UnknownTask, e:
                self.logger.info("Unknown task ignored: %s" % (e))
                continue
            except Exception, e:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                    e.__class__, e, traceback.format_exc()))
                continue
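

# Minimal sketch of starting a worker in-process (assumes Django settings
# and the AMQP broker are configured; the celeryd daemon script normally
# does this for you):
#
#   worker = WorkController(concurrency=2, loglevel=logging.DEBUG)
#   worker.run()  # blocks, consuming tasks until the process is killed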