Browse Source

Some documentation and nitpicking.

Ask Solem 15 years ago
parent
commit
4041250e89

+ 5 - 2
celery/beat.py

@@ -186,13 +186,16 @@ class ClockService(object):
                 synced[0] = True
                 self._stopped.set()
 
+        silence = self.max_interval < 60 and 10 or 1
+        debug = log.SilenceRepeated(self.logger.debug, max_iterations=silence)
+
         try:
             while True:
                 if self._shutdown.isSet():
                     break
                 interval = scheduler.tick()
-                self.logger.debug("ClockService: Waking up %s." % (
-                    humanize_seconds(interval, prefix="in ")))
+                debug("ClockService: Waking up %s." % (
+                        humanize_seconds(interval, prefix="in ")))
                 time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
             _stop()

+ 21 - 0
celery/log.py

@@ -10,6 +10,11 @@ from celery.patch import monkeypatch
 
 
 def get_default_logger(loglevel=None):
+    """Get default logger instance.
+
+    :keyword loglevel: Initial log level.
+
+    """
     from multiprocessing.util import get_logger
     logger = get_logger()
     loglevel is not None and logger.setLevel(loglevel)
@@ -153,3 +158,19 @@ class LoggingProxy(object):
 
     def fileno(self):
         return None
+
+
+class SilenceRepeated(object):
+    """Rate-limit a logging callable so repeated messages stay quiet.
+
+    Calls are counted: the first ``max_iterations`` calls are dropped,
+    the following call passes every message to ``action`` and resets
+    the counter.
+
+    :param action: Callable invoked once per message (e.g. a bound
+        ``logger.debug``).
+    :keyword max_iterations: Number of calls to swallow before a call
+        is actually forwarded to ``action``.
+
+    """
+
+    def __init__(self, action, max_iterations=10):
+        self.action = action
+        self.max_iterations = max_iterations
+        self._iterations = 0
+
+    def __call__(self, *msgs):
+        """Forward ``msgs`` to the action, or count a silenced call."""
+        if self._iterations >= self.max_iterations:
+            # Explicit loop instead of ``map``: ``map`` is lazy on
+            # Python 3 and would never invoke the action.
+            for msg in msgs:
+                self.action(msg)
+            self._iterations = 0
+        else:
+            self._iterations += 1

+ 4 - 2
celery/worker/__init__.py

@@ -122,7 +122,8 @@ class WorkController(object):
         self.logger.debug("Instantiating thread components...")
 
         # Threads+Pool
-        self.schedule_controller = ScheduleController(self.eta_scheduler)
+        self.schedule_controller = ScheduleController(self.eta_scheduler,
+                                                      logger=self.logger)
         self.pool = TaskPool(self.concurrency, logger=self.logger,
                              initializer=process_initializer)
         self.broker_listener = CarrotListener(self.ready_queue,
@@ -130,7 +131,8 @@ class WorkController(object):
                                         logger=self.logger,
                                         send_events=send_events,
                                         initial_prefetch_count=concurrency)
-        self.mediator = Mediator(self.ready_queue, self.safe_process_task)
+        self.mediator = Mediator(self.ready_queue, self.safe_process_task,
+                                 logger=self.logger)
 
         self.clockservice = None
         if self.embed_clockservice:

+ 1 - 3
celery/worker/buckets.py

@@ -5,9 +5,7 @@ RATE_MODIFIER_MAP = {"s": lambda n: n,
                      "m": lambda n: n / 60.0,
                      "h": lambda n: n / 60.0 / 60.0}
 
-BASE_IDENTIFIERS = {"0x": 16,
-                    "0o": 8,
-                    "0b": 2}
+BASE_IDENTIFIERS = {"0x": 16, "0o": 8, "0b": 2}
 
 
 class RateLimitExceeded(Exception):

+ 53 - 7
celery/worker/control.py

@@ -1,24 +1,44 @@
-from celery.worker.revoke import revoked
+import socket
+
+from celery import log
 from celery.registry import tasks
+from celery.worker.revoke import revoked
 
 
 def expose(fun):
+    """Expose method as a celery worker control command, allowed to be called
+    from a message."""
     fun.exposed = True
     return fun
 
 
 class Control(object):
+    """The worker control panel.
+
+    :param logger: The current logger to use.
 
-    def __init__(self, logger):
+    """
+
+    def __init__(self, logger, hostname=None):
         self.logger = logger
+        self.hostname = hostname or socket.gethostname()
 
     @expose
     def revoke(self, task_id, **kwargs):
+        """Revoke task by task id."""
         revoked.add(task_id)
         self.logger.warn("Task %s revoked." % task_id)
 
     @expose
-    def rate_limit(self, task_name, rate_limit):
+    def rate_limit(self, task_name, rate_limit, **kwargs):
+        """Set new rate limit for a task type.
+
+        See :attr:`celery.task.base.Task.rate_limit`.
+
+        :param task_name: Type of task.
+        :param rate_limit: New rate limit.
+
+        """
         try:
             tasks[task_name].rate_limit = rate_limit
         except KeyError:
@@ -33,13 +53,39 @@ class Control(object):
 
 
 class ControlDispatch(object):
+    """Execute worker control panel commands."""
+
     panel_cls = Control
 
-    def __init__(self, logger):
-        self.logger = logger
-        self.panel = self.panel_cls(self.logger)
+    def __init__(self, logger, hostname=None):
+        self.logger = logger or log.get_default_logger()
+        self.hostname = hostname
+        self.panel = self.panel_cls(self.logger, hostname=self.hostname)
+
+    def dispatch_from_message(self, message):
+        """Dispatch by using message data received from the broker.
+
+        Example:
+
+            >>> def receive_message(message_data, message):
+            ...     control = message_data.get("control")
+            ...     if control:
+            ...         ControlDispatch().dispatch_from_message(control)
+
+        """
+        message = dict(message) # don't modify callers message.
+        command = message.pop("command")
+        destination = message.pop("destination", None)
+        if not destination or destination == self.hostname:
+            return self.execute(command, message)
+
+    def execute(self, command, kwargs):
+        """Execute control command by name and keyword arguments.
+
+        :param command: Name of the command to execute.
+        :param kwargs: Keyword arguments.
 
-    def dispatch(self, command, kwargs):
+        """
         control = None
         try:
             control = getattr(self.panel, command)

+ 15 - 21
celery/worker/controllers.py

@@ -5,10 +5,10 @@ Worker Controller Threads
 """
 import time
 import threading
-from Queue import Empty as QueueEmpty
 from datetime import datetime
+from Queue import Empty as QueueEmpty
 
-from celery.log import get_default_logger
+from celery import log
 from celery.worker.revoke import revoked
 
 
@@ -78,14 +78,14 @@ class Mediator(BackgroundThread):
 
     """
 
-    def __init__(self, ready_queue, callback):
+    def __init__(self, ready_queue, callback, logger=None):
         super(Mediator, self).__init__()
+        self.logger = logger or log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
 
     def on_iteration(self):
         """Get tasks from bucket queue and apply the task callback."""
-        logger = get_default_logger()
         try:
             # This blocks until there's a message in the queue.
             task = self.ready_queue.get(timeout=1)
@@ -94,38 +94,32 @@ class Mediator(BackgroundThread):
         else:
             if task.task_id in revoked: # task revoked
                 task.on_ack()
-                logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
-                    task.task_name, task.task_id))
+                self.logger.warn("Mediator: Skipping revoked task: %s[%s]" % (
+                        task.task_name, task.task_id))
                 return
 
-            logger.debug("Mediator: Running callback for task: %s[%s]" % (
-                task.task_name, task.task_id))
+            self.logger.debug(
+                    "Mediator: Running callback for task: %s[%s]" % (
+                        task.task_name, task.task_id))
             self.callback(task) # execute
 
 
 class ScheduleController(BackgroundThread):
     """Schedules tasks with an ETA by moving them to the bucket queue."""
 
-    def __init__(self, eta_schedule):
+    def __init__(self, eta_schedule, logger=None):
         super(ScheduleController, self).__init__()
+        self.logger = logger or log.get_default_logger()
         self._scheduler = iter(eta_schedule)
-        self.iterations = 0
 
     def on_iteration(self):
         """Wake-up scheduler"""
-        logger = get_default_logger()
+        debug = log.SilenceRepeated(self.logger.debug, max_iterations=10)
         delay = self._scheduler.next()
         debug_log = True
         if delay is None:
             delay = 1
-            if self.iterations == 10:
-                self.iterations = 0
-            else:
-                debug_log = False
-                self.iterations += 1
-        if debug_log:
-            logger.debug("ScheduleController: Scheduler wake-up")
-            logger.debug(
-                "ScheduleController: Next wake-up eta %s seconds..." % (
-                    delay))
+
+        debug("ScheduleController: Scheduler wake-up",
+              "ScheduleController: Next wake-up eta %s seconds..." % delay)
         time.sleep(delay)

+ 15 - 5
celery/worker/heartbeat.py

@@ -3,12 +3,23 @@ from time import time, sleep
 
 
 class Heart(threading.Thread):
-    interval = 60
+    """Thread sending heartbeats at an interval.
+
+    :param eventer: Event dispatcher used to send the event.
+    :keyword interval: Time in seconds between heartbeats.
+        Default is 2 minutes.
+
+    .. attribute:: bpm
+
+        Beats per minute.
+
+    """
+    bpm = 0.5
 
     def __init__(self, eventer, interval=None):
         super(Heart, self).__init__()
         self.eventer = eventer
-        self.interval = interval or self.interval
+        self.bpm = interval and interval / 60.0 or self.bpm
         self._shutdown = threading.Event()
         self._stopped = threading.Event()
         self.setDaemon(True)
@@ -16,12 +27,11 @@ class Heart(threading.Thread):
 
     def run(self):
         self._state = "RUN"
-        interval = self.interval
+        bpm = self.bpm
         dispatch = self.eventer.send
 
         dispatch("worker-online")
 
-
         # We can't sleep all of the interval, because then
         # it takes 60 seconds (or value of interval) to shutdown
         # the thread.
@@ -31,7 +41,7 @@ class Heart(threading.Thread):
             if self._shutdown.isSet():
                 break
             now = time()
-            if not last_beat or now > last_beat + interval:
+            if not last_beat or now > last_beat + (60.0 / bpm):
                 last_beat = now
                 dispatch("worker-heartbeat")
             sleep(1)

+ 4 - 9
celery/worker/listener.py

@@ -50,12 +50,13 @@ class CarrotListener(object):
         self.eta_scheduler = eta_scheduler
         self.send_events = send_events
         self.logger = logger
-        self.control_dispatch = ControlDispatch(logger=logger)
+        self.hostname = socket.gethostname()
+        self.control_dispatch = ControlDispatch(logger=logger,
+                                                hostname=self.hostname)
         self.prefetch_count = SharedCounter(initial_prefetch_count)
         self.event_dispatcher = None
         self.heart = None
         self._state = None
-        self.hostname = socket.gethostname()
 
     def start(self):
         """Start the consumer.
@@ -92,12 +93,6 @@ class CarrotListener(object):
 
             wait_for_message()
 
-    def on_control_command(self, message):
-        command = message.pop("command")
-        destination = message.pop("destination", None)
-        if not destination or destination == self.hostname:
-            return self.control_dispatch.dispatch(command, message)
-
     def on_task(self, task, eta=None):
         """Handle received task.
 
@@ -147,7 +142,7 @@ class CarrotListener(object):
         # Handle control command
         control = message_data.get("control")
         if control:
-            self.on_control_command(control)
+            self.control_dispatch.dispatch_from_message(control)
         return
 
     def close_connection(self):

+ 6 - 4
celery/worker/pool.py

@@ -14,8 +14,8 @@ from celery.datastructures import ExceptionInfo
 class TaskPool(object):
     """Process Pool for processing tasks in parallel.
 
-    :param limit: see :attr:`limit` attribute.
-    :param logger: see :attr:`logger` attribute.
+    :param limit: see :attr:`limit`.
+    :param logger: see :attr:`logger`.
 
 
     .. attribute:: limit
@@ -28,10 +28,11 @@ class TaskPool(object):
 
     """
 
-    def __init__(self, limit, logger=None, initializer=None):
+    def __init__(self, limit, logger=None, initializer=None, initargs=None):
         self.limit = limit
         self.logger = logger or log.get_default_logger()
         self.initializer = initializer
+        self.initargs = initargs
         self._pool = None
 
     def start(self):
@@ -41,7 +42,8 @@ class TaskPool(object):
 
         """
         self._pool = DynamicPool(processes=self.limit,
-                                 initializer=self.initializer)
+                                 initializer=self.initializer,
+                                 initargs=self.initargs)
 
     def stop(self):
         """Terminate the pool."""

+ 12 - 2
celery/worker/revoke.py

@@ -1,6 +1,16 @@
 from celery.datastructures import LimitedSet
 
-REVOKES_MAX = 1000
-REVOKE_EXPIRES = 60 * 60 # one hour.
+REVOKES_MAX = 10000
+REVOKE_EXPIRES = 3600 # One hour.
 
+"""
+
+.. data:: revoked
+
+    A :class:`celery.datastructures.LimitedSet` containing revoked task ids.
+
+    Items expire after one hour, and the structure can hold at most
+    10000 revoked task ids at a time (about 300kb).
+
+"""
 revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

+ 15 - 0
celery/worker/scheduler.py

@@ -3,12 +3,26 @@ import heapq
 
 
 class Scheduler(object):
+    """ETA scheduler.
+
+    :param ready_queue: Queue to move items ready for processing.
+
+    """
 
     def __init__(self, ready_queue):
         self.ready_queue = ready_queue
         self._queue = []
 
     def enter(self, item, eta=None, priority=0, callback=None):
+        """Enter item into the scheduler.
+
+        :param item: Item to enter.
+        :param eta: Scheduled time as a :class:`datetime.datetime` object.
+        :param priority: Unused.
+        :param callback: Callback called when the item is scheduled.
+            This callback takes no arguments.
+
+        """
         eta = time.mktime(eta.timetuple()) if eta else time.time()
         heapq.heappush(self._queue, (eta, priority, item, callback))
 
@@ -40,6 +54,7 @@ class Scheduler(object):
             yield None
 
     def empty(self):
+        """Is the schedule empty?"""
         return not self._queue
 
     @property