worker.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. """celery.worker"""
  2. from carrot.connection import DjangoAMQPConnection
  3. from celery.messaging import TaskConsumer
  4. from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
  5. from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
  6. from celery.log import setup_logger
  7. from celery.registry import tasks
  8. from celery.datastructures import TaskProcessQueue
  9. from celery.models import PeriodicTaskMeta
  10. from celery.backends import default_backend, default_periodic_status_backend
  11. from celery.timer import EventTimer
  12. import multiprocessing
  13. import simplejson
  14. import traceback
  15. import logging
  16. import time
  17. class EmptyQueue(Exception):
  18. """The message queue is currently empty."""
  19. class UnknownTask(Exception):
  20. """Got an unknown task in the queue. The message is requeued and
  21. ignored."""
  22. def jail(task_id, func, args, kwargs):
  23. """Wraps the task in a jail, which catches all exceptions, and
  24. saves the status and result of the task execution to the task
  25. meta backend.
  26. If the call was successful, it saves the result to the task result
  27. backend, and sets the task status to ``"DONE"``.
  28. If the call results in an exception, it saves the exception as the task
  29. result, and sets the task status to ``"FAILURE"``.
  30. :param task_id: The id of the task.
  31. :param func: Callable object to execute.
  32. :param args: List of positional args to pass on to the function.
  33. :param kwargs: Keyword arguments mapping to pass on to the function.
  34. :returns: the function return value on success, or
  35. the exception instance on failure.
  36. """
  37. # Convert any unicode keys in the keyword arguments to ascii.
  38. kwargs = dict([(k.encode("utf-8"), v)
  39. for k, v in kwargs.items()])
  40. try:
  41. result = func(*args, **kwargs)
  42. except Exception, exc:
  43. default_backend.mark_as_failure(task_id, exc)
  44. return exc
  45. else:
  46. default_backend.mark_as_done(task_id, result)
  47. return result
  48. class TaskWrapper(object):
  49. """Class wrapping a task to be run.
  50. :param task_name: see :attr:`task_name`.
  51. :param task_id: see :attr:`task_id`.
  52. :param task_func: see :attr:`task_func`
  53. :param args: see :attr:`args`
  54. :param kwargs: see :attr:`kwargs`.
  55. .. attribute:: task_name
  56. Kind of task. Must be a name registered in the task registry.
  57. .. attribute:: task_id
  58. UUID of the task.
  59. .. attribute:: task_func
  60. The tasks callable object.
  61. .. attribute:: args
  62. List of positional arguments to apply to the task.
  63. .. attribute:: kwargs
  64. Mapping of keyword arguments to apply to the task.
  65. """
  66. def __init__(self, task_name, task_id, task_func, args, kwargs):
  67. self.task_name = task_name
  68. self.task_id = task_id
  69. self.task_func = task_func
  70. self.args = args
  71. self.kwargs = kwargs
  72. def __repr__(self):
  73. return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
  74. self.__class__.__name__,
  75. self.task_name, self.task_id,
  76. self.args, self.kwargs)
  77. @classmethod
  78. def from_message(cls, message):
  79. """Create a :class:`TaskWrapper` from a task message sent by
  80. :class:`celery.messaging.TaskPublisher`.
  81. :raises UnknownTask: if the message does not describe a task,
  82. the message is also rejected.
  83. :returns: :class:`TaskWrapper` instance.
  84. """
  85. message_data = message.decode()
  86. task_name = message_data["task"]
  87. task_id = message_data["id"]
  88. args = message_data["args"]
  89. kwargs = message_data["kwargs"]
  90. if task_name not in tasks:
  91. message.reject()
  92. raise UnknownTask(task_name)
  93. task_func = tasks[task_name]
  94. return cls(task_name, task_id, task_func, args, kwargs)
  95. def extend_with_default_kwargs(self, loglevel, logfile):
  96. """Extend the tasks keyword arguments with standard task arguments.
  97. These are ``logfile``, ``loglevel``, ``task_id`` and ``task_name``.
  98. """
  99. task_func_kwargs = {"logfile": logfile,
  100. "loglevel": loglevel,
  101. "task_id": self.task_id,
  102. "task_name": self.task_name}
  103. task_func_kwargs.update(self.kwargs)
  104. return task_func_kwargs
  105. def execute(self, loglevel=None, logfile=None):
  106. """Execute the task in a :func:`jail` and store return value
  107. and status in the task meta backend.
  108. :keyword loglevel: The loglevel used by the task.
  109. :keyword logfile: The logfile used by the task.
  110. """
  111. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  112. return jail(self.task_id, [
  113. self.task_func, self.args, task_func_kwargs])
  114. def execute_using_pool(self, pool, loglevel=None, logfile=None):
  115. """Like :meth:`execute`, but using the :mod:`multiprocessing` pool.
  116. :param pool: A :class:`multiprocessing.Pool` instance.
  117. :keyword loglevel: The loglevel used by the task.
  118. :keyword logfile: The logfile used by the task.
  119. :returns :class:`multiprocessing.AsyncResult` instance.
  120. """
  121. task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
  122. jail_args = [self.task_id, self.task_func,
  123. self.args, task_func_kwargs]
  124. return pool.apply_async(jail, jail_args, {},
  125. self.task_name, self.task_id)
  126. class WorkController(object):
  127. """Executes tasks waiting in the task queue.
  128. :param concurrency: see :attr:`concurrency`.
  129. :param logfile: see :attr:`logfile`.
  130. :param loglevel: see :attr:`loglevel`.
  131. :param queue_wakeup_after: see :attr:`queue_wakeup_after`.
  132. .. attribute:: concurrency
  133. The number of simultaneous processes doing work (default:
  134. :const:`celery.conf.DAEMON_CONCURRENCY`)
  135. .. attribute:: loglevel
  136. The loglevel used (default: :const:`logging.INFO`)
  137. .. attribute:: logfile
  138. The logfile used, if no logfile is specified it uses ``stderr``
  139. (default: :const:`celery.conf.DAEMON_LOG_FILE`).
  140. .. attribute:: queue_wakeup_after
  141. The time it takes for the daemon to wake up after the queue is empty,
  142. so it can check for more work
  143. (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).
  144. .. attribute:: empty_msg_emit_every
  145. How often the daemon emits the ``"Waiting for queue..."`` message.
  146. If this is ``None``, the message will never be logged.
  147. (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`)
  148. .. attribute:: logger
  149. The :class:`logging.Logger` instance used for logging.
  150. .. attribute:: pool
  151. The :class:`multiprocessing.Pool` instance used.
  152. .. attribute:: task_consumer
  153. The :class:`celery.messaging.TaskConsumer` instance used.
  154. """
  155. loglevel = logging.ERROR
  156. concurrency = DAEMON_CONCURRENCY
  157. logfile = DAEMON_LOG_FILE
  158. queue_wakeup_after = QUEUE_WAKEUP_AFTER
  159. empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY
  160. def __init__(self, concurrency=None, logfile=None, loglevel=None,
  161. queue_wakeup_after=None, is_detached=False):
  162. self.loglevel = loglevel or self.loglevel
  163. self.concurrency = concurrency or self.concurrency
  164. self.logfile = logfile or self.logfile
  165. self.queue_wakeup_after = queue_wakeup_after or \
  166. self.queue_wakeup_after
  167. self.logger = setup_logger(loglevel, logfile)
  168. self.pool = TaskProcessQueue(self.concurrency, logger=self.logger,
  169. done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
  170. self.task_consumer = None
  171. self.is_detached = is_detached
  172. self.reset_connection()
  173. def reset_connection(self):
  174. """Reset the AMQP connection, and reinitialize the
  175. :class:`celery.messaging.TaskConsumer` instance.
  176. Resets the task consumer in :attr:`task_consumer`.
  177. """
  178. if self.task_consumer:
  179. self.task_consumer.connection.close()
  180. amqp_connection = DjangoAMQPConnection()
  181. self.task_consumer = TaskConsumer(connection=amqp_connection)
  182. def connection_diagnostics(self):
  183. """Diagnose the AMQP connection, and reset connection if
  184. necessary."""
  185. if hasattr(self.task_consumer, "backend"):
  186. connection = self.task_consumer.backend.channel.connection
  187. else:
  188. connection = self.task_consumer.channel.connection
  189. if not connection:
  190. self.logger.info(
  191. "AMQP Connection has died, restoring connection.")
  192. self.reset_connection()
  193. def receive_message(self):
  194. """Receive the next message from the message broker.
  195. Tries to reset the AMQP connection if not available.
  196. Returns ``None`` if no message is waiting on the queue.
  197. :rtype: :class:`carrot.messaging.Message` instance.
  198. """
  199. #self.connection_diagnostics()
  200. self.logger.debug("Trying to fetch message from broker...")
  201. message = self.task_consumer.fetch()
  202. if message is not None:
  203. self.logger.debug("Acknowledging message with delivery tag %s" % (
  204. message.delivery_tag))
  205. message.ack()
  206. return message
  207. def fetch_next_task(self):
  208. """Fetch the next task from the AMQP broker.
  209. Raises :exc:`EmptyQueue` exception if there is no message
  210. waiting on the queue.
  211. :returns: :class:`TaskWrapper` instance.
  212. """
  213. message = self.receive_message()
  214. if message is None: # No messages waiting.
  215. raise EmptyQueue()
  216. task = TaskWrapper.from_message(message)
  217. self.logger.info("Got task from broker: %s[%s]" % (
  218. task.task_name, task.task_id))
  219. return task, message
  220. def execute_next_task(self):
  221. """Execute the next task on the queue using the multiprocessing pool.
  222. Catches all exceptions and logs them with level
  223. :const:`logging.CRITICAL`.
  224. """
  225. self.logger.debug("Trying to fetch a task.")
  226. task, message = self.fetch_next_task()
  227. self.logger.debug("Got a task: %s. Trying to execute it..." % task)
  228. result = task.execute_using_pool(self.pool, self.loglevel,
  229. self.logfile)
  230. self.logger.debug("Task %s has been executed asynchronously." % task)
  231. return result, task.task_name, task.task_id
  232. def run_periodic_tasks(self):
  233. """Schedule all waiting periodic tasks for execution.
  234. """
  235. self.logger.debug("Looking for periodic tasks ready for execution...")
  236. default_periodic_status_backend.run_periodic_tasks()
  237. def schedule_retry_tasks(self):
  238. """Reschedule all requeued tasks waiting for retry."""
  239. pass
  240. def run(self):
  241. """Starts the workers main loop."""
  242. log_wait = lambda: self.logger.info("Waiting for queue...")
  243. ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
  244. events = [
  245. EventTimer(self.run_periodic_tasks, 1),
  246. EventTimer(self.schedule_retry_tasks, 2),
  247. ]
  248. # If not running as daemon, and DEBUG logging level is enabled,
  249. # print pool PIDs and sleep for a second before we start.
  250. if self.logger.isEnabledFor(logging.DEBUG):
  251. self.logger.debug("Pool child processes: [%s]" % (
  252. "|".join(map(str, self.pool.get_worker_pids()))))
  253. if not self.is_detached:
  254. time.sleep(1)
  255. while True:
  256. [event.tick() for event in events]
  257. try:
  258. result, task_name, task_id = self.execute_next_task()
  259. except ValueError:
  260. # execute_next_task didn't return a r/name/id tuple,
  261. # probably because it got an exception.
  262. continue
  263. except EmptyQueue:
  264. ev_msg_waiting.tick()
  265. time.sleep(self.queue_wakeup_after)
  266. continue
  267. except UnknownTask, e:
  268. self.logger.info("Unknown task ignored: %s" % (e))
  269. continue
  270. except Exception, e:
  271. self.logger.critical("Message queue raised %s: %s\n%s" % (
  272. e.__class__, e, traceback.format_exc()))
  273. continue