Browse Source

Cosmetics

Ask Solem 14 years ago
parent
commit
8e915e543a

+ 3 - 1
celery/datastructures.py

@@ -206,9 +206,10 @@ def consume_queue(queue):
         []
         []
 
 
     """
     """
+    get = queue.get_nowait
     while 1:
     while 1:
         try:
         try:
-            yield queue.get_nowait()
+            yield get()
         except Empty:
         except Empty:
             break
             break
 
 
@@ -225,6 +226,7 @@ class LimitedSet(object):
     :keyword expires: Time in seconds, before a membership expires.
     :keyword expires: Time in seconds, before a membership expires.
 
 
     """
     """
+    __slots__ = ("maxlen", "expires", "_data")
 
 
     def __init__(self, maxlen=None, expires=None):
     def __init__(self, maxlen=None, expires=None):
         self.maxlen = maxlen
         self.maxlen = maxlen

+ 4 - 1
celery/loaders/base.py

@@ -94,7 +94,10 @@ class BaseLoader(object):
     def config_from_object(self, obj, silent=False):
     def config_from_object(self, obj, silent=False):
         if isinstance(obj, basestring):
         if isinstance(obj, basestring):
             try:
             try:
-                obj = self.import_from_cwd(obj)
+                if "." in obj:
+                    obj = get_cls_by_name(obj, imp=self.import_from_cwd)
+                else:
+                    obj = self.import_from_cwd(obj)
             except (ImportError, AttributeError):
             except (ImportError, AttributeError):
                 if silent:
                 if silent:
                     return False
                     return False

+ 1 - 1
celery/log.py

@@ -314,7 +314,7 @@ class SilenceRepeated(object):
         self._iterations = 0
         self._iterations = 0
 
 
     def __call__(self, *msgs):
     def __call__(self, *msgs):
-        if self._iterations >= self.max_iterations:
+        if not self._iterations or self._iterations >= self.max_iterations:
             for msg in msgs:
             for msg in msgs:
                 self.action(msg)
                 self.action(msg)
             self._iterations = 0
             self._iterations = 0

+ 1 - 0
celery/worker/__init__.py

@@ -285,6 +285,7 @@ class WorkController(object):
 
 
         if self._state != self.RUN or self._running != len(self.components):
         if self._state != self.RUN or self._running != len(self.components):
             # Not fully started, can safely exit.
             # Not fully started, can safely exit.
+            self._state = self.TERMINATE
             return
             return
 
 
         self._state = self.CLOSE
         self._state = self.CLOSE

+ 5 - 3
celery/worker/buckets.py

@@ -297,10 +297,12 @@ class TokenBucketQueue(object):
     def wait(self, block=False):
     def wait(self, block=False):
         """Wait until a token can be retrieved from the bucket and return
         """Wait until a token can be retrieved from the bucket and return
         the next item."""
         the next item."""
-        while True:
-            remaining = self.expected_time()
+        get = self.get
+        expected_time = self.expected_time
+        while 1:
+            remaining = expected_time()
             if not remaining:
             if not remaining:
-                return self.get(block=block)
+                return get(block=block)
             sleep(remaining)
             sleep(remaining)
 
 
     def expected_time(self, tokens=1):
     def expected_time(self, tokens=1):

+ 34 - 44
celery/worker/job.py

@@ -9,15 +9,14 @@ import warnings
 from datetime import datetime
 from datetime import datetime
 
 
 from celery import current_app
 from celery import current_app
+from celery import exceptions
 from celery import platforms
 from celery import platforms
+from celery import registry
 from celery.app import app_or_default
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.datastructures import ExceptionInfo
-from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
-from celery.exceptions import WorkerLostError, RetryTaskError
 from celery.execute.trace import TaskTrace
 from celery.execute.trace import TaskTrace
-from celery.registry import tasks
-from celery.utils import noop, kwdict, fun_takes_kwargs
-from celery.utils import get_symbol_by_name, truncate_text
+from celery.utils import (noop, kwdict, fun_takes_kwargs,
+                          get_symbol_by_name, truncate_text)
 from celery.utils.encoding import safe_repr, safe_str
 from celery.utils.encoding import safe_repr, safe_str
 from celery.utils.timeutils import maybe_iso8601
 from celery.utils.timeutils import maybe_iso8601
 from celery.worker import state
 from celery.worker import state
@@ -68,7 +67,7 @@ class WorkerTaskTrace(TaskTrace):
     If the call was successful, it saves the result to the task result
     If the call was successful, it saves the result to the task result
     backend, and sets the task status to `"SUCCESS"`.
     backend, and sets the task status to `"SUCCESS"`.
 
 
-    If the call raises :exc:`celery.exceptions.RetryTaskError`, it extracts
+    If the call raises :exc:`~celery.exceptions.RetryTaskError`, it extracts
     the original exception, uses that as the result and sets the task status
     the original exception, uses that as the result and sets the task status
     to `"RETRY"`.
     to `"RETRY"`.
 
 
@@ -269,14 +268,14 @@ class TaskRequest(object):
         self.expires = expires
         self.expires = expires
         self.chord = chord
         self.chord = chord
         self.on_ack = on_ack
         self.on_ack = on_ack
-        self.delivery_info = delivery_info or {}
+        self.delivery_info = {} if delivery_info is None else delivery_info
         self.hostname = hostname or socket.gethostname()
         self.hostname = hostname or socket.gethostname()
         self.logger = logger or self.app.log.get_default_logger()
         self.logger = logger or self.app.log.get_default_logger()
         self.eventer = eventer
         self.eventer = eventer
         self.email_subject = email_subject or self.email_subject
         self.email_subject = email_subject or self.email_subject
         self.email_body = email_body or self.email_body
         self.email_body = email_body or self.email_body
 
 
-        self.task = tasks[self.task_name]
+        self.task = registry.tasks[self.task_name]
         self._store_errors = True
         self._store_errors = True
         if self.task.ignore_result:
         if self.task.ignore_result:
             self._store_errors = self.task.store_errors_even_if_ignored
             self._store_errors = self.task.store_errors_even_if_ignored
@@ -306,19 +305,13 @@ class TaskRequest(object):
                    retries=body.get("retries", 0),
                    retries=body.get("retries", 0),
                    eta=maybe_iso8601(body.get("eta")),
                    eta=maybe_iso8601(body.get("eta")),
                    expires=maybe_iso8601(body.get("expires")),
                    expires=maybe_iso8601(body.get("expires")),
-                   on_ack=on_ack,
-                   delivery_info=delivery_info,
-                   **kw)
+                   on_ack=on_ack, delivery_info=delivery_info, **kw)
 
 
     def get_instance_attrs(self, loglevel, logfile):
     def get_instance_attrs(self, loglevel, logfile):
-        return {"logfile": logfile,
-                "loglevel": loglevel,
-                "id": self.task_id,
-                "taskset": self.taskset_id,
-                "retries": self.retries,
-                "is_eager": False,
-                "delivery_info": self.delivery_info,
-                "chord": self.chord}
+        return {"logfile": logfile, "loglevel": loglevel,
+                "id": self.task_id, "taskset": self.taskset_id,
+                "retries": self.retries, "is_eager": False,
+                "delivery_info": self.delivery_info, "chord": self.chord}
 
 
     def extend_with_default_kwargs(self, loglevel, logfile):
     def extend_with_default_kwargs(self, loglevel, logfile):
         """Extend the tasks keyword arguments with standard task arguments.
         """Extend the tasks keyword arguments with standard task arguments.
@@ -328,6 +321,9 @@ class TaskRequest(object):
 
 
         See :meth:`celery.task.base.Task.run` for more information.
         See :meth:`celery.task.base.Task.run` for more information.
 
 
+        Magic keyword arguments are deprecated and will be removed
+        in version 3.0.
+
         """
         """
         if not self.task.accept_magic_kwargs:
         if not self.task.accept_magic_kwargs:
             return self.kwargs
             return self.kwargs
@@ -451,11 +447,11 @@ class TaskRequest(object):
         if soft:
         if soft:
             self.logger.warning("Soft time limit (%ss) exceeded for %s[%s]" % (
             self.logger.warning("Soft time limit (%ss) exceeded for %s[%s]" % (
                 timeout, self.task_name, self.task_id))
                 timeout, self.task_name, self.task_id))
-            exc = SoftTimeLimitExceeded(timeout)
+            exc = exceptions.SoftTimeLimitExceeded(timeout)
         else:
         else:
             self.logger.error("Hard time limit (%ss) exceeded for %s[%s]" % (
             self.logger.error("Hard time limit (%ss) exceeded for %s[%s]" % (
                 timeout, self.task_name, self.task_id))
                 timeout, self.task_name, self.task_id))
-            exc = TimeLimitExceeded(timeout)
+            exc = exceptions.TimeLimitExceeded(timeout)
 
 
         if self._store_errors:
         if self._store_errors:
             self.task.backend.mark_as_failure(self.task_id, exc)
             self.task.backend.mark_as_failure(self.task_id, exc)
@@ -472,10 +468,10 @@ class TaskRequest(object):
                         result=safe_repr(ret_value), runtime=runtime)
                         result=safe_repr(ret_value), runtime=runtime)
 
 
         self.logger.info(self.success_msg.strip() % {
         self.logger.info(self.success_msg.strip() % {
-                "id": self.task_id,
-                "name": self.task_name,
-                "return_value": self.repr_result(ret_value),
-                "runtime": runtime})
+                            "id": self.task_id,
+                            "name": self.task_name,
+                            "return_value": self.repr_result(ret_value),
+                            "runtime": runtime})
 
 
     def on_retry(self, exc_info):
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
         """Handler called if the task should be retried."""
@@ -484,9 +480,9 @@ class TaskRequest(object):
                          traceback=safe_repr(exc_info.traceback))
                          traceback=safe_repr(exc_info.traceback))
 
 
         self.logger.info(self.retry_msg.strip() % {
         self.logger.info(self.retry_msg.strip() % {
-                "id": self.task_id,
-                "name": self.task_name,
-                "exc": safe_repr(exc_info.exception.exc)})
+                            "id": self.task_id,
+                            "name": self.task_name,
+                            "exc": safe_repr(exc_info.exception.exc)})
 
 
     def on_failure(self, exc_info):
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""
         """Handler called if the task raised an exception."""
@@ -495,15 +491,14 @@ class TaskRequest(object):
         if self.task.acks_late:
         if self.task.acks_late:
             self.acknowledge()
             self.acknowledge()
 
 
-        if isinstance(exc_info.exception, RetryTaskError):
+        if isinstance(exc_info.exception, exceptions.RetryTaskError):
             return self.on_retry(exc_info)
             return self.on_retry(exc_info)
 
 
         # This is a special case as the process would not have had
         # This is a special case as the process would not have had
         # time to write the result.
         # time to write the result.
-        if isinstance(exc_info.exception, WorkerLostError):
-            if self._store_errors:
-                self.task.backend.mark_as_failure(self.task_id,
-                                                  exc_info.exception)
+        if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
+                self._store_errors:
+            self.task.backend.mark_as_failure(self.task_id, exc_info.exception)
 
 
         self.send_event("task-failed", uuid=self.task_id,
         self.send_event("task-failed", uuid=self.task_id,
                          exception=safe_repr(exc_info.exception),
                          exception=safe_repr(exc_info.exception),
@@ -523,7 +518,7 @@ class TaskRequest(object):
                                           "name": self.task_name,
                                           "name": self.task_name,
                                           "hostname": self.hostname}})
                                           "hostname": self.hostname}})
 
 
-        task_obj = tasks.get(self.task_name, object)
+        task_obj = registry.tasks.get(self.task_name, object)
         self.send_error_email(task_obj, context, exc_info.exception,
         self.send_error_email(task_obj, context, exc_info.exception,
                               enabled=task_obj.send_error_emails,
                               enabled=task_obj.send_error_emails,
                               whitelist=task_obj.error_whitelist)
                               whitelist=task_obj.error_whitelist)
@@ -549,16 +544,10 @@ class TaskRequest(object):
         return truncate_text(safe_repr(result), maxlen)
         return truncate_text(safe_repr(result), maxlen)
 
 
     def info(self, safe=False):
     def info(self, safe=False):
-        args = self.args
-        kwargs = self.kwargs
-        if not safe:
-            args = safe_repr(args)
-            kwargs = safe_repr(self.kwargs)
-
         return {"id": self.task_id,
         return {"id": self.task_id,
                 "name": self.task_name,
                 "name": self.task_name,
-                "args": args,
-                "kwargs": kwargs,
+                "args": self.args if safe else safe_repr(self.args),
+                "kwargs": self.kwargs if safe else safe_repr(self.kwargs),
                 "hostname": self.hostname,
                 "hostname": self.hostname,
                 "time_start": self.time_start,
                 "time_start": self.time_start,
                 "acknowledged": self.acknowledged,
                 "acknowledged": self.acknowledged,
@@ -569,8 +558,9 @@ class TaskRequest(object):
         return "%s[%s]%s%s" % (
         return "%s[%s]%s%s" % (
                     self.task_name,
                     self.task_name,
                     self.task_id,
                     self.task_id,
-                    self.eta and " eta:[%s]" % (self.eta, ) or "",
-                    self.expires and " expires:[%s]" % (self.expires, ) or "")
+                    " eta:[%s]" % (self.eta, ) if self.eta else "",
+                    " expires:[%s]" % (self.expires, ) if self.expires else "")
+    __str__ = shortinfo
 
 
     def __repr__(self):
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (

+ 2 - 7
celery/worker/mediator.py

@@ -1,8 +1,3 @@
-"""
-
-Worker Controller Threads
-
-"""
 import os
 import os
 import sys
 import sys
 import threading
 import threading
@@ -14,7 +9,7 @@ from celery.app import app_or_default
 
 
 
 
 class Mediator(threading.Thread):
 class Mediator(threading.Thread):
-    """Thread continuously moving tasks from the ready queue into the pool."""
+    """Thread continuously moving tasks from the ready queue onto the pool."""
 
 
     #: The task queue, a :class:`~Queue.Queue` instance.
     #: The task queue, a :class:`~Queue.Queue` instance.
     ready_queue = None
     ready_queue = None
@@ -57,7 +52,7 @@ class Mediator(threading.Thread):
                                               "hostname": task.hostname}})
                                               "hostname": task.hostname}})
 
 
     def run(self):
     def run(self):
-        """Move tasks forver or until :meth:`stop` is called."""
+        """Move tasks until :meth:`stop` is called."""
         while not self._shutdown.isSet():
         while not self._shutdown.isSet():
             try:
             try:
                 self.move()
                 self.move()