瀏覽代碼

Forgot to import default-backend (closes JIRA #OPAL-224)

Ask Solem 16 年之前
父節點
當前提交
58900c42ef
共有 4 個文件被更改,包括 122 次插入31 次删除
  1. 1 1
      celery/bin/celeryd.py
  2. 1 0
      celery/task/builtins.py
  3. 85 24
      celery/worker/__init__.py
  4. 35 6
      celery/worker/controllers.py

+ 1 - 1
celery/bin/celeryd.py

@@ -264,7 +264,7 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
                                 logfile=logfile,
                                 is_detached=detach)
         try:
-            worker.run()
+            worker.start()
         except Exception, e:
             emergency_error(logfile, "celeryd raised exception %s: %s\n%s" % (
                             e.__class__, e, traceback.format_exc()))

+ 1 - 0
celery/task/builtins.py

@@ -1,5 +1,6 @@
 from celery.task.base import Task, TaskSet, PeriodicTask
 from celery.registry import tasks
+from celery.backends import default_backend
 from datetime import timedelta
 try:
     import cPickle as pickle

+ 85 - 24
celery/worker/__init__.py

@@ -1,4 +1,10 @@
-"""celery.worker"""
+"""
+
+The Multiprocessing Worker Server
+
+Documentation for this module is in ``docs/reference/celery.worker.rst``.
+
+"""
 from carrot.connection import DjangoAMQPConnection
 from celery.worker.controllers import Mediator, PeriodicWorkController
 from celery.worker.job import TaskWrapper, UnknownTaskError
@@ -12,6 +18,26 @@ import logging
 
 
 class AMQPListener(object):
+    """Listen for messages received from the AMQP broker and
+    move them the the bucket queue for task processing.
+
+    :param bucket_queue: See :attr:`bucket_queue`.
+    :param hold_queue: See :attr:`hold_queue`.
+   
+    .. attribute:: bucket_queue
+
+        The queue that holds tasks ready for processing immediately.
+
+    .. attribute:: hold_queue
+
+        The queue that holds paused tasks. Reasons for being paused include
+        a countdown/eta or that it's waiting for retry.
+
+    .. attribute:: logger
+
+        The logger used.
+
+    """
 
     def __init__(self, bucket_queue, hold_queue, logger):
         self.amqp_connection = None
@@ -21,6 +47,7 @@ class AMQPListener(object):
         self.logger = logger
 
     def start(self):
+        """Start processing AMQP messages."""
         task_consumer = self.reset_connection()
         it = task_consumer.iterconsume(limit=None)
         
@@ -28,9 +55,17 @@ class AMQPListener(object):
             it.next()
 
     def stop(self):
+        """Stop processing AMQP messages and close the connection
+        to the broker."""
         self.close_connection()
 
     def receive_message(self, message_data, message):
+        """The callback called when a new message is received.
+        
+        If the message has an ``eta`` we move it to the hold queue,
+        otherwise we move it the bucket queue for immediate processing.
+        
+        """
         task = TaskWrapper.from_message(message, message_data,
                                         logger=self.logger)
         eta = message_data.get("eta")
@@ -64,9 +99,7 @@ class WorkController(object):
     """Executes tasks waiting in the task queue.
 
     :param concurrency: see :attr:`concurrency`.
-
     :param logfile: see :attr:`logfile`.
-
     :param loglevel: see :attr:`loglevel`.
 
 
@@ -88,13 +121,36 @@ class WorkController(object):
 
         The :class:`logging.Logger` instance used for logging.
 
+    .. attribute:: is_detached
+
+        Flag describing if the worker is running as a daemon or not.
+
     .. attribute:: pool
 
         The :class:`multiprocessing.Pool` instance used.
 
-    .. attribute:: task_consumer
+    .. attribute:: bucket_queue
+
+        The :class:`Queue.Queue` that holds tasks ready for immediate
+        processing.
+
+    .. attribute:: hold_queue
+
+        The :class:`Queue.Queue` that holds paused tasks. Reasons for holding
+        back the task include waiting for ``eta`` to pass or the task is being
+        retried.
+
+    .. attribute:: periodic_work_controller
+
+        Instance of :class:`celery.worker.controllers.PeriodicWorkController`.
+
+    .. attribute:: mediator
 
-        The :class:`celery.messaging.TaskConsumer` instance used.
+        Instance of :class:`celery.worker.controllers.Mediator`.
+
+    .. attribute:: amqp_listener
+
+        Instance of :class:`AMQPListener`.
 
     """
     loglevel = logging.ERROR
@@ -117,36 +173,33 @@ class WorkController(object):
         self.hold_queue = Queue()
 
         # Threads+Pool
-        self.periodicworkcontroller = PeriodicWorkController(
+        self.periodic_work_controller = PeriodicWorkController(
                                                     self.bucket_queue,
                                                     self.hold_queue)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.mediator = Mediator(self.bucket_queue, self.process_task)
         self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
                                           logger=self.logger)
+        self.mediator = Mediator(self.bucket_queue, self.safe_process_task)
 
-    def run(self):
+    def start(self):
         """Starts the workers main loop."""
         self._state = "RUN"
 
-        self.pool.run()
-        self.mediator.start()
-        self.periodicworkcontroller.start()
-
         try:
+            self.pool.run()
+            self.mediator.start()
+            self.periodic_work_controller.start()
             self.amqp_listener.start()
         finally:
-            self.shutdown()
+            self.stop()
 
-    def process_task(self, task):
-        """Process task by passing it to the pool of workers."""
+    def safe_process_task(self, task):
+        """Same as :meth:`process_task`, but catch all exceptions
+        the task raises and log them as errors, to make sure the
+        worker doesn't die."""
         try:
             try:
-                self.logger.info("Got task from broker: %s[%s]" % (
-                    task.task_name, task.task_id))
-                task.execute_using_pool(self.pool, self.loglevel,
-                                        self.logfile)
-                self.logger.debug("Task %s has been executed." % task)
+                self.process_task(task)
             except ValueError:
                 # execute_next_task didn't return a r/name/id tuple,
                 # probably because it got an exception.
@@ -157,15 +210,23 @@ class WorkController(object):
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                                 exc.__class__, exc, traceback.format_exc()))
         except (SystemExit, KeyboardInterrupt):
-            self.shutdown()
+            self.stop()
+
+    def process_task(self, task):
+        """Process task by sending it to the pool of workers."""
+        self.logger.info("Got task from broker: %s[%s]" % (
+                task.task_name, task.task_id))
+        task.execute_using_pool(self.pool, self.loglevel, self.logfile)
+        self.logger.debug("Task %s has been executed." % task)
 
-    def shutdown(self):
-        """Make sure ``celeryd`` exits cleanly."""
+    def stop(self):
+        """Gracefully shutdown the worker server."""
         # shut down the periodic work controller thread
         if self._state != "RUN":
             return
         self._state = "TERMINATE"
         self.amqp_listener.stop()
         self.mediator.stop()
-        self.periodicworkcontroller.stop()
+        self.periodic_work_controller.stop()
         self.pool.terminate()
+        self._state = "STOP"

+ 35 - 6
celery/worker/controllers.py

@@ -1,3 +1,8 @@
+"""
+
+Worker Controller Threads
+
+"""
 from celery.backends import default_periodic_status_backend
 from Queue import Empty as QueueEmpty
 from datetime import datetime
@@ -6,8 +11,18 @@ import time
 
 
 class Mediator(threading.Thread):
-    """Thread continuously passing tasks in the queue
-    to the pool."""
+    """Thread continuously sending tasks in the queue to the pool.
+
+    .. attribute:: bucket_queue
+
+        The task queue, a :class:`Queue.Queue` instance.
+
+    .. attribute:: callback
+
+        The callback used to process tasks retrieved from the
+        :attr:`bucket_queue`.
+
+    """
 
     def __init__(self, bucket_queue, callback):
         super(Mediator, self).__init__()
@@ -17,6 +32,8 @@ class Mediator(threading.Thread):
         self.callback = callback
 
     def run(self):
+        """Run the mediator thread. Should not be used directly, use
+        :meth:`start` instead."""
         while True:
             if self._shutdown.isSet():
                 break
@@ -30,7 +47,7 @@ class Mediator(threading.Thread):
         self._stopped.set() # indicate that we are stopped
 
     def stop(self):
-        """Shutdown the thread."""
+        """Gracefully shutdown the thread."""
         self._shutdown.set()
         self._stopped.wait() # block until this thread is done
 
@@ -38,7 +55,13 @@ class Mediator(threading.Thread):
 class PeriodicWorkController(threading.Thread):
     """A thread that continuously checks if there are
     :class:`celery.task.PeriodicTask` tasks waiting for execution,
-    and executes them."""
+    and executes them. It also finds tasks in the hold queue that is
+    ready for execution and moves them to the bucket queue.
+
+    (Tasks in the hold queue are tasks waiting for retry, or with an
+    ``eta``/``countdown``.)
+
+    """
 
     def __init__(self, bucket_queue, hold_queue):
         super(PeriodicWorkController, self).__init__()
@@ -48,7 +71,11 @@ class PeriodicWorkController(threading.Thread):
         self.bucket_queue = bucket_queue
 
     def run(self):
-        """Run when you use :meth:`Thread.start`"""
+        """Run the thread.
+        
+        Should not be used directly, use :meth:`start` instead.
+        
+        """
         while True:
             if self._shutdown.isSet():
                 break
@@ -58,6 +85,8 @@ class PeriodicWorkController(threading.Thread):
         self._stopped.set() # indicate that we are stopped
 
     def process_hold_queue(self):
+        """Finds paused tasks that are ready for execution and move
+        them to the :attr:`bucket_queue`."""
         try:
             task, eta = self.hold_queue.get_nowait()
         except QueueEmpty:
@@ -68,6 +97,6 @@ class PeriodicWorkController(threading.Thread):
             self.hold_queue.put((task, eta))
 
     def stop(self):
-        """Shutdown the thread."""
+        """Gracefully shutdown the thread."""
         self._shutdown.set()
         self._stopped.wait() # block until this thread is done