Bladeren bron

Merge branch 'consumebasic'

Ask Solem 16 jaren geleden
bovenliggende
commit
2a0fcf6443
4 gewijzigde bestanden met toevoegingen van 39 en 128 verwijderingen
  1. 2 16
      celery/bin/celeryd.py
  2. 0 24
      celery/conf.py
  3. 0 4
      celery/tests/test_conf.py
  4. 37 84
      celery/worker.py

+ 2 - 16
celery/bin/celeryd.py

@@ -20,12 +20,6 @@
 
     Path to pidfile.
 
-.. cmdoption:: -w, --wakeup-after
-
-    If the queue is empty, this is the time *in seconds* the
-    daemon sleeps until it wakes up to check if there's any
-    new messages on the queue.
-
 .. cmdoption:: -d, --detach, --daemon
 
     Run in the background as a daemon.
@@ -68,7 +62,6 @@ from django.conf import settings
 from celery.log import emergency_error
 from celery.conf import LOG_LEVELS, DAEMON_LOG_FILE, DAEMON_LOG_LEVEL
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_PID_FILE
-from celery.conf import QUEUE_WAKEUP_AFTER
 from celery import discovery
 from celery.task import discard_all
 from celery.worker import WorkController
@@ -111,9 +104,8 @@ def acquire_pidlock(pidfile):
 
 def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
         loglevel=DAEMON_LOG_LEVEL, logfile=DAEMON_LOG_FILE, discard=False,
-        pidfile=DAEMON_PID_FILE, queue_wakeup_after=QUEUE_WAKEUP_AFTER,
-        umask=0, uid=None, gid=None, working_directory=None, chroot=None,
-        **kwargs):
+        pidfile=DAEMON_PID_FILE, umask=0, uid=None, gid=None,
+        working_directory=None, chroot=None, **kwargs):
     """Run the celery daemon."""
     if settings.DATABASE_ENGINE == "sqlite3" and concurrency > 1:
         import warnings
@@ -158,7 +150,6 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, daemon=False,
     celeryd = WorkController(concurrency=concurrency,
                                loglevel=loglevel,
                                logfile=logfile,
-                               queue_wakeup_after=queue_wakeup_after,
                                is_detached=daemon)
     try:
         celeryd.run()
@@ -189,11 +180,6 @@ OPTION_LIST = (
     optparse.make_option('-p', '--pidfile', default=DAEMON_PID_FILE,
             action="store", dest="pidfile",
             help="Path to pidfile."),
-    optparse.make_option('-w', '--wakeup-after', default=QUEUE_WAKEUP_AFTER,
-            action="store", type="float", dest="queue_wakeup_after",
-            help="If the queue is empty, this is the time *in seconds* the "
-                 "daemon sleeps until it wakes up to check if there's any "
-                 "new messages on the queue."),
     optparse.make_option('-d', '--detach', '--daemon', default=False,
             action="store_true", dest="daemon",
             help="Run in the background as a daemon."),

+ 0 - 24
celery/conf.py

@@ -8,8 +8,6 @@ DEFAULT_AMQP_CONSUMER_ROUTING_KEY = "celery"
 DEFAULT_AMQP_CONSUMER_QUEUE = "celery"
 DEFAULT_AMQP_EXCHANGE_TYPE = "direct"
 DEFAULT_DAEMON_CONCURRENCY = 10
-DEFAULT_QUEUE_WAKEUP_AFTER = 0.1
-DEFAULT_EMPTY_MSG_EMIT_EVERY = 5
 DEFAULT_DAEMON_PID_FILE = "celeryd.pid"
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
 DEFAULT_DAEMON_LOG_LEVEL = "INFO"
@@ -61,28 +59,6 @@ DAEMON_LOG_FILE = getattr(settings, "CELERYD_LOG_FILE",
 DAEMON_LOG_LEVEL = LOG_LEVELS[getattr(settings, "CELERYD_DAEMON_LOG_LEVEL",
                                       DEFAULT_DAEMON_LOG_LEVEL).upper()]
 
-"""
-.. data:: QUEUE_WAKEUP_AFTER
-
-    The time (in seconds) the celery worker should sleep when there's
-    no messages left on the queue. After the time is slept, the worker
-    wakes up and checks the queue again.
-
-"""
-QUEUE_WAKEUP_AFTER = getattr(settings, "CELERYD_QUEUE_WAKEUP_AFTER",
-                             DEFAULT_QUEUE_WAKEUP_AFTER)
-
-"""
-.. data:: EMPTY_MSG_EMIT_EVERY
-
-    How often the celery daemon should write a log message saying there are no
-    messages in the queue. If this is ``None`` or ``0``, it will never print
-    this message.
-
-"""
-EMPTY_MSG_EMIT_EVERY = getattr(settings, "CELERYD_EMPTY_MSG_EMIT_EVERY",
-                               DEFAULT_EMPTY_MSG_EMIT_EVERY)
-
 """
 .. data:: DAEMON_PID_FILE
 

+ 0 - 4
celery/tests/test_conf.py

@@ -18,10 +18,6 @@ SETTING_VARS = (
         "DEFAULT_DAEMON_CONCURRENCY"),
     ("CELERYD_PID_FILE", "DAEMON_PID_FILE",
         "DEFAULT_DAEMON_PID_FILE"),
-    ("CELERYD_EMPTY_MSG_EMIT_EVERY", "EMPTY_MSG_EMIT_EVERY",
-        "DEFAULT_EMPTY_MSG_EMIT_EVERY"),
-    ("CELERYD_QUEUE_WAKEUP_AFTER", "QUEUE_WAKEUP_AFTER",
-        "DEFAULT_QUEUE_WAKEUP_AFTER"),
     ("CELERYD_LOG_FILE", "DAEMON_LOG_FILE",
         "DEFAULT_DAEMON_LOG_FILE"),
     ("CELERYD_DAEMON_LOG_FORMAT", "LOG_FORMAT",

+ 37 - 84
celery/worker.py

@@ -2,7 +2,6 @@
 from carrot.connection import DjangoAMQPConnection
 from celery.messaging import TaskConsumer
 from celery.conf import DAEMON_CONCURRENCY, DAEMON_LOG_FILE
-from celery.conf import QUEUE_WAKEUP_AFTER, EMPTY_MSG_EMIT_EVERY
 from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 from celery.log import setup_logger
 from celery.registry import tasks
@@ -36,10 +35,6 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
-class EmptyQueue(Exception):
-    """The message queue is currently empty."""
-
-
 class UnknownTask(Exception):
     """Got an unknown task in the queue. The message is requeued and
     ignored."""
@@ -303,8 +298,6 @@ class WorkController(object):
 
     :param loglevel: see :attr:`loglevel`.
 
-    :param queue_wakeup_after: see :attr:`queue_wakeup_after`.
-
 
     .. attribute:: concurrency
 
@@ -320,18 +313,6 @@ class WorkController(object):
         The logfile used, if no logfile is specified it uses ``stderr``
         (default: :const:`celery.conf.DAEMON_LOG_FILE`).
 
-    .. attribute:: queue_wakeup_after
-
-        The time it takes for the daemon to wake up after the queue is empty,
-        so it can check for more work
-        (default: :const:`celery.conf.QUEUE_WAKEUP_AFTER`).
-
-    .. attribute:: empty_msg_emit_every
-
-        How often the daemon emits the ``"Waiting for queue..."`` message.
-        If this is ``None``, the message will never be logged.
-        (default: :const:`celery.conf.EMPTY_MSG_EMIT_EVERY`)
-
     .. attribute:: logger
 
         The :class:`logging.Logger` instance used for logging.
@@ -348,23 +329,24 @@ class WorkController(object):
     loglevel = logging.ERROR
     concurrency = DAEMON_CONCURRENCY
     logfile = DAEMON_LOG_FILE
-    queue_wakeup_after = QUEUE_WAKEUP_AFTER
-    empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY
 
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
-            queue_wakeup_after=None, is_detached=False):
+            is_detached=False):
         self.loglevel = loglevel or self.loglevel
         self.concurrency = concurrency or self.concurrency
         self.logfile = logfile or self.logfile
-        self.queue_wakeup_after = queue_wakeup_after or \
-                                    self.queue_wakeup_after
         self.logger = setup_logger(loglevel, logfile)
         self.pool = TaskPool(self.concurrency, logger=self.logger)
         self.periodicworkcontroller = PeriodicWorkController()
-        self.task_consumer = None
-        self.task_consumer_it = None
         self.is_detached = is_detached
-        self.reset_connection()
+        self.amqp_connection = None
+        self.task_consumer = None
+
+    def close_connection(self):
+        if self.task_consumer:
+            self.task_consumer.close()
+        if self.amqp_connection:
+            self.amqp_connection.close()
 
     def reset_connection(self):
         """Reset the AMQP connection, and reinitialize the
@@ -373,11 +355,10 @@ class WorkController(object):
         Resets the task consumer in :attr:`task_consumer`.
 
         """
-        if self.task_consumer:
-            self.task_consumer.connection.close()
-        amqp_connection = DjangoAMQPConnection()
-        self.task_consumer = TaskConsumer(connection=amqp_connection)
-        self.task_consumer_it = self.task_consumer.iterqueue(infinite=True)
+        self.close_connection()
+        self.amqp_connection = DjangoAMQPConnection()
+        self.task_consumer = TaskConsumer(connection=self.amqp_connection)
+        return self.task_consumer
 
     def connection_diagnostics(self):
         """Diagnose the AMQP connection, and reset connection if
@@ -389,19 +370,24 @@ class WorkController(object):
                     "AMQP Connection has died, restoring connection.")
             self.reset_connection()
 
-    def receive_message(self):
-        """Receive the next message from the message broker.
-
-        Tries to reset the AMQP connection if not available.
-        Returns ``None`` if no message is waiting on the queue.
-
-        :rtype: :class:`carrot.messaging.Message` instance.
-
-        """
-        message = self.task_consumer_it.next()
-        if not message:
-            raise EmptyQueue()
-        return message
+    def _message_callback(self, message_data, message):
+        try:
+            try:
+                self.process_task(message)
+            except ValueError:
+                # execute_next_task didn't return a r/name/id tuple,
+                # probably because it got an exception.
+                pass
+            except UnknownTask, exc:
+                self.logger.info("Unknown task ignored: %s" % (exc))
+            except Exception, exc:
+                self.logger.critical("Message queue raised %s: %s\n%s" % (
+                                exc.__class__, exc, traceback.format_exc()))
+            except:
+                self.shutdown()
+                raise
+        except (SystemExit, KeyboardInterrupt):
+            self.shutdown()
 
     def process_task(self, message):
         """Process task message by passing it to the pool of workers."""
@@ -417,31 +403,17 @@ class WorkController(object):
 
         return result
 
-    def execute_next_task(self):
-        """Execute the next task on the queue using the multiprocessing pool.
-
-        Catches all exceptions and logs them with level
-        :const:`logging.CRITICAL`.
-
-        Raises :exc:`EmptyQueue` exception if there is no message
-        waiting on the queue.
-
-        """
-        self.process_task(self.receive_message())
-
-    def schedule_retry_tasks(self):
-        """Reschedule all requeued tasks waiting for retry."""
-        pass
-    
     def shutdown(self):
         # shut down the periodic work controller thread
         self.periodicworkcontroller.stop()
         self.pool.terminate()
+        self.close_connection()
 
     def run(self):
         """Starts the workers main loop."""
-        log_wait = lambda: self.logger.debug("Waiting for queue...")
-        ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
+        task_consumer = self.reset_connection()
+        task_consumer.register_callback(self._message_callback)
+        it = task_consumer.iterconsume(limit=None)
 
         self.pool.run()
         self.periodicworkcontroller.start()
@@ -455,26 +427,7 @@ class WorkController(object):
                 time.sleep(1)
         
         try:
-            while True:
-                try:
-                    self.execute_next_task()
-                except ValueError:
-                    # execute_next_task didn't return a r/name/id tuple,
-                    # probably because it got an exception.
-                    continue
-                except EmptyQueue:
-                    ev_msg_waiting.tick()
-                    time.sleep(self.queue_wakeup_after)
-                    continue
-                except UnknownTask, exc:
-                    self.logger.info("Unknown task ignored: %s" % (exc))
-                    continue
-                except Exception, exc:
-                    self.logger.critical("Message queue raised %s: %s\n%s" % (
-                                 exc.__class__, exc, traceback.format_exc()))
-                    continue
-                except:
-                    self.shutdown()
-                    raise
+            while True: 
+                it.next()
         except (SystemExit, KeyboardInterrupt):
             self.shutdown()