Переглянути джерело

Refactor celery/worker into multiple modules

Ask Solem 16 роки тому
батько
коміт
bc1c629591
3 змінених файлів з 244 додано та 225 видалено
  1. 171 0
      celery/worker/__init__.py
  2. 69 0
      celery/worker/controllers.py
  3. 4 225
      celery/worker/job.py

+ 171 - 0
celery/worker/__init__.py

@@ -0,0 +1,171 @@
+"""celery.worker"""
+from carrot.connection import DjangoAMQPConnection
+from celery.worker.controllers import Mediator, PeriodicWorkController
+from celery.worker.job import TaskWrapper, UnknownTaskError
+from celery.messaging import TaskConsumer
+from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
+from celery.log import setup_logger
+from celery.pool import TaskPool
+from Queue import Queue
+import traceback
+import logging
+
+
+class AMQPListener(object):
+
+    def __init__(self, bucket_queue, hold_queue, logger):
+        self.amqp_connection = None
+        self.task_consumer = None
+        self.bucket_queue = bucket_queue
+        self.hold_queue = hold_queue
+        self.logger = logger
+
+    def start(self):
+        task_consumer = self.reset_connection()
+        it = task_consumer.iterconsume(limit=None)
+        
+        while True:
+            it.next()
+
+    def stop(self):
+        self.close_connection()
+
+    def receive_message(self, message_data, message):
+        task = TaskWrapper.from_message(message, message_data,
+                                        logger=self.logger)
+        eta = message_data.get("eta")
+        if eta:
+           self.hold_queue.put((task, eta))
+        else:
+            self.bucket_queue.put(task)
+
+    def close_connection(self):
+        """Close the AMQP connection."""
+        if self.task_consumer:
+            self.task_consumer.close()
+        if self.amqp_connection:
+            self.amqp_connection.close()
+
+    def reset_connection(self):
+        """Reset the AMQP connection, and reinitialize the
+        :class:`celery.messaging.TaskConsumer` instance.
+
+        Resets the task consumer in :attr:`task_consumer`.
+
+        """
+        self.close_connection()
+        self.amqp_connection = DjangoAMQPConnection()
+        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        self.task_consumer.register_callback(self.receive_message)
+        return self.task_consumer
+
+
+class WorkController(object):
+    """Executes tasks waiting in the task queue.
+
+    :param concurrency: see :attr:`concurrency`.
+
+    :param logfile: see :attr:`logfile`.
+
+    :param loglevel: see :attr:`loglevel`.
+
+
+    .. attribute:: concurrency
+
+        The number of simultaneous processes doing work (default:
+        :const:`celery.conf.DAEMON_CONCURRENCY`)
+
+    .. attribute:: loglevel
+
+        The loglevel used (default: :const:`logging.INFO`)
+
+    .. attribute:: logfile
+
+        The logfile used, if no logfile is specified it uses ``stderr``
+        (default: :const:`celery.conf.DAEMON_LOG_FILE`).
+
+    .. attribute:: logger
+
+        The :class:`logging.Logger` instance used for logging.
+
+    .. attribute:: pool
+
+        The :class:`multiprocessing.Pool` instance used.
+
+    .. attribute:: task_consumer
+
+        The :class:`celery.messaging.TaskConsumer` instance used.
+
+    """
+    loglevel = logging.ERROR
+    concurrency = DAEMON_CONCURRENCY
+    logfile = DAEMON_LOG_FILE
+    _state = None
+
+    def __init__(self, concurrency=None, logfile=None, loglevel=None,
+            is_detached=False):
+
+        # Options
+        self.loglevel = loglevel or self.loglevel
+        self.concurrency = concurrency or self.concurrency
+        self.logfile = logfile or self.logfile
+        self.is_detached = is_detached
+        self.logger = setup_logger(loglevel, logfile)
+
+        # Queues
+        self.bucket_queue = Queue()
+        self.hold_queue = Queue()
+
+        # Threads+Pool
+        self.periodicworkcontroller = PeriodicWorkController(
+                                                    self.bucket_queue,
+                                                    self.hold_queue)
+        self.pool = TaskPool(self.concurrency, logger=self.logger)
+        self.mediator = Mediator(self.bucket_queue, self.process_task)
+        self.amqp_listener = AMQPListener(self.bucket_queue, self.hold_queue,
+                                          logger=self.logger)
+
+    def run(self):
+        """Starts the workers main loop."""
+        self._state = "RUN"
+
+        self.pool.run()
+        self.mediator.start()
+        self.periodicworkcontroller.start()
+
+        try:
+            self.amqp_listener.start()
+        except (SystemExit, KeyboardInterrupt):
+            self.shutdown()
+
+    def process_task(self, task):
+        """Process task by passing it to the pool of workers."""
+        try:
+            try:
+                self.logger.info("Got task from broker: %s[%s]" % (
+                    task.task_name, task.task_id))
+                task.execute_using_pool(self.pool, self.loglevel,
+                                        self.logfile)
+                self.logger.debug("Task %s has been executed." % task)
+            except ValueError:
+                # execute_next_task didn't return a r/name/id tuple,
+                # probably because it got an exception.
+                pass
+            except UnknownTaskError, exc:
+                self.logger.info("Unknown task ignored: %s" % (exc))
+            except Exception, exc:
+                self.logger.critical("Message queue raised %s: %s\n%s" % (
+                                exc.__class__, exc, traceback.format_exc()))
+        except (SystemExit, KeyboardInterrupt):
+            self.shutdown()
+
+    def shutdown(self):
+        """Make sure ``celeryd`` exits cleanly."""
+        # shut down the periodic work controller thread
+        if self._state != "RUN":
+            return
+        self._state = "TERMINATE"
+        self.amqp_listener.stop()
+        self.mediator.stop()
+        self.periodicworkcontroller.stop()
+        self.pool.terminate()

+ 69 - 0
celery/worker/controllers.py

@@ -0,0 +1,69 @@
+from celery.backends import default_periodic_status_backend
+from Queue import Empty as QueueEmpty
+from datetime import datetime
+import threading
+import time
+
+
+class Mediator(threading.Thread):
+    """Thread continuously passing tasks in the queue
+    to the pool."""
+
+    def __init__(self, bucket_queue, callback):
+        super(Mediator, self).__init__()
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+        self.bucket_queue = bucket_queue
+        self.callback = callback
+
+    def run(self):
+        while True:
+            if self._shutdown.isSet():
+                break
+            # This blocks until there's a message in the queue.
+            task = self.bucket_queue.get()
+            self.callback(task)
+        self._stopped.set() # indicate that we are stopped
+
+    def stop(self):
+        """Shutdown the thread."""
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done
+
+
+class PeriodicWorkController(threading.Thread):
+    """A thread that continuously checks if there are
+    :class:`celery.task.PeriodicTask` tasks waiting for execution,
+    and executes them."""
+
+    def __init__(self, bucket_queue, hold_queue):
+        super(PeriodicWorkController, self).__init__()
+        self._shutdown = threading.Event()
+        self._stopped = threading.Event()
+        self.hold_queue = hold_queue
+        self.bucket_queue = bucket_queue
+
+    def run(self):
+        """Run when you use :meth:`Thread.start`"""
+        while True:
+            if self._shutdown.isSet():
+                break
+            default_periodic_status_backend.run_periodic_tasks()
+            self.process_hold_queue()
+            time.sleep(1)
+        self._stopped.set() # indicate that we are stopped
+
+    def process_hold_queue(self):
+        try:
+            task, eta = self.hold_queue.get_nowait()
+        except QueueEmpty:
+            return
+        if datetime.now() >= eta:
+            self.bucket_queue.put(task)
+        else:
+            self.hold_queue.put((task, eta))
+
+    def stop(self):
+        """Shutdown the thread."""
+        self._shutdown.set()
+        self._stopped.wait() # block until this thread is done

+ 4 - 225
celery/worker.py → celery/worker/job.py

@@ -1,27 +1,12 @@
-"""celery.worker"""
-from carrot.connection import DjangoAMQPConnection
-from celery.messaging import TaskConsumer
-from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
 from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
-from celery.log import setup_logger
 from celery.registry import tasks
-from celery.pool import TaskPool
 from celery.datastructures import ExceptionInfo
-from celery.backends import default_backend, default_periodic_status_backend
-from celery.timer import EventTimer
+from celery.backends import default_backend
 from django.core.mail import mail_admins
 from celery.monitoring import TaskTimerStats
-from datetime import datetime, timedelta
-from Queue import Queue
-from Queue import Empty as QueueEmpty
-from multiprocessing import TimeoutError
 import multiprocessing
 import traceback
-import threading
-import logging
-import signal
 import socket
-import time
 import sys
 
 
@@ -41,7 +26,7 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
-class UnknownTask(Exception):
+class UnknownTaskError(Exception):
     """Got an unknown task in the queue. The message is requeued and
     ignored."""
 
@@ -163,7 +148,7 @@ class TaskWrapper(object):
         """Create a :class:`TaskWrapper` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
-        :raises UnknownTask: if the message does not describe a task,
+        :raises UnknownTaskError: if the message does not describe a task,
             the message is also rejected.
 
         :returns: :class:`TaskWrapper` instance.
@@ -179,7 +164,7 @@ class TaskWrapper(object):
                     for key, value in kwargs.items()])
 
         if task_name not in tasks:
-            raise UnknownTask(task_name)
+            raise UnknownTaskError(task_name)
         task_func = tasks[task_name]
         return cls(task_name, task_id, task_func, args, kwargs,
                     on_acknowledge=message.ack, logger=logger)
@@ -264,209 +249,3 @@ class TaskWrapper(object):
                 on_acknowledge=self.on_acknowledge,
                 meta={"task_id": self.task_id, "task_name": self.task_name})
 
-
-class Mediator(threading.Thread):
-    """Thread continuously passing tasks in the queue
-    to the pool."""
-
-    def __init__(self, bucket_queue, callback):
-        super(Mediator, self).__init__()
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
-        self.bucket_queue = bucket_queue
-        self.callback = callback
-
-    def run(self):
-        while True:
-            if self._shutdown.isSet():
-                break
-            # This blocks until there's a message in the queue.
-            task = self.bucket_queue.get()
-            self.callback(task)
-        self._stopped.set() # indicate that we are stopped
-
-    def stop(self):
-        """Shutdown the thread."""
-        self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
-
-
-class PeriodicWorkController(threading.Thread):
-    """A thread that continuously checks if there are
-    :class:`celery.task.PeriodicTask` tasks waiting for execution,
-    and executes them."""
-
-    def __init__(self, bucket_queue, hold_queue):
-        super(PeriodicWorkController, self).__init__()
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
-        self.hold_queue = hold_queue
-        self.bucket_queue = bucket_queue
-
-    def run(self):
-        """Run when you use :meth:`Thread.start`"""
-        while True:
-            if self._shutdown.isSet():
-                break
-            default_periodic_status_backend.run_periodic_tasks()
-            self.process_hold_queue()
-            time.sleep(1)
-        self._stopped.set() # indicate that we are stopped
-
-    def process_hold_queue(self):
-        try:
-            task, eta = self.hold_queue.get_nowait()
-        except QueueEmpty:
-            return
-        if datetime.now() >= eta:
-            self.bucket_queue.put(task)
-        else:
-            self.hold_queue.put((task, eta))
-
-    def stop(self):
-        """Shutdown the thread."""
-        self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
-
-
-class WorkController(object):
-    """Executes tasks waiting in the task queue.
-
-    :param concurrency: see :attr:`concurrency`.
-
-    :param logfile: see :attr:`logfile`.
-
-    :param loglevel: see :attr:`loglevel`.
-
-
-    .. attribute:: concurrency
-
-        The number of simultaneous processes doing work (default:
-        :const:`celery.conf.DAEMON_CONCURRENCY`)
-
-    .. attribute:: loglevel
-
-        The loglevel used (default: :const:`logging.INFO`)
-
-    .. attribute:: logfile
-
-        The logfile used, if no logfile is specified it uses ``stderr``
-        (default: :const:`celery.conf.DAEMON_LOG_FILE`).
-
-    .. attribute:: logger
-
-        The :class:`logging.Logger` instance used for logging.
-
-    .. attribute:: pool
-
-        The :class:`multiprocessing.Pool` instance used.
-
-    .. attribute:: task_consumer
-
-        The :class:`celery.messaging.TaskConsumer` instance used.
-
-    """
-    loglevel = logging.ERROR
-    concurrency = DAEMON_CONCURRENCY
-    logfile = DAEMON_LOG_FILE
-    _state = None
-
-    def __init__(self, concurrency=None, logfile=None, loglevel=None,
-            is_detached=False):
-
-        # Options
-        self.loglevel = loglevel or self.loglevel
-        self.concurrency = concurrency or self.concurrency
-        self.logfile = logfile or self.logfile
-        self.is_detached = is_detached
-        self.logger = setup_logger(loglevel, logfile)
-        self.amqp_connection = None
-        self.task_consumer = None
-
-        # Queues
-        self.bucket_queue = Queue()
-        self.hold_queue = Queue()
-
-        # Threads+Pool
-        self.periodicworkcontroller = PeriodicWorkController(
-                                                    self.bucket_queue,
-                                                    self.hold_queue)
-        self.pool = TaskPool(self.concurrency, logger=self.logger)
-        self.mediator = Mediator(self.bucket_queue, self.process_task)
-
-    def run(self):
-        """Starts the workers main loop."""
-        self._state = "RUN"
-        task_consumer = self.reset_connection()
-        it = task_consumer.iterconsume(limit=None)
-
-        self.pool.run()
-        self.mediator.start()
-        self.periodicworkcontroller.start()
-
-        try:
-            while True:
-                it.next()
-        except (SystemExit, KeyboardInterrupt):
-            self.shutdown()
-
-    def process_task(self, task):
-        """Process task by passing it to the pool of workers."""
-        try:
-            try:
-                self.logger.info("Got task from broker: %s[%s]" % (
-                    task.task_name, task.task_id))
-                task.execute_using_pool(self.pool, self.loglevel,
-                                        self.logfile)
-                self.logger.debug("Task %s has been executed." % task)
-            except ValueError:
-                # execute_next_task didn't return a r/name/id tuple,
-                # probably because it got an exception.
-                pass
-            except UnknownTask, exc:
-                self.logger.info("Unknown task ignored: %s" % (exc))
-            except Exception, exc:
-                self.logger.critical("Message queue raised %s: %s\n%s" % (
-                                exc.__class__, exc, traceback.format_exc()))
-        except (SystemExit, KeyboardInterrupt):
-            self.shutdown()
-
-    def receive_message(self, message_data, message):
-        task = TaskWrapper.from_message(message, message_data,
-                                        logger=self.logger)
-        eta = message_data.get("eta")
-        if eta:
-           self.hold_queue.put((task, eta))
-        else:
-            self.bucket_queue.put(task)
-
-    def close_connection(self):
-        """Close the AMQP connection."""
-        if self.task_consumer:
-            self.task_consumer.close()
-        if self.amqp_connection:
-            self.amqp_connection.close()
-
-    def reset_connection(self):
-        """Reset the AMQP connection, and reinitialize the
-        :class:`celery.messaging.TaskConsumer` instance.
-
-        Resets the task consumer in :attr:`task_consumer`.
-
-        """
-        self.close_connection()
-        self.amqp_connection = DjangoAMQPConnection()
-        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
-        self.task_consumer.register_callback(self.receive_message)
-        return self.task_consumer
-
-    def shutdown(self):
-        """Make sure ``celeryd`` exits cleanly."""
-        # shut down the periodic work controller thread
-        if self._state != "RUN":
-            return
-        self._state = "TERMINATE"
-        self.mediator.stop()
-        self.periodicworkcontroller.stop()
-        self.pool.terminate()
-        self.close_connection()