worker.py

  1. """celery.worker"""
  2. from carrot.connection import DjangoAMQPConnection
  3. from celery.messaging import TaskConsumer
  4. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  5. from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  6. from celery.log import setup_logger
  7. from celery.registry import tasks
  8. from celery.pool import TaskPool
  9. from celery.models import PeriodicTaskMeta
  10. from celery.backends import default_backend, default_periodic_status_backend
  11. from celery.timer import EventTimer
  12. import multiprocessing
  13. import simplejson
  14. import traceback
  15. import logging
  16. import time


class EmptyQueue(Exception):
    """The message queue is currently empty."""


class UnknownTask(Exception):
    """Got an unknown task in the queue. The message is requeued and
    ignored."""


def jail(task_id, func, args, kwargs):
    """Wraps the task in a jail, which catches all exceptions, and
    saves the status and result of the task execution to the task
    meta backend.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to ``"DONE"``.

    If the call results in an exception, it saves the exception as the
    task result, and sets the task status to ``"FAILURE"``.

    :param task_id: The id of the task.
    :param func: Callable object to execute.
    :param args: List of positional args to pass on to the function.
    :param kwargs: Keyword arguments mapping to pass on to the function.

    :returns: the function return value on success, or
        the exception instance on failure.

    """
    # Convert any unicode keys in the keyword arguments to utf-8 encoded
    # bytestrings, since Python 2 requires keyword argument names to be
    # plain strings.
    kwargs = dict([(k.encode("utf-8"), v)
                        for k, v in kwargs.items()])
    try:
        result = func(*args, **kwargs)
    except Exception, exc:
        default_backend.mark_as_failure(task_id, exc)
        return exc
    else:
        default_backend.mark_as_done(task_id, result)
        return result
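

# A minimal sketch of calling jail() directly (hypothetical example: the
# task function and UUID below are made up, and a configured result
# backend is assumed).
#
#   def add(x, y, **kwargs):
#       return x + y
#
#   retval = jail("7dc70fc0-d5f8-4aa4-b253-bb12ba742a52", add, [2, 2], {})
#   # On success retval is 4 and the stored status is "DONE"; on failure
#   # retval is the exception instance and the status is "FAILURE".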


class TaskWrapper(object):
    """Class wrapping a task to be run.

    :param task_name: see :attr:`task_name`.
    :param task_id: see :attr:`task_id`.
    :param task_func: see :attr:`task_func`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.

    .. attribute:: task_name
        Kind of task. Must be a name registered in the task registry.

    .. attribute:: task_id
        UUID of the task.

    .. attribute:: task_func
        The task's callable object.

    .. attribute:: args
        List of positional arguments to apply to the task.

    .. attribute:: kwargs
        Mapping of keyword arguments to apply to the task.

    """
    done_msg = "Task %(name)s[%(id)s] processed: %(return_value)s"

    def __init__(self, task_name, task_id, task_func, args, kwargs, **opts):
        self.task_name = task_name
        self.task_id = task_id
        self.task_func = task_func
        self.args = args
        self.kwargs = kwargs
        self.done_msg = opts.get("done_msg", self.done_msg)
        self.logger = opts.get("logger", multiprocessing.get_logger())

    def __repr__(self):
        return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
                self.__class__.__name__,
                self.task_name, self.task_id,
                self.args, self.kwargs)

    @classmethod
    def from_message(cls, message, logger):
        """Create a :class:`TaskWrapper` from a task message sent by
        :class:`celery.messaging.TaskPublisher`.

        :raises UnknownTask: if the message does not describe a task,
            the message is also rejected.

        :returns: :class:`TaskWrapper` instance.

        """
        message_data = message.decode()
        task_name = message_data["task"]
        task_id = message_data["id"]
        args = message_data["args"]
        kwargs = message_data["kwargs"]
        if task_name not in tasks:
            raise UnknownTask(task_name)
        task_func = tasks[task_name]
        return cls(task_name, task_id, task_func, args, kwargs,
                   logger=logger)
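
    # A sketch of the message body from_message() expects, inferred from
    # the keys read above (the field values are hypothetical; the task
    # name must be registered in celery.registry.tasks):
    #
    #   {"task": "myapp.tasks.add",
    #    "id": "7dc70fc0-d5f8-4aa4-b253-bb12ba742a52",
    #    "args": [2, 2],
    #    "kwargs": {}}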

    def extend_with_default_kwargs(self, loglevel, logfile):
        """Extend the task's keyword arguments with standard task arguments.

        These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.

        """
        task_func_kwargs = {"logfile": logfile,
                            "loglevel": loglevel,
                            "task_id": self.task_id,
                            "task_name": self.task_name}
        task_func_kwargs.update(self.kwargs)
        return task_func_kwargs
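
    # For example (hypothetical values), with self.kwargs == {"x": 2},
    # extend_with_default_kwargs(logging.INFO, "w.log") would return:
    #
    #   {"logfile": "w.log", "loglevel": logging.INFO,
    #    "task_id": self.task_id, "task_name": self.task_name, "x": 2}
    #
    # Note that the message's own kwargs are applied last, so they
    # override the standard arguments on a key collision.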

    def execute(self, loglevel=None, logfile=None):
        """Execute the task in a :func:`jail` and store its return value
        and status in the task meta backend.

        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        return jail(self.task_id, self.task_func,
                    self.args, task_func_kwargs)

    def on_success(self, ret_value, meta):
        """Callback run by the pool when the task completes successfully."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.done_msg % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.info(msg)

    def on_failure(self, ret_value, meta):
        """Callback run by the pool when the task raises an exception."""
        task_id = meta.get("task_id")
        task_name = meta.get("task_name")
        msg = self.done_msg % {
                "id": task_id,
                "name": task_name,
                "return_value": ret_value}
        self.logger.error(msg)

    def execute_using_pool(self, pool, loglevel=None, logfile=None):
        """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.

        :param pool: A :class:`celery.pool.TaskPool` instance.
        :keyword loglevel: The loglevel used by the task.
        :keyword logfile: The logfile used by the task.

        :returns: :class:`multiprocessing.AsyncResult` instance.

        """
        task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
        jail_args = [self.task_id, self.task_func,
                     self.args, task_func_kwargs]
        return pool.apply_async(jail, args=jail_args,
                callbacks=[self.on_success], errbacks=[self.on_failure],
                meta={"task_id": self.task_id, "task_name": self.task_name})
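

# A minimal usage sketch for TaskWrapper (hypothetical; it assumes a
# working broker connection and a registered task, and omits error
# handling):
#
#   connection = DjangoAMQPConnection()
#   message = TaskConsumer(connection=connection).fetch()
#   if message is not None:
#       task = TaskWrapper.from_message(message, logger=logging.getLogger())
#       task.execute(loglevel=logging.INFO)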


class WorkController(object):
    """Executes tasks waiting in the task queue.

    :param concurrency: see :attr:`concurrency`.
    :param logfile: see :attr:`logfile`.
    :param loglevel: see :attr:`loglevel`.
    :param queue_wakeup_after: see :attr:`queue_wakeup_after`.

    .. attribute:: concurrency
        The number of simultaneous processes doing work (default:
        :const:`celery.conf.DAEMON_CONCURRENCY`).

    .. attribute:: loglevel
        The loglevel used (default: :const:`logging.ERROR`).

    .. attribute:: logfile
        The logfile used; if no logfile is specified it uses ``stderr``
        (default: :const:`celery.conf.DAEMON_LOG_FILE`).

    .. attribute:: queue_wakeup_after
        The time it takes for the daemon to wake up after the queue is
        empty, so it can check for more work
        (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).

    .. attribute:: empty_msg_emit_every
        How often the daemon emits the ``"Waiting for queue..."`` message.
        If this is ``None``, the message will never be logged
        (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`).

    .. attribute:: logger
        The :class:`logging.Logger` instance used for logging.

    .. attribute:: pool
        The :class:`celery.pool.TaskPool` instance used.

    .. attribute:: task_consumer
        The :class:`celery.messaging.TaskConsumer` instance used.

    """
    loglevel = logging.ERROR
    concurrency = DAEMON_CONCURRENCY
    logfile = DAEMON_LOG_FILE
    queue_wakeup_after = QUEUE_WAKEUP_AFTER
    empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY

    def __init__(self, concurrency=None, logfile=None, loglevel=None,
            queue_wakeup_after=None, is_detached=False):
        self.loglevel = loglevel or self.loglevel
        self.concurrency = concurrency or self.concurrency
        self.logfile = logfile or self.logfile
        self.queue_wakeup_after = queue_wakeup_after or \
                                    self.queue_wakeup_after
        self.logger = setup_logger(self.loglevel, self.logfile)
        self.pool = TaskPool(self.concurrency)
        self.task_consumer = None
        self.is_detached = is_detached
        self.reset_connection()

    def reset_connection(self):
        """Reset the AMQP connection, and reinitialize the
        :class:`celery.messaging.TaskConsumer` instance.

        Resets the task consumer in :attr:`task_consumer`.

        """
        if self.task_consumer:
            self.task_consumer.connection.close()
        amqp_connection = DjangoAMQPConnection()
        self.task_consumer = TaskConsumer(connection=amqp_connection)

    def connection_diagnostics(self):
        """Diagnose the AMQP connection, and reset the connection
        if necessary."""
        if hasattr(self.task_consumer, "backend"):
            connection = self.task_consumer.backend.channel.connection
        else:
            connection = self.task_consumer.channel.connection
        if not connection:
            self.logger.info(
                    "AMQP Connection has died, restoring connection.")
            self.reset_connection()

    def receive_message(self):
        """Receive the next message from the message broker.

        Tries to reset the AMQP connection if not available.
        Returns ``None`` if no message is waiting on the queue.

        :rtype: :class:`carrot.messaging.Message` instance.

        """
        #self.connection_diagnostics()
        self.logger.debug("Trying to fetch message from broker...")
        message = self.task_consumer.fetch()
        if message is not None:
            self.logger.debug("Acknowledging message with delivery tag %s" % (
                message.delivery_tag))
        return message

    def fetch_next_task(self):
        """Fetch the next task from the AMQP broker.

        Raises :exc:`EmptyQueue` exception if there is no message
        waiting on the queue.

        :returns: a ``(task, message)`` tuple, where ``task`` is a
            :class:`TaskWrapper` instance.

        """
        message = self.receive_message()
        if message is None:  # No messages waiting.
            raise EmptyQueue()
        task = TaskWrapper.from_message(message, logger=self.logger)
        self.logger.info("Got task from broker: %s[%s]" % (
            task.task_name, task.task_id))
        return task, message

    def execute_next_task(self):
        """Execute the next task on the queue using the multiprocessing
        pool.

        Unexpected exceptions raised here are caught by the main loop in
        :meth:`run` and logged with level :const:`logging.CRITICAL`.

        """
        self.logger.debug("Trying to fetch a task.")
        task, message = self.fetch_next_task()
        self.logger.debug("Got a task: %s. Trying to execute it..." % task)
        result = task.execute_using_pool(self.pool, self.loglevel,
                                         self.logfile)
        self.logger.debug("Task %s has been executed asynchronously." % task)
        return result, task.task_name, task.task_id

    def run_periodic_tasks(self):
        """Schedule all waiting periodic tasks for execution."""
        self.logger.debug("Looking for periodic tasks ready for execution...")
        default_periodic_status_backend.run_periodic_tasks()

    def schedule_retry_tasks(self):
        """Reschedule all requeued tasks waiting for retry."""
        pass

    def run(self):
        """Starts the worker's main loop."""
        log_wait = lambda: self.logger.info("Waiting for queue...")
        ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
        events = [
            EventTimer(self.run_periodic_tasks, 1),
            EventTimer(self.schedule_retry_tasks, 2),
        ]

        self.pool.run()

        # If not running as a daemon, and the DEBUG logging level is
        # enabled, print the pool's child process PIDs and sleep for a
        # second before we start.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug("Pool child processes: [%s]" % (
                "|".join(map(str, self.pool.get_worker_pids()))))
            if not self.is_detached:
                time.sleep(1)

        while True:
            for event in events:
                event.tick()
            try:
                result, task_name, task_id = self.execute_next_task()
            except ValueError:
                # execute_next_task didn't return a result/name/id tuple,
                # probably because it got an exception.
                continue
            except EmptyQueue:
                ev_msg_waiting.tick()
                time.sleep(self.queue_wakeup_after)
                continue
            except UnknownTask, e:
                self.logger.info("Unknown task ignored: %s" % e)
                continue
            except Exception, e:
                self.logger.critical("Message queue raised %s: %s\n%s" % (
                    e.__class__, e, traceback.format_exc()))
                continue
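

# A minimal sketch of starting the worker in the foreground (hypothetical;
# the celeryd program normally constructs WorkController, this only
# illustrates the API). run() blocks, processing tasks until the process
# is killed.
#
#   if __name__ == "__main__":
#       controller = WorkController(concurrency=2, loglevel=logging.INFO)
#       controller.run()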