Browse Source

More cleanup (docstrings etc)

Ask Solem 15 years ago
parent
commit
54af6cb491
6 changed files with 41 additions and 25 deletions
  1. 4 11
      celery/log.py
  2. 2 2
      celery/task/base.py
  3. 15 2
      celery/utils/__init__.py
  4. 4 0
      celery/utils/info.py
  5. 9 3
      celery/utils/patch.py
  6. 7 7
      celery/worker/scheduler.py

+ 4 - 11
celery/log.py

@@ -7,18 +7,11 @@ import traceback
 
 from celery import conf
 from celery.utils import noop
+from celery.utils.patch import ensure_process_aware_logger
 
 _hijacked = False
 _monkeypatched = False
 
-def _ensure_process_aware_logger():
-    global _monkeypatched
-
-    if not _monkeypatched:
-        from celery.utils.patch import monkeypatch
-        monkeypatch()
-        _monkeypatched = True
-
 
 def _hijack_multiprocessing_logger():
     from multiprocessing import util as mputil
@@ -27,7 +20,7 @@ def _hijack_multiprocessing_logger():
     if _hijacked:
         return mputil.get_logger()
 
-    _ensure_process_aware_logger()
+    ensure_process_aware_logger()
 
     logging.Logger.manager.loggerDict.clear()
 
@@ -62,9 +55,9 @@ def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
     """
 
     logger = get_default_logger(loglevel=loglevel)
-    if logger.handlers:
-        # Logger already configured
+    if logger.handlers: # Logger already configured
         return logger
+
     if logfile:
         handler = logging.FileHandler
         if hasattr(logfile, "write"):

+ 2 - 2
celery/task/base.py

@@ -7,7 +7,7 @@ from billiard.serialization import pickle
 
 from celery import conf
 from celery.log import setup_logger
-from celery.utils import gen_unique_id, mexpand, timedelta_seconds
+from celery.utils import gen_unique_id, padlist, timedelta_seconds
 from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
 from celery.execute import apply_async, apply
 from celery.registry import tasks
@@ -559,7 +559,7 @@ class TaskSet(object):
         return result
 
     def apply_part(self, arglist, taskset_id, publisher):
-        args, kwargs, opts = mexpand(arglist, 3, default={})
+        args, kwargs, opts = padlist(arglist, 3, default={})
         return apply_async(self.task, args, kwargs,
                            taskset_id=taskset_id, publisher=publisher, **opts)
 

+ 15 - 2
celery/utils/__init__.py

@@ -61,7 +61,20 @@ def gen_unique_id():
     return str(uuid4())
 
 
-def mexpand(container, size, default=None):
+def padlist(container, size, default=None):
+    """Pad list with default elements.
+
+    Examples:
+
+        >>> first, last, city = padlist(["George", "Constanza", "NYC"], 3)
+        ("George", "Constanza", "NYC")
+        >>> first, last, city = padlist(["George", "Constanza"], 3)
+        ("George", "Constanza", None)
+        >>> first, last, city, planet = padlist(["George", "Constanza",
+                                                 "NYC"], 4, default="Earth")
+        ("George", "Constanza", "NYC", "Earth")
+
+    """
     return container[:size] + [default] * (size - len(container))
 
 
@@ -89,7 +102,7 @@ def repeatlast(it):
     yield the last value infinitely."""
     for item in it:
         yield item
-    for item in repeat(item): # pragma: no cover
+    while 1: # pragma: no cover
         yield item
 
 

+ 4 - 0
celery/utils/info.py

@@ -43,16 +43,20 @@ def format_routing_table(table=None, indent=0):
 def format_broker_info():
     """Get message broker connection info string for log dumps."""
     broker_connection = establish_connection()
+
     carrot_backend = broker_connection.backend_cls
     if carrot_backend and not isinstance(carrot_backend, str):
         carrot_backend = carrot_backend.__name__
     carrot_backend = carrot_backend or "amqp"
+
     port = broker_connection.port or \
                 broker_connection.get_backend_cls().default_port
     port = port and ":%s" % port or ""
+
     vhost = broker_connection.virtual_host
     if not vhost.startswith("/"):
         vhost = "/" + vhost
+
     return BROKER_FORMAT % {"carrot_backend": carrot_backend,
                             "userid": broker_connection.userid,
                             "host": broker_connection.hostname,

+ 9 - 3
celery/utils/patch.py

@@ -1,7 +1,9 @@
 import logging
 
+_process_aware = False
 
-def _check_logger_class():
+
+def _patch_logger_class():
     """Make sure process name is recorded when loggers are used."""
 
     from multiprocessing.process import current_process
@@ -22,5 +24,9 @@ def _check_logger_class():
         logging._releaseLock()
 
 
-def monkeypatch():
-    _check_logger_class()
+def ensure_process_aware_logger():
+    global _process_aware
+
+    if not _process_aware:
+        _patch_logger_class()
+        _process_aware = True

+ 7 - 7
celery/worker/scheduler.py

@@ -19,7 +19,7 @@ class Scheduler(object):
         :param item: Item to enter.
         :param eta: Scheduled time as a :class:`datetime.datetime` object.
         :param priority: Unused.
-        :param callback: Callback called when the item is scheduled.
+        :param callback: Callback to call when the item is scheduled.
             This callback takes no arguments.
 
         """
@@ -30,27 +30,27 @@ class Scheduler(object):
         """The iterator yields the time to sleep for between runs."""
 
         # localize variable access
-        q = self._queue
+        heap = self._queue
         nowfun = time.time
         pop = heapq.heappop
         ready_queue = self.ready_queue
 
-        while True:
-            if q:
-                eta, priority, item, callback = verify = q[0]
+        while 1:
+            if heap:
+                eta, priority, item, callback = verify = heap[0]
                 now = nowfun()
 
                 if now < eta:
                     yield eta - now
                 else:
-                    event = pop(q)
+                    event = pop(heap)
 
                     if event is verify: # pragma: no cover
                         ready_queue.put(item)
                         callback and callback()
                         yield 0
                     else:
-                        heapq.heappush(q, event)
+                        heapq.heappush(heap, event)
             yield None
 
     def empty(self):