Browse Source

Cosmetics

Ask Solem 13 years ago
parent
commit
9571e0e551
8 changed files with 119 additions and 120 deletions
  1. 2 2
      celery/app/amqp.py
  2. 4 0
      celery/app/annotations.py
  3. 90 98
      celery/app/base.py
  4. 1 1
      celery/app/routes.py
  5. 11 10
      celery/result.py
  6. 2 2
      celery/task/base.py
  7. 8 6
      celery/utils/functional.py
  8. 1 1
      setup.py

+ 2 - 2
celery/app/amqp.py

@@ -15,7 +15,7 @@ from datetime import timedelta
 
 from kombu import BrokerConnection, Exchange
 from kombu import compat as messaging
-from kombu.pools import ProducerPool
+from kombu import pools
 
 from celery import signals
 from celery.utils import cached_property, lpmerge, uuid
@@ -259,7 +259,7 @@ class TaskPublisher(messaging.Publisher):
             self.close()
 
 
-class PublisherPool(ProducerPool):
+class PublisherPool(pools.ProducerPool):
 
     def __init__(self, app):
         self.app = app

+ 4 - 0
celery/app/annotations.py

@@ -7,6 +7,10 @@ _first_match = firstmethod("annotate")
 _first_match_any = firstmethod("annotate_any")
 
 
+def resolve_all(anno, task):
+    return filter(None, (_first_match(anno, task), _first_match_any(anno)))
+
+
 class MapAnnotation(dict):
 
     def annotate_any(self):

+ 90 - 98
celery/app/base.py

@@ -29,24 +29,24 @@ from celery.utils import cached_property, register_after_fork
 from celery.utils.functional import first
 from celery.utils.imports import instantiate, symbol_by_name
 
-from .annotations import (
-    _first_match, _first_match_any,
-    prepare as prepare_annotations,
-)
+from .annotations import prepare as prepare_annotations
 from .builtins import load_builtin_tasks
 from .defaults import DEFAULTS, find_deprecated_settings
 from .state import _tls, get_current_app
 from .utils import AppPickler, Settings, bugreport, _unpickle_app
 
 
-def _unpickle_appattr(reverse, args):
-    return getattr(get_current_app(), reverse)(*args)
+def _unpickle_appattr(reverse_name, args):
+    """Given an attribute name and a list of args, gets
+    the attribute from the current app and calls it."""
+    return getattr(get_current_app(), reverse_name)(*args)
 
 
 class Celery(object):
     """Celery Application.
 
     :param main: Name of the main module if running as `__main__`.
+    :keyword broker: URL of the default broker used.
     :keyword loader: The loader class, or the name of the loader class to use.
                      Default is :class:`celery.loaders.app.AppLoader`.
     :keyword backend: The result store backend class, or the name of the
@@ -57,6 +57,7 @@ class Celery(object):
     :keyword log: Log object or class name.
     :keyword control: Control object or class name.
     :keyword set_as_current:  Make this the global current app.
+    :keyword tasks: A task registry or the name of a registry class.
 
     """
     Pickler = AppPickler
@@ -108,6 +109,7 @@ class Celery(object):
         _tls.current_app = self
 
     def on_init(self):
+        """Optional callback called at init."""
         pass
 
     def create_task_cls(self):
@@ -116,50 +118,6 @@ class Celery(object):
         return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
                                        attribute="_app", abstract=True)
 
-    def subclass_with_self(self, Class, name=None, attribute="app",
-            reverse=None, **kw):
-        """Subclass an app-compatible class by setting its app attribute
-        to be this app instance.
-
-        App-compatible means that the class has a class attribute that
-        provides the default app it should use, e.g.
-        ``class Foo: app = None``.
-
-        :param Class: The app-compatible class to subclass.
-        :keyword name: Custom name for the target class.
-        :keyword attribute: Name of the attribute holding the app,
-                            default is "app".
-
-        """
-        Class = symbol_by_name(Class)
-        reverse = reverse if reverse else Class.__name__
-
-        def __reduce__(self):
-            return _unpickle_appattr, (reverse, self.__reduce_args__())
-
-        attrs = dict({attribute: self}, __module__=Class.__module__,
-                     __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
-
-        return type(name or Class.__name__, (Class, ), attrs)
-
-    @cached_property
-    def Worker(self):
-        """Create new :class:`~celery.apps.worker.Worker` instance."""
-        return self.subclass_with_self("celery.apps.worker:Worker")
-
-    @cached_property
-    def WorkController(self, **kwargs):
-        return self.subclass_with_self("celery.worker:WorkController")
-
-    @cached_property
-    def Beat(self, **kwargs):
-        """Create new :class:`~celery.apps.beat.Beat` instance."""
-        return self.subclass_with_self("celery.apps.beat:Beat")
-
-    @cached_property
-    def TaskSet(self):
-        return self.subclass_with_self("celery.task.sets:group")
-
     def start(self, argv=None):
         """Run :program:`celery` using `argv`.  Uses :data:`sys.argv`
         if `argv` is not specified."""
@@ -233,46 +191,9 @@ class Celery(object):
         task.bind(self)
         return task
 
-    def annotate_task(self, task):
-        if self.annotations:
-            match = _first_match(self.annotations, task)
-            for attr, value in (match or {}).iteritems():
-                setattr(task, attr, value)
-            match_any = _first_match_any(self.annotations)
-            for attr, value in (match_any or {}).iteritems():
-                setattr(task, attr, value)
-
-    @cached_property
-    def Task(self):
-        """Default Task base class for this application."""
-        return self.create_task_cls()
-
-    @cached_property
-    def annotations(self):
-        return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
-
-    def __repr__(self):
-        return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
-
-    def __reduce__(self):
-        # Reduce only pickles the configuration changes,
-        # so the default configuration doesn't have to be passed
-        # between processes.
-        return (_unpickle_app, (self.__class__, self.Pickler)
-                              + self.__reduce_args__())
-
-    def __reduce_args__(self):
-        return (self.main,
-                self.conf.changes,
-                self.loader_cls,
-                self.backend_cls,
-                self.amqp_cls,
-                self.events_cls,
-                self.log_cls,
-                self.control_cls,
-                self.accept_magic_kwargs)
-
     def finalize(self):
+        """Finalizes the app by loading built-in tasks,
+        and evaluating pending task decorators."""
         if not self.finalized:
             load_builtin_tasks(self)
 
@@ -351,14 +272,6 @@ class Celery(object):
                 publisher or publish.close()
             return result_cls(new_id)
 
-    @cached_property
-    def AsyncResult(self):
-        return self.subclass_with_self("celery.result:AsyncResult")
-
-    @cached_property
-    def TaskSetResult(self):
-        return self.subclass_with_self("celery.result:TaskSetResult")
-
     def broker_connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, transport=None,
@@ -483,6 +396,85 @@ class Celery(object):
             self._pool.force_close_all()
             self._pool = None
 
+    def subclass_with_self(self, Class, name=None, attribute="app",
+            reverse=None, **kw):
+        """Subclass an app-compatible class by setting its app attribute
+        to be this app instance.
+
+        App-compatible means that the class has a class attribute that
+        provides the default app it should use, e.g.
+        ``class Foo: app = None``.
+
+        :param Class: The app-compatible class to subclass.
+        :keyword name: Custom name for the target class.
+        :keyword attribute: Name of the attribute holding the app,
+                            default is "app".
+
+        """
+        Class = symbol_by_name(Class)
+        reverse = reverse if reverse else Class.__name__
+
+        def __reduce__(self):
+            return _unpickle_appattr, (reverse, self.__reduce_args__())
+
+        attrs = dict({attribute: self}, __module__=Class.__module__,
+                     __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
+
+        return type(name or Class.__name__, (Class, ), attrs)
+
+    def __repr__(self):
+        return "<%s %s:0x%x>" % (self.__class__.__name__,
+                                 self.main or "__main__", id(self), )
+
+    def __reduce__(self):
+        # Reduce only pickles the configuration changes,
+        # so the default configuration doesn't have to be passed
+        # between processes.
+        return (_unpickle_app, (self.__class__, self.Pickler)
+                              + self.__reduce_args__())
+
+    def __reduce_args__(self):
+        return (self.main, self.conf.changes, self.loader_cls,
+                self.backend_cls, self.amqp_cls, self.events_cls,
+                self.log_cls, self.control_cls, self.accept_magic_kwargs)
+
+
+    @cached_property
+    def Worker(self):
+        """Create new :class:`~celery.apps.worker.Worker` instance."""
+        return self.subclass_with_self("celery.apps.worker:Worker")
+
+    @cached_property
+    def WorkController(self, **kwargs):
+        return self.subclass_with_self("celery.worker:WorkController")
+
+    @cached_property
+    def Beat(self, **kwargs):
+        """Create new :class:`~celery.apps.beat.Beat` instance."""
+        return self.subclass_with_self("celery.apps.beat:Beat")
+
+    @cached_property
+    def TaskSet(self):
+        return self.subclass_with_self("celery.task.sets:group")
+
+    @cached_property
+    def Task(self):
+        """Default Task base class for this application."""
+        return self.create_task_cls()
+
+    @cached_property
+    def annotations(self):
+        return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
+
+
+    @cached_property
+    def AsyncResult(self):
+        return self.subclass_with_self("celery.result:AsyncResult")
+
+    @cached_property
+    def TaskSetResult(self):
+        return self.subclass_with_self("celery.result:TaskSetResult")
+
     @property
     def pool(self):
         if self._pool is None:
@@ -537,4 +529,4 @@ class Celery(object):
         """
         self.finalize()
         return self._tasks
-App = Celery
+App = Celery  # compat

+ 1 - 1
celery/app/routes.py

@@ -50,7 +50,7 @@ class Router(object):
                 return lpmerge(self.expand_destination(route), options)
         if "queue" not in options:
             options = lpmerge(self.expand_destination(
-                                self.app.conf.CELERY_DEFAULT_QUEUE), options)
+                              self.app.conf.CELERY_DEFAULT_QUEUE), options)
         return options
 
     def expand_destination(self, route):

+ 11 - 10
celery/result.py

@@ -184,6 +184,14 @@ class AsyncResult(ResultBase):
         """Returns :const:`True` if the task failed."""
         return self.state == states.FAILURE
 
+    def build_graph(self, intermediate=False):
+        graph = DependencyGraph()
+        for parent, node in self.iterdeps(intermediate=intermediate):
+            if parent:
+                graph.add_arc(parent)
+                graph.add_edge(parent, node)
+        return graph
+
     def __str__(self):
         """`str(self) -> self.id`"""
         return self.id
@@ -203,21 +211,14 @@ class AsyncResult(ResultBase):
         return NotImplemented
 
     def __copy__(self):
-        return self.__class__(self.id, backend=self.backend)
+        r = self.__reduce__()
+        return r[0](*r[1])
 
     def __reduce__(self):
         return self.__class__, self.__reduce_args__()
 
     def __reduce_args__(self):
-        return self.id, self.task_name, self.backend
-
-    def build_graph(self, intermediate=False):
-        graph = DependencyGraph()
-        for parent, node in self.iterdeps(intermediate=intermediate):
-            if parent:
-                graph.add_arc(parent)
-                graph.add_edge(parent, node)
-        return graph
+        return self.id, self.backend, self.task_name, self.parent
 
     def set_parent(self, parent):
         self.parent = parent

+ 2 - 2
celery/task/base.py

@@ -18,11 +18,11 @@ from celery.__compat__ import class_property, reclassmethod
 from celery.app.task import Context, TaskType, BaseTask  # noqa
 from celery.schedules import maybe_schedule
 
-#: list of methods that are classmethods in the old API.
+#: list of methods that must be classmethods in the old API.
 _COMPAT_CLASSMETHODS = (
     "get_logger", "establish_connection", "get_publisher", "get_consumer",
     "delay", "apply_async", "retry", "apply", "AsyncResult", "subtask",
-    "bind", "on_bound", "_get_app")
+    "bind", "on_bound", "_get_app", "annotate")
 
 
 class Task(BaseTask):

+ 8 - 6
celery/utils/functional.py

@@ -179,21 +179,23 @@ def first(predicate, iterable):
 
 
 def firstmethod(method):
-    """Returns a functions that with a list of instances,
+    """Returns a function that with a list of instances,
     finds the first instance that returns a value for the given method.
 
     The list can also contain promises (:class:`promise`.)
 
     """
 
-    def _matcher(seq, *args, **kwargs):
-        for cls in seq:
+    def _matcher(it, *args, **kwargs):
+        for obj in it:
             try:
-                answer = getattr(maybe_promise(cls), method)(*args, **kwargs)
-                if answer is not None:
-                    return answer
+                answer = getattr(maybe_promise(obj), method)(*args, **kwargs)
             except AttributeError:
                 pass
+            else:
+                if answer is not None:
+                    return answer
+
     return _matcher
 
 

+ 1 - 1
setup.py

@@ -165,7 +165,7 @@ entrypoints["bundle.bundles"] = ["celery = celery.contrib.bundles:bundles"]
 # -*- %%% -*-
 
 setup(
-    name="celery",
+    name=NAME,
     version=meta["VERSION"],
     description=meta["doc"],
     author=meta["author"],