瀏覽代碼

Some cosmetic changes.

Ask Solem 15 年之前
父節點
當前提交
44dbf5b6ac

+ 5 - 0
celery/beat.py

@@ -1,3 +1,8 @@
+"""
+
+Periodic Task Scheduler
+
+"""
 import time
 import time
 import shelve
 import shelve
 import threading
 import threading

+ 25 - 23
celery/datastructures.py

@@ -27,6 +27,8 @@ class PositionQueue(UserList):
         """Describes an unfilled slot."""
         """Describes an unfilled slot."""
 
 
         def __init__(self, position):
         def __init__(self, position):
+            # This is not used, but is an argument from xrange
+            # so why not.
             self.position = position
             self.position = position
 
 
     def __init__(self, length):
     def __init__(self, length):
@@ -79,6 +81,29 @@ class ExceptionInfo(object):
                 str(self.exception))
                 str(self.exception))
 
 
 
 
+def consume_queue(queue):
+    """Iterator yielding all immediately available items in a
+    :class:`Queue.Queue`.
+
+    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
+
+    Example
+
+        >>> q = Queue()
+        >>> map(q.put, range(4))
+        >>> list(consume_queue(q))
+        [0, 1, 2, 3]
+        >>> list(consume_queue(q))
+        []
+
+    """
+    while 1:
+        try:
+            yield queue.get_nowait()
+        except QueueEmpty:
+            break
+
+
 class SharedCounter(object):
 class SharedCounter(object):
     """Thread-safe counter.
     """Thread-safe counter.
 
 
@@ -198,26 +223,3 @@ class LimitedSet(object):
     def first(self):
     def first(self):
         """Get the oldest member."""
         """Get the oldest member."""
         return self.chronologically[0]
         return self.chronologically[0]
-
-
-def consume_queue(queue):
-    """Iterator yielding all immediately available items in a
-    :class:`Queue.Queue`.
-
-    The iterator stops as soon as the queue raises :exc:`Queue.Empty`.
-
-    Example
-
-        >>> q = Queue()
-        >>> map(q.put, range(4))
-        >>> list(consume_queue(q))
-        [0, 1, 2, 3]
-        >>> list(consume_queue(q))
-        []
-
-    """
-    while 1:
-        try:
-            yield queue.get_nowait()
-        except QueueEmpty:
-            break

+ 12 - 3
celery/decorators.py

@@ -1,3 +1,8 @@
+"""
+
+Decorators
+
+"""
 from inspect import getargspec
 from inspect import getargspec
 
 
 from billiard.utils.functional import wraps
 from billiard.utils.functional import wraps
@@ -45,6 +50,10 @@ def task(*args, **options):
             @wraps(fun)
             @wraps(fun)
             def run(self, *args, **kwargs):
             def run(self, *args, **kwargs):
                 return fun(*args, **kwargs)
                 return fun(*args, **kwargs)
+
+            # Save the argspec for this task so we can recognize
+            # which default task kwargs we're going to pass to it later.
+            # (this happens in celery.utils.fun_takes_kwargs)
             run.argspec = getargspec(fun)
             run.argspec = getargspec(fun)
 
 
             cls_dict = dict(options, run=run, __module__=fun.__module__)
             cls_dict = dict(options, run=run, __module__=fun.__module__)
@@ -60,7 +69,7 @@ def task(*args, **options):
 def periodic_task(**options):
 def periodic_task(**options):
     """Task decorator to create a periodic task.
     """Task decorator to create a periodic task.
 
 
-    Run a task once every day:
+    Example task, scheduling a task once every day:
 
 
     .. code-block:: python
     .. code-block:: python
 
 
@@ -72,5 +81,5 @@ def periodic_task(**options):
             logger.warn("Task running...")
             logger.warn("Task running...")
 
 
     """
     """
-    options.setdefault("base", PeriodicTask)
-    return task(**options)
+    return task(**dict({"base": PeriodicTask}, **options))
+

+ 8 - 6
celery/events.py

@@ -52,11 +52,10 @@ class EventDispatcher(object):
         """
         """
         if not self.enabled:
         if not self.enabled:
             return
             return
+
         self._lock.acquire()
         self._lock.acquire()
         try:
         try:
-            fields["timestamp"] = time.time()
-            fields["hostname"] = self.hostname
-            self.publisher.send(Event(type, **fields))
+            self.publisher.send(Event(type, hostname=self.hostname))
         finally:
         finally:
             self._lock.release()
             self._lock.release()
 
 
@@ -87,13 +86,16 @@ class EventReceiver(object):
     def process(self, type, event):
     def process(self, type, event):
         """Process the received event by dispatching it to the appropriate
         """Process the received event by dispatching it to the appropriate
         handler."""
         handler."""
-        print("Received event: %s" % event)
         handler = self.handlers.get(type) or self.handlers.get("*")
         handler = self.handlers.get(type) or self.handlers.get("*")
         handler and handler(event)
         handler and handler(event)
 
 
     def capture(self, limit=None):
     def capture(self, limit=None):
-        """Open up a consumer capturing events. This has to be running
-        in the main process, and it will never stop unless forced"""
+        """Open up a consumer capturing events.
+
+        This has to run in the main process, and it will never
+        stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
+
+        """
         consumer = EventConsumer(self.connection)
         consumer = EventConsumer(self.connection)
         consumer.register_callback(self._receive)
         consumer.register_callback(self._receive)
         it = consumer.iterconsume(limit=limit)
         it = consumer.iterconsume(limit=limit)

+ 17 - 13
celery/exceptions.py

@@ -1,22 +1,14 @@
-"""celery.exceptions"""
+"""
+
+Common Exceptions
+
+"""
 
 
 UNREGISTERED_FMT = """
 UNREGISTERED_FMT = """
 Task of kind %s is not registered, please make sure it's imported.
 Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 """.strip()
 
 
 
 
-class MaxRetriesExceededError(Exception):
-    """The tasks max restart limit has been exceeded."""
-
-
-class RetryTaskError(Exception):
-    """The task is to be retried later."""
-
-    def __init__(self, message, exc, *args, **kwargs):
-        self.exc = exc
-        super(RetryTaskError, self).__init__(message, exc, *args, **kwargs)
-
-
 class NotRegistered(KeyError):
 class NotRegistered(KeyError):
     """The task is not registered."""
     """The task is not registered."""
 
 
@@ -31,3 +23,15 @@ class AlreadyRegistered(Exception):
 
 
 class TimeoutError(Exception):
 class TimeoutError(Exception):
     """The operation timed out."""
     """The operation timed out."""
+
+
+class MaxRetriesExceededError(Exception):
+    """The tasks max restart limit has been exceeded."""
+
+
+class RetryTaskError(Exception):
+    """The task is to be retried later."""
+
+    def __init__(self, message, exc, *args, **kwargs):
+        self.exc = exc
+        super(RetryTaskError, self).__init__(message, exc, *args, **kwargs)

+ 15 - 9
celery/platform.py

@@ -12,8 +12,11 @@ def reset_signal(signal_name):
     or the specified signal in particular.
     or the specified signal in particular.
 
 
     """
     """
-    if hasattr(signal, signal_name):
-        signal.signal(getattr(signal, signal_name), signal.SIG_DFL)
+    try:
+        signum = getattr(signal, signal_name)
+        signal.signal(signum, signal.SIG_DFL)
+    except AttributeError:
+        pass
 
 
 
 
 def install_signal_handler(signal_name, handler):
 def install_signal_handler(signal_name, handler):
@@ -23,16 +26,19 @@ def install_signal_handler(signal_name, handler):
     or the specified signal in particular.
     or the specified signal in particular.
 
 
     """
     """
-    if not hasattr(signal, signal_name):
-        return
-
-    signum = getattr(signal, signal_name)
-    signal.signal(signum, handler)
+    try:
+        signum = getattr(signal, signal_name)
+        signal.signal(signum, handler)
+    except AttributeError:
+        pass
 
 
 
 
 def set_process_title(progname, info=None):
 def set_process_title(progname, info=None):
-    """Set the ps name for the currently running process
-    if :mod`setproctitle` is installed."""
+    """Set the ps name for the currently running process.
+
+    Only works if :mod`setproctitle` is installed.
+
+    """
     if _setproctitle:
     if _setproctitle:
         proctitle = "[%s]" % progname
         proctitle = "[%s]" % progname
         proctitle = info and "%s %s" % (proctitle, info) or proctitle
         proctitle = info and "%s %s" % (proctitle, info) or proctitle

+ 5 - 2
celery/registry.py

@@ -28,7 +28,6 @@ class TaskRegistry(UserDict):
         instance.
         instance.
 
 
         """
         """
-
         task = inspect.isclass(task) and task() or task
         task = inspect.isclass(task) and task() or task
         name = task.name
         name = task.name
         self.data[name] = task
         self.data[name] = task
@@ -43,8 +42,12 @@ class TaskRegistry(UserDict):
             been registered.
             been registered.
 
 
         """
         """
-        if hasattr(name, "run"):
+        try:
+            # Might be a task class
             name = name.name
             name = name.name
+        except AttributeError:
+            pass
+
         self.pop(name)
         self.pop(name)
 
 
     def filter_types(self, type):
     def filter_types(self, type):

+ 7 - 0
celery/task/base.py

@@ -412,6 +412,13 @@ class Task(object):
         """
         """
         wrapper.execute_using_pool(pool, loglevel, logfile)
         wrapper.execute_using_pool(pool, loglevel, logfile)
 
 
+    def __repr__(self):
+        try:
+            kind = self.__class__.mro()[1].__name__
+        except (AttributeError, IndexError):
+            kind = "%s(Task)" % self.__class__.__name__
+        return "<%s: %s (%s)>" % (kind, self.name, self.type)
+
 
 
 class ExecuteRemoteTask(Task):
 class ExecuteRemoteTask(Task):
     """Execute an arbitrary function or object.
     """Execute an arbitrary function or object.

+ 2 - 3
celery/tests/test_datastructures.py

@@ -1,10 +1,9 @@
-import unittest
 import sys
 import sys
+import unittest
 from Queue import Queue
 from Queue import Queue
 
 
 from celery.datastructures import PositionQueue, ExceptionInfo
 from celery.datastructures import PositionQueue, ExceptionInfo
-from celery.datastructures import LimitedSet, consume_queue
-from celery.datastructures import SharedCounter
+from celery.datastructures import LimitedSet, SharedCounter, consume_queue
 
 
 
 
 class TestPositionQueue(unittest.TestCase):
 class TestPositionQueue(unittest.TestCase):

+ 2 - 4
docs/getting-started/first-steps-with-celery.rst

@@ -79,7 +79,7 @@ Running the celery worker server
 To test we will run the worker server in the foreground, so we can
 To test we will run the worker server in the foreground, so we can
 see what's going on in the terminal::
 see what's going on in the terminal::
 
 
-    $ celeryd --loglevel=INFO
+    $ PYTHONPATH="." celeryd --loglevel=INFO
 
 
 However, in production you probably want to run the worker in the
 However, in production you probably want to run the worker in the
 background as a daemon. To do this you need to use to tools provided
 background as a daemon. To do this you need to use to tools provided
@@ -88,13 +88,11 @@ by your platform, or something like `supervisord`_.
 For a complete listing of the command line options available, use the
 For a complete listing of the command line options available, use the
 help command::
 help command::
 
 
-    $  celeryd --help
+    $  PYTHONPATH="." celeryd --help
 
 
 For info on how to run celery as standalone daemon, see 
 For info on how to run celery as standalone daemon, see 
 :doc:`daemon mode reference<../cookbook/daemonizing>`
 :doc:`daemon mode reference<../cookbook/daemonizing>`
 
 
-
-
 Executing the task
 Executing the task
 ==================
 ==================