فهرست منبع

Merge branch 'nocallables' of github.com:ask/celery into nocallables

Ask Solem 15 سال پیش
والد
کامیت
4e9655d700

+ 25 - 33
README.rst

@@ -2,7 +2,7 @@
  celery - Distributed Task Queue
 =================================
 
-:Version: 0.8.0
+:Version: 0.9.0
 
 Introduction
 ============
@@ -21,18 +21,9 @@ languages see `Executing tasks on a remote web server`_.
 
 .. _`Executing tasks on a remote web server`: http://bit.ly/CgXSc
 
-It is used for executing tasks *asynchronously*, routed to one or more
+It is used for executing functions *asynchronously*, routed to one or more
 worker servers, running concurrently using multiprocessing.
 
-It is designed to solve certain problems related to running websites
-demanding high-availability and performance.
-
-It is perfect for filling caches, posting updates to twitter, mass
-downloading data like syndication feeds or web scraping. Use-cases are
-plentiful. Implementing these features asynchronously using ``celery`` is
-easy and fun, and the performance improvements can make it more than
-worthwhile.
-
 Overview
 ========
 
@@ -266,14 +257,21 @@ advanced features of celery later.
 This is a task that basically does nothing but take some arguments,
 and return a value:
 
-    >>> from celery.task import Task
-    >>> from celery.registry import tasks
-    >>> class MyTask(Task):
-    ...     def run(self, some_arg, **kwargs):
-    ...         logger = self.get_logger(**kwargs)
-    ...         logger.info("Did something: %s" % some_arg)
-    ...         return 42
-    >>> tasks.register(MyTask)
+    >>> from celery.decorators import task
+    >>> @task()
+    ... def add(x, y):
+    ...     return x * y
+
+
+You can also use the workers logger to add some diagnostic output to
+the worker log:
+
+    >>> from celery.decorators import task
+    >>> @task()
+    ... def add(x, y, **kwargs):
+    ...     logger = add.get_logger(**kwargs)
+    ...     logger.info("Adding %s + %s" % (x, y))
+    ...     return x + y
 
 As you can see the worker is sending some keyword arguments to this task,
 this is the default keyword arguments. A task can choose not to take these,
@@ -307,16 +305,16 @@ Now if we want to execute this task, we can use the ``delay`` method of the
 task class (this is a handy shortcut to the ``apply_async`` method which gives
 you greater control of the task execution).
 
-    >>> from myapp.tasks import MyTask
-    >>> MyTask.delay(some_arg="foo")
+    >>> from myapp.tasks import add
+    >>> add.delay(4, 4)
 
 At this point, the task has been sent to the message broker. The message
 broker will hold on to the task until a celery worker server has successfully
 picked it up.
 
-*Note* If everything is just hanging when you execute ``delay``, please check
-that RabbitMQ is running, and that the user/password has access to the virtual
-host you configured earlier.
+*Note* If everything is just hanging when you execute ``delay``, please make
+sure the RabbitMQ user/password has access to the virtual host configured
+earlier.
 
 Right now we have to check the celery worker logfiles to know what happened with
 the task. This is because we didn't keep the ``AsyncResult`` object returned
@@ -327,15 +325,15 @@ finish and get its return value (or exception if the task failed).
 
 So, let's execute the task again, but this time we'll keep track of the task:
 
-    >>> result = MyTask.delay("do_something", some_arg="foo bar baz")
+    >>> result = add.delay(4, 4)
     >>> result.ready() # returns True if the task has finished processing.
     False
     >>> result.result # task is not ready, so no return value yet.
     None
     >>> result.get()   # Waits until the task is done and return the retval.
-    42
+    8
     >>> result.result
-    42
+    8
     >>> result.successful() # returns True if the task didn't end in failure.
     True
 
@@ -358,7 +356,6 @@ Periodic tasks are tasks that are run every ``n`` seconds.
 Here's an example of a periodic task:
 
     >>> from celery.task import PeriodicTask
-    >>> from celery.registry import tasks
     >>> from datetime import timedelta
     >>> class MyPeriodicTask(PeriodicTask):
     ...     run_every = timedelta(seconds=30)
@@ -367,11 +364,6 @@ Here's an example of a periodic task:
     ...         logger = self.get_logger(**kwargs)
     ...         logger.info("Running periodic task!")
     ...
-    >>> tasks.register(MyPeriodicTask)
-
-**Note:** Periodic tasks does not support arguments, as this doesn't
-really make sense.
-
 
 A look inside the worker
 ========================

+ 3 - 19
celery/__init__.py

@@ -1,20 +1,4 @@
 """Distributed Task Queue"""
-
-VERSION = (0, 8, 0)
-
-__version__ = ".".join(map(str, VERSION))
-__author__ = "Ask Solem"
-__contact__ = "askh@opera.com"
-__homepage__ = "http://github.com/ask/celery/"
-__docformat__ = "restructuredtext"
-
-
-def is_stable_release():
-    return bool(not VERSION[1] % 2)
-
-
-def version_with_meta():
-    meta = "unstable"
-    if is_stable_release():
-        meta = "stable"
-    return "%s (%s)" % (__version__, meta)
+from celery.distmeta import (__version__, __author__, __contact__,
+                             __homepage__, __docformat__, VERSION,
+                             is_stable_release, version_with_meta)

+ 1 - 1
celery/conf.py

@@ -11,7 +11,7 @@ DEFAULT_AMQP_EXCHANGE_TYPE = "direct"
 DEFAULT_DAEMON_CONCURRENCY = 0 # defaults to cpu count
 DEFAULT_DAEMON_PID_FILE = "celeryd.pid"
 DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
-DEFAULT_DAEMON_LOG_LEVEL = "INFO"
+DEFAULT_DAEMON_LOG_LEVEL = "WARN"
 DEFAULT_DAEMON_LOG_FILE = "celeryd.log"
 DEFAULT_AMQP_CONNECTION_TIMEOUT = 4
 DEFAULT_STATISTICS = False

+ 51 - 0
celery/decorators.py

@@ -0,0 +1,51 @@
+from celery.task.base import Task
+from inspect import getargspec
+
+
+def task(**options):
+    """Make a task out of any callable.
+
+        Examples:
+
+            >>> @task()
+            ... def refresh_feed(url):
+            ...     return Feed.objects.get(url=url).refresh()
+
+
+            >>> refresh_feed("http://example.com/rss") # Regular
+            <Feed: http://example.com/rss>
+            >>> refresh_feed.delay("http://example.com/rss") # Async
+            <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
+
+            # With setting extra options and using retry.
+
+            >>> @task(exchange="feeds")
+            ... def refresh_feed(url, **kwargs):
+            ...     try:
+            ...         return Feed.objects.get(url=url).refresh()
+            ...     except socket.error, exc:
+            ...         refresh_feed.retry(args=[url], kwargs=kwargs,
+            ...                            exc=exc)
+
+
+    """
+
+    def _create_task_cls(fun):
+        base = options.pop("base", Task)
+
+        cls_name = fun.__name__
+
+        def run(self, *args, **kwargs):
+            return fun(*args, **kwargs)
+        run.__name__ = fun.__name__
+        run.argspec = getargspec(fun)
+
+        cls_dict = dict(options)
+        cls_dict["run"] = run
+        cls_dict["__module__"] = fun.__module__
+
+        task = type(cls_name, (base, ), cls_dict)()
+
+        return task
+
+    return _create_task_cls

+ 20 - 0
celery/distmeta.py

@@ -0,0 +1,20 @@
+"""Distributed Task Queue"""
+
+VERSION = (0, 9, 0)
+
+__version__ = ".".join(map(str, VERSION))
+__author__ = "Ask Solem"
+__contact__ = "askh@opera.com"
+__homepage__ = "http://github.com/ask/celery/"
+__docformat__ = "restructuredtext"
+
+
+def is_stable_release():
+    return bool(not VERSION[1] % 2)
+
+
+def version_with_meta():
+    meta = "unstable"
+    if is_stable_release():
+        meta = "stable"
+    return "%s (%s)" % (__version__, meta)

+ 4 - 5
celery/execute.py

@@ -203,9 +203,8 @@ class ExecuteWrapper(object):
     If the call results in an exception, it saves the exception as the task
     result, and sets the task status to ``"FAILURE"``.
 
-    :param fun: Callable object to execute.
+    :param task_name: The name of the task to execute.
     :param task_id: The unique id of the task.
-    :param task_name: Name of the task.
     :param args: List of positional args to pass on to the function.
     :param kwargs: Keyword arguments mapping to pass on to the function.
 
@@ -214,8 +213,7 @@ class ExecuteWrapper(object):
 
     """
 
-    def __init__(self, fun, task_id, task_name, args=None, kwargs=None):
-        self.fun = fun
+    def __init__(self, task_name, task_id, args=None, kwargs=None):
         self.task_id = task_id
         self.task_name = task_name
         self.args = args or []
@@ -236,11 +234,12 @@ class ExecuteWrapper(object):
 
     def execute(self):
         # Convenience variables
-        fun = self.fun
         task_id = self.task_id
         task_name = self.task_name
         args = self.args
         kwargs = self.kwargs
+        fun = tasks[task_name]
+        self.fun = fun # Set fun for handlers.
 
         # Run task loader init handler.
         current_loader.on_task_init(task_id, fun)

+ 8 - 26
celery/registry.py

@@ -1,14 +1,14 @@
 """celery.registry"""
 from celery import discovery
 from celery.utils import get_full_cls_name
-from celery.exceptions import NotRegistered, AlreadyRegistered
+from celery.exceptions import NotRegistered
 from UserDict import UserDict
+import inspect
 
 
 class TaskRegistry(UserDict):
     """Site registry for tasks."""
 
-    AlreadyRegistered = AlreadyRegistered
     NotRegistered = NotRegistered
 
     def __init__(self):
@@ -18,33 +18,15 @@ class TaskRegistry(UserDict):
         """Autodiscovers tasks using :func:`celery.discovery.autodiscover`."""
         discovery.autodiscover()
 
-    def register(self, task, name=None):
+    def register(self, task):
         """Register a task in the task registry.
-
-        Task can either be a regular function, or a class inheriting
-        from :class:`celery.task.Task`.
-
-        :keyword name: By default the :attr:`Task.name` attribute on the
-            task is used as the name of the task, but you can override it
-            using this option.
-
-        :raises AlreadyRegistered: if the task is already registered.
-
+         
+        The task will be automatically instantiated if it's a class
+        not an instance.
         """
-        is_class = hasattr(task, "run")
-        if is_class:
-            task = task() # instantiate Task class
-        if not name:
-            name = getattr(task, "name")
-
-        if name in self.data:
-            raise self.AlreadyRegistered(
-                    "Task with name %s is already registered." % name)
-
-        if not is_class:
-            task.name = name
-            task.type = "regular"
 
+        task = task() if inspect.isclass(task) else task
+        name = task.name
         self.data[name] = task
 
     def unregister(self, name):

+ 1 - 1
celery/task/__init__.py

@@ -12,7 +12,7 @@ from celery.task.base import Task, TaskSet, PeriodicTask
 from celery.task.base import ExecuteRemoteTask
 from celery.task.base import AsynchronousMapTask
 from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
-from celery.execute import apply_async, delay_task
+from celery.execute import apply_async
 from celery.serialization import pickle
 from celery.task.rest import RESTProxyTask
 

+ 52 - 17
celery/task/base.py

@@ -3,12 +3,51 @@ from celery import conf
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.result import TaskSetResult, EagerResult
-from celery.execute import apply_async, delay_task, apply
+from celery.execute import apply_async, apply
 from celery.utils import gen_unique_id, get_full_cls_name
 from celery.registry import tasks
 from celery.serialization import pickle
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from datetime import timedelta
+import sys
+
+
+class TaskType(type):
+    """Metaclass for tasks.
+
+    Automatically registers the task in the task registry, except
+    if the ``abstract`` attribute is set.
+
+    If no ``name`` attribute is provided, the name is automatically
+    set to the name of the module it was defined in, and the class name.
+
+    """
+
+    def __new__(cls, name, bases, attrs):
+        super_new = super(TaskType, cls).__new__
+        task_module = attrs["__module__"]
+
+        # Abstract class, remove the abstract attribute so
+        # any class inheriting from this won't be abstract by default.
+        if attrs.pop("abstract", None):
+            return super_new(cls, name, bases, attrs)
+
+        # Automatically generate missing name.
+        if not attrs.get("name"):
+            task_module = sys.modules[task_module]
+            task_name = ".".join([task_module.__name__, name])
+            attrs["name"] = task_name
+
+        # Because of the way import happens (recursively)
+        # we may or may not be the first time the task tries to register
+        # with the framework. There should only be one class for each task
+        # name, so we always return the registered version.
+
+        task_name = attrs["name"]
+        if task_name not in tasks:
+            task_cls = super_new(cls, name, bases, attrs)
+            tasks.register(task_cls)
+        return tasks[task_name].__class__
 
 
 class Task(object):
@@ -26,7 +65,12 @@ class Task(object):
 
         *REQUIRED* All subclasses of :class:`Task` has to define the
         :attr:`name` attribute. This is the name of the task, registered
-        in the task registry, and passed to :func:`delay_task`.
+        in the task registry, and passed on to the workers.
+
+    .. attribute:: abstract
+
+        Abstract classes are not registered in the task registry, so they're
+        only used for making new tasks by subclassing.
 
     .. attribute:: type
 
@@ -108,7 +152,6 @@ class Task(object):
         ...         logger.info("Running MyTask with arg some_arg=%s" %
         ...                     some_arg))
         ...         return 42
-        ... tasks.register(MyTask)
 
     You can delay the task using the classmethod :meth:`delay`...
 
@@ -118,15 +161,12 @@ class Task(object):
         >>> result.result
         42
 
-    ...or using the :func:`delay_task` function, by passing the name of
-    the task.
-
-        >>> from celery.task import delay_task
-        >>> result = delay_task(MyTask.name, some_arg="foo")
-
 
     """
+    __metaclass__ = TaskType
+
     name = None
+    abstract = True
     type = "regular"
     exchange = None
     routing_key = None
@@ -253,7 +293,8 @@ class Task(object):
 
     @classmethod
     def delay(cls, *args, **kwargs):
-        """Delay this task for execution by the ``celery`` daemon(s).
+        """Shortcut to :meth:`apply_async` but with star arguments,
+        and doesn't support the extra options.
 
         :param \*args: positional arguments passed on to the task.
 
@@ -261,8 +302,6 @@ class Task(object):
 
         :rtype: :class:`celery.result.AsyncResult`
 
-        See :func:`celery.execute.delay_task`.
-
         """
         return apply_async(cls, args, kwargs)
 
@@ -429,7 +468,6 @@ class ExecuteRemoteTask(Task):
         """
         callable_ = pickle.loads(ser_callable)
         return callable_(*fargs, **fkwargs)
-tasks.register(ExecuteRemoteTask)
 
 
 class AsynchronousMapTask(Task):
@@ -441,7 +479,6 @@ class AsynchronousMapTask(Task):
         """The method run by ``celeryd``."""
         timeout = kwargs.get("timeout")
         return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
-tasks.register(AsynchronousMapTask)
 
 
 class TaskSet(object):
@@ -599,8 +636,6 @@ class PeriodicTask(Task):
     :raises NotImplementedError: if the :attr:`run_every` attribute is
         not defined.
 
-    You have to register the periodic task in the task registry.
-
     Example
 
         >>> from celery.task import tasks, PeriodicTask
@@ -612,9 +647,9 @@ class PeriodicTask(Task):
         ...     def run(self, **kwargs):
         ...         logger = self.get_logger(**kwargs)
         ...         logger.info("Running MyPeriodicTask")
-        >>> tasks.register(MyPeriodicTask)
 
     """
+    abstract = True
     run_every = timedelta(days=1)
     type = "periodic"
 

+ 0 - 2
celery/task/builtins.py

@@ -20,7 +20,6 @@ class DeleteExpiredTaskMetaTask(PeriodicTask):
         logger = self.get_logger(**kwargs)
         logger.info("Deleting expired task meta objects...")
         default_backend.cleanup()
-tasks.register(DeleteExpiredTaskMetaTask)
 
 
 class PingTask(Task):
@@ -30,4 +29,3 @@ class PingTask(Task):
     def run(self, **kwargs):
         """:returns: the string ``"pong"``."""
         return "pong"
-tasks.register(PingTask)

+ 0 - 1
celery/task/rest.py

@@ -135,7 +135,6 @@ class RESTProxyTask(BaseTask):
         logger = self.get_logger(**kwargs)
         proxy = RESTProxy(url, kwargs, logger)
         return proxy.execute()
-tasks.register(RESTProxyTask)
 
 
 def task_response(fun, *args, **kwargs):

+ 0 - 1
celery/tests/test_backends/test_database.py

@@ -19,7 +19,6 @@ class MyPeriodicTask(PeriodicTask):
 
     def run(self, **kwargs):
         return 42
-registry.tasks.register(MyPeriodicTask)
 
 
 class TestDatabaseBackend(unittest.TestCase):

+ 0 - 1
celery/tests/test_models.py

@@ -52,7 +52,6 @@ class TestModels(unittest.TestCase):
         self.assertFalse(m1 in TaskMeta.objects.all())
 
     def test_periodic_taskmeta(self):
-        tasks.register(TestPeriodicTask)
         p = self.createPeriodicTaskMeta(TestPeriodicTask.name)
         # check that repr works.
         self.assertTrue(unicode(p).startswith("<PeriodicTask:"))

+ 0 - 15
celery/tests/test_registry.py

@@ -2,8 +2,6 @@ import unittest
 from celery import registry
 from celery.task import Task, PeriodicTask
 
-FUNC_TASK_NAME = "celery.unittest.func_task"
-
 
 class TestTask(Task):
     name = "celery.unittest.test_task"
@@ -20,9 +18,6 @@ class TestPeriodicTask(PeriodicTask):
         return True
 
 
-def func_task(**kwargs):
-    return True
-
 
 class TestTaskRegistry(unittest.TestCase):
 
@@ -30,13 +25,11 @@ class TestTaskRegistry(unittest.TestCase):
         self.assertRaises(r.NotRegistered, r.unregister, task)
         r.register(task)
         self.assertTrue(task.name in r)
-        self.assertRaises(r.AlreadyRegistered, r.register, task)
 
     def assertRegisterUnregisterFunc(self, r, task, task_name):
         self.assertRaises(r.NotRegistered, r.unregister, task_name)
         r.register(task, task_name)
         self.assertTrue(task_name in r)
-        self.assertRaises(r.AlreadyRegistered, r.register, task, task_name)
 
     def test_task_registry(self):
         r = registry.TaskRegistry()
@@ -44,37 +37,29 @@ class TestTaskRegistry(unittest.TestCase):
                 "TaskRegistry has composited dict")
 
         self.assertRegisterUnregisterCls(r, TestTask)
-        self.assertRegisterUnregisterFunc(r, func_task, FUNC_TASK_NAME)
         self.assertRegisterUnregisterCls(r, TestPeriodicTask)
 
         tasks = r.get_all()
         self.assertTrue(isinstance(tasks.get(TestTask.name), TestTask))
         self.assertTrue(isinstance(tasks.get(TestPeriodicTask.name),
                                    TestPeriodicTask))
-        self.assertEquals(tasks.get(FUNC_TASK_NAME), func_task)
 
         regular = r.get_all_regular()
         self.assertTrue(TestTask.name in regular)
         self.assertFalse(TestPeriodicTask.name in regular)
-        self.assertTrue(FUNC_TASK_NAME in regular)
 
         periodic = r.get_all_periodic()
         self.assertFalse(TestTask.name in periodic)
         self.assertTrue(TestPeriodicTask.name in periodic)
-        self.assertFalse(FUNC_TASK_NAME in periodic)
 
         self.assertTrue(isinstance(r.get_task(TestTask.name), TestTask))
         self.assertTrue(isinstance(r.get_task(TestPeriodicTask.name),
                                    TestPeriodicTask))
-        self.assertEquals(r.get_task(FUNC_TASK_NAME), func_task)
 
         r.unregister(TestTask)
         self.assertFalse(TestTask.name in r)
         r.unregister(TestPeriodicTask)
         self.assertFalse(TestPeriodicTask.name in r)
-        r.unregister(FUNC_TASK_NAME)
-        self.assertFalse(FUNC_TASK_NAME in r)
 
-        self.assertTrue(func_task())
         self.assertTrue(TestTask().run())
         self.assertTrue(TestPeriodicTask().run())

+ 10 - 21
celery/tests/test_task.py

@@ -10,12 +10,14 @@ from celery import messaging
 from celery.result import EagerResult
 from celery.backends import default_backend
 from datetime import datetime, timedelta
+from celery.decorators import task as task_dec
 
-
-def return_True(self, **kwargs):
+def return_True(*args, **kwargs):
     # Task run functions can't be closures/lambdas, as they're pickled.
     return True
-registry.tasks.register(return_True, "cu.return-true")
+
+
+return_True_task = task_dec()(return_True)
 
 
 def raise_exception(self, **kwargs):
@@ -120,9 +122,10 @@ class TestTaskRetries(unittest.TestCase):
 class TestCeleryTasks(unittest.TestCase):
 
     def createTaskCls(self, cls_name, task_name=None):
-        attrs = {}
+        attrs = {"__module__": self.__module__}
         if task_name:
             attrs["name"] = task_name
+
         cls = type(cls_name, (task.Task, ), attrs)
         cls.run = return_True
         return cls
@@ -190,7 +193,6 @@ class TestCeleryTasks(unittest.TestCase):
         T2 = self.createTaskCls("T2")
         self.assertEquals(T2().name, "celery.tests.test_task.T2")
 
-        registry.tasks.register(T1)
         t1 = T1()
         consumer = t1.get_consumer()
         self.assertRaises(NotImplementedError, consumer.receive, "foo", "foo")
@@ -202,7 +204,7 @@ class TestCeleryTasks(unittest.TestCase):
         self.assertNextTaskDataEquals(consumer, presult, t1.name)
 
         # With arguments.
-        presult2 = task.delay_task(t1.name, name="George Constanza")
+        presult2 = task.apply_async(t1, name="George Constanza")
         self.assertNextTaskDataEquals(consumer, presult2, t1.name,
                 name="George Constanza")
 
@@ -218,12 +220,9 @@ class TestCeleryTasks(unittest.TestCase):
         self.assertNextTaskDataEquals(consumer, presult2, t1.name,
                 name="George Constanza", test_eta=True)
 
-        self.assertRaises(registry.tasks.NotRegistered, task.delay_task,
-                "some.task.that.should.never.exist.X.X.X.X.X")
-
         # Discarding all tasks.
         task.discard_all()
-        tid3 = task.delay_task(t1.name)
+        tid3 = task.apply_async(t1)
         self.assertEquals(task.discard_all(), 1)
         self.assertTrue(consumer.fetch() is None)
 
@@ -250,7 +249,7 @@ class TestTaskSet(unittest.TestCase):
     def test_function_taskset(self):
         from celery import conf
         conf.ALWAYS_EAGER = True
-        ts = task.TaskSet("cu.return-true", [
+        ts = task.TaskSet(return_True_task.name, [
             [[1], {}], [[2], {}], [[3], {}], [[4], {}], [[5], {}]])
         res = ts.run()
         self.assertEquals(res.join(), [True, True, True, True, True])
@@ -313,13 +312,3 @@ class TestTaskApply(unittest.TestCase):
         self.assertFalse(f.is_done())
         self.assertTrue(f.traceback)
         self.assertRaises(KeyError, f.get)
-
-
-class TestPeriodicTask(unittest.TestCase):
-
-    def test_interface(self):
-
-        class MyPeriodicTask(task.PeriodicTask):
-            run_every = None
-
-        self.assertRaises(NotImplementedError, MyPeriodicTask)

+ 11 - 8
celery/tests/test_worker.py

@@ -10,11 +10,12 @@ from celery import registry
 from celery.serialization import pickle
 from celery.utils import gen_unique_id
 from datetime import datetime, timedelta
+from celery.decorators import task as task_dec
 
 
+@task_dec()
 def foo_task(x, y, z, **kwargs):
     return x * y * z
-registry.tasks.register(foo_task, name="c.u.foo")
 
 
 class MockLogger(object):
@@ -108,13 +109,14 @@ class TestAMQPListener(unittest.TestCase):
     def test_receieve_message(self):
         l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
         backend = MockBackend()
-        m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={})
+        m = create_message(backend, task=foo_task.name,
+                           args=[2, 4, 8], kwargs={})
 
         l.receive_message(m.decode(), m)
 
         in_bucket = self.bucket_queue.get_nowait()
         self.assertTrue(isinstance(in_bucket, TaskWrapper))
-        self.assertEquals(in_bucket.task_name, "c.u.foo")
+        self.assertEquals(in_bucket.task_name, foo_task.name)
         self.assertEquals(in_bucket.execute(), 2 * 4 * 8)
         self.assertRaises(Empty, self.hold_queue.get_nowait)
 
@@ -130,7 +132,8 @@ class TestAMQPListener(unittest.TestCase):
     def test_receieve_message_eta(self):
         l = AMQPListener(self.bucket_queue, self.hold_queue, self.logger)
         backend = MockBackend()
-        m = create_message(backend, task="c.u.foo", args=[2, 4, 8], kwargs={},
+        m = create_message(backend, task=foo_task.name,
+                           args=[2, 4, 8], kwargs={},
                            eta=datetime.now() + timedelta(days=1))
 
         l.receive_message(m.decode(), m)
@@ -141,7 +144,7 @@ class TestAMQPListener(unittest.TestCase):
         self.assertTrue(isinstance(task, TaskWrapper))
         self.assertTrue(isinstance(eta, datetime))
         self.assertTrue(callable(on_accept))
-        self.assertEquals(task.task_name, "c.u.foo")
+        self.assertEquals(task.task_name, foo_task.name)
         self.assertEquals(task.execute(), 2 * 4 * 8)
         self.assertRaises(Empty, self.bucket_queue.get_nowait)
 
@@ -167,7 +170,7 @@ class TestWorkController(unittest.TestCase):
         worker = self.worker
         worker.pool = MockPool()
         backend = MockBackend()
-        m = create_message(backend, task="c.u.foo", args=[4, 8, 10],
+        m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
         task = TaskWrapper.from_message(m, m.decode())
         worker.safe_process_task(task)
@@ -177,7 +180,7 @@ class TestWorkController(unittest.TestCase):
         worker = self.worker
         worker.pool = MockPool(raise_base=True)
         backend = MockBackend()
-        m = create_message(backend, task="c.u.foo", args=[4, 8, 10],
+        m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
         task = TaskWrapper.from_message(m, m.decode())
         worker.safe_process_task(task)
@@ -187,7 +190,7 @@ class TestWorkController(unittest.TestCase):
         worker = self.worker
         worker.pool = MockPool(raise_regular=True)
         backend = MockBackend()
-        m = create_message(backend, task="c.u.foo", args=[4, 8, 10],
+        m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
         task = TaskWrapper.from_message(m, m.decode())
         worker.safe_process_task(task)

+ 28 - 35
celery/tests/test_worker_job.py

@@ -12,6 +12,7 @@ from carrot.backends.base import BaseMessage
 from StringIO import StringIO
 from celery.log import setup_logger
 from django.core import cache
+from celery.decorators import task as task_dec
 import simplejson
 import logging
 
@@ -19,35 +20,37 @@ scratch = {"ACK": False}
 some_kwargs_scratchpad = {}
 
 
-def jail(task_id, task_name, fun, args, kwargs):
-    return ExecuteWrapper(fun, task_id, task_name, args, kwargs)()
+def jail(task_id, task_name, args, kwargs):
+    return ExecuteWrapper(task_name, task_id, args, kwargs)()
 
 
 def on_ack():
     scratch["ACK"] = True
 
 
+@task_dec()
 def mytask(i, **kwargs):
     return i ** i
-tasks.register(mytask, name="cu.mytask")
 
 
+@task_dec()
 def mytask_no_kwargs(i):
     return i ** i
-tasks.register(mytask_no_kwargs, name="mytask_no_kwargs")
 
 
+
+@task_dec()
 def mytask_some_kwargs(i, logfile):
     some_kwargs_scratchpad["logfile"] = logfile
     return i ** i
-tasks.register(mytask_some_kwargs, name="mytask_some_kwargs")
 
 
+@task_dec()
 def mytask_raising(i, **kwargs):
     raise KeyError(i)
-tasks.register(mytask_raising, name="cu.mytask-raising")
 
 
+@task_dec()
 def get_db_connection(i, **kwargs):
     from django.db import connection
     return id(connection)
@@ -57,11 +60,12 @@ get_db_connection.ignore_result = True
 class TestJail(unittest.TestCase):
 
     def test_execute_jail_success(self):
-        ret = jail(gen_unique_id(), gen_unique_id(), mytask, [2], {})
+        ret = jail(gen_unique_id(), mytask.name, [2], {})
         self.assertEquals(ret, 4)
 
     def test_execute_jail_failure(self):
-        ret = jail(gen_unique_id(), gen_unique_id(), mytask_raising, [4], {})
+        ret = jail(gen_unique_id(), mytask_raising.name,
+                   [4], {})
         self.assertTrue(isinstance(ret, ExceptionInfo))
         self.assertEquals(ret.exception.args, (4, ))
 
@@ -76,8 +80,8 @@ class TestJail(unittest.TestCase):
 
         connection.close = monkeypatched_connection_close
 
-        ret = jail(gen_unique_id(), gen_unique_id(),
-                   get_db_connection, [2], {})
+        ret = jail(gen_unique_id(),
+                   get_db_connection.name, [2], {})
         self.assertTrue(connection._was_closed)
 
         connection.close = old_connection_close
@@ -96,7 +100,7 @@ class TestJail(unittest.TestCase):
 
         cache.cache.close = monkeypatched_cache_close
 
-        jail(gen_unique_id(), gen_unique_id(), mytask, [4], {})
+        jail(gen_unique_id(), mytask.name, [4], {})
         self.assertTrue(cache._was_closed)
         cache.cache.close = old_cache_close
         cache.settings.CACHE_BACKEND = old_backend
@@ -116,7 +120,7 @@ class TestJail(unittest.TestCase):
 
         cache.cache.close = monkeypatched_cache_close
 
-        jail(gen_unique_id(), gen_unique_id(), mytask, [4], {})
+        jail(gen_unique_id(), mytask.name, [4], {})
         self.assertTrue(cache._was_closed)
         cache.cache.close = old_cache_close
         cache.settings.CACHE_BACKEND = old_backend
@@ -128,19 +132,12 @@ class TestJail(unittest.TestCase):
 
 class TestTaskWrapper(unittest.TestCase):
 
-    def test_task_wrapper_attrs(self):
-        tw = TaskWrapper(gen_unique_id(), gen_unique_id(),
-                         mytask, [1], {"f": "x"})
-        for attr in ("task_name", "task_id", "args", "kwargs", "logger"):
-            self.assertTrue(getattr(tw, attr, None))
-
     def test_task_wrapper_repr(self):
-        tw = TaskWrapper(gen_unique_id(), gen_unique_id(),
-                         mytask, [1], {"f": "x"})
+        tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
         self.assertTrue(repr(tw))
 
     def test_task_wrapper_mail_attrs(self):
-        tw = TaskWrapper(gen_unique_id(), gen_unique_id(), mytask, [], {})
+        tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
         x = tw.success_msg % {"name": tw.task_name,
                               "id": tw.task_id,
                               "return_value": 10}
@@ -157,7 +154,7 @@ class TestTaskWrapper(unittest.TestCase):
         self.assertTrue(x)
 
     def test_from_message(self):
-        body = {"task": "cu.mytask", "id": gen_unique_id(),
+        body = {"task": mytask.name, "id": gen_unique_id(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
         m = BaseMessage(body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
@@ -170,7 +167,6 @@ class TestTaskWrapper(unittest.TestCase):
         self.assertEquals(tw.kwargs.keys()[0],
                           u"æØåveéðƒeæ".encode("utf-8"))
         self.assertFalse(isinstance(tw.kwargs.keys()[0], unicode))
-        self.assertEquals(id(mytask), id(tw.task_func))
         self.assertTrue(tw.logger)
 
     def test_from_message_nonexistant_task(self):
@@ -184,7 +180,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
+        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
         self.assertEquals(tw.execute(), 256)
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertEquals(meta.result, 256)
@@ -192,8 +188,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_success_no_kwargs(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask_no_kwargs", tid, mytask_no_kwargs,
-                         [4], {})
+        tw = TaskWrapper(mytask_no_kwargs.name, tid, [4], {})
         self.assertEquals(tw.execute(), 256)
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertEquals(meta.result, 256)
@@ -201,8 +196,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_success_some_kwargs(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask_some_kwargs", tid, mytask_some_kwargs,
-                         [4], {})
+        tw = TaskWrapper(mytask_some_kwargs.name, tid, [4], {})
         self.assertEquals(tw.execute(logfile="foobaz.log"), 256)
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertEquals(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
@@ -211,7 +205,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_ack(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"},
+        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"},
                         on_ack=on_ack)
         self.assertEquals(tw.execute(), 256)
         meta = TaskMeta.objects.get(task_id=tid)
@@ -221,8 +215,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_fail(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask-raising", tid, mytask_raising, [4],
-                         {"f": "x"})
+        tw = TaskWrapper(mytask_raising.name, tid, [4], {"f": "x"})
         self.assertTrue(isinstance(tw.execute(), ExceptionInfo))
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertEquals(meta.status, "FAILURE")
@@ -230,7 +223,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_using_pool(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
+        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
         p = TaskPool(2)
         p.start()
         asyncres = tw.execute_using_pool(p)
@@ -239,7 +232,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_default_kwargs(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
+        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
         self.assertEquals(tw.extend_with_default_kwargs(10, "some_logfile"), {
             "f": "x",
             "logfile": "some_logfile",
@@ -250,7 +243,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_on_failure(self):
         tid = gen_unique_id()
-        tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
+        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
         try:
             raise Exception("Inside unit tests")
         except Exception:
@@ -265,7 +258,7 @@ class TestTaskWrapper(unittest.TestCase):
 
         tw.on_failure(exc_info)
         logvalue = logfh.getvalue()
-        self.assertTrue("cu.mytask" in logvalue)
+        self.assertTrue(mytask.name in logvalue)
         self.assertTrue(tid in logvalue)
         self.assertTrue("ERROR" in logvalue)
 

+ 37 - 0
celery/tests/utils.py

@@ -1,11 +1,48 @@
 from __future__ import with_statement
 from contextlib import contextmanager
 from StringIO import StringIO
+from functools import wraps
 import os
 import sys
 import __builtin__
 
 
+def _skip_test(reason, sign):
+
+    def _wrap_test(fun):
+
+        @wraps(fun)
+        def _skipped_test(*args, **kwargs):
+            sys.stderr.write("(%s: %s) " % (sign, reason))
+
+        return _skipped_test
+    return _wrap_test
+
+
+def todo(reason):
+    """TODO test decorator."""
+    return _skip_test(reason, "TODO")
+
+
+def skip(reason):
+    """Skip test decorator."""
+    return _skip_test(reason, "SKIP")
+
+
+def skip_if(predicate, reason):
+    """Skip test if predicate is ``True``."""
+
+    def _inner(fun):
+        return skip(reason)(fun) if predicate else fun
+
+    return _inner
+
+
+def skip_unless(predicate, reason):
+    """Skip test if predicate is ``False``."""
+    return skip_if(not predicate, reason)
+
+
 @contextmanager
 def mask_modules(*modnames):
     """Ban some modules from being importable inside the context

+ 5 - 1
celery/utils/__init__.py

@@ -124,6 +124,9 @@ def fun_takes_kwargs(fun, kwlist=[]):
     """With a function, and a list of keyword arguments, returns arguments
     in the list which the function takes.
 
+    If the object has an ``argspec`` attribute that is used instead
+    of using the :meth:`inspect.getargspec`` introspection.
+
     :param fun: The function to inspect arguments of.
     :param kwlist: The list of keyword arguments.
 
@@ -139,7 +142,8 @@ def fun_takes_kwargs(fun, kwlist=[]):
         ["logfile", "loglevel", "task_id"]
 
     """
-    args, _varargs, keywords, _defaults = getargspec(fun)
+    argspec = getattr(fun, "argspec", getargspec(fun))
+    args, _varargs, keywords, _defaults = argspec
     if keywords != None:
         return kwlist
     return filter(curry(operator.contains, args), kwlist)

+ 7 - 14
celery/worker/job.py

@@ -43,8 +43,6 @@ class TaskWrapper(object):
 
     :param task_id: see :attr:`task_id`.
 
-    :param task_func: see :attr:`task_func`
-
     :param args: see :attr:`args`
 
     :param kwargs: see :attr:`kwargs`.
@@ -57,10 +55,6 @@ class TaskWrapper(object):
 
         UUID of the task.
 
-    .. attribute:: task_func
-
-        The tasks callable object.
-
     .. attribute:: args
 
         List of positional arguments to apply to the task.
@@ -88,11 +82,10 @@ class TaskWrapper(object):
     """
     fail_email_body = TASK_FAIL_EMAIL_BODY
 
-    def __init__(self, task_name, task_id, task_func, args, kwargs,
+    def __init__(self, task_name, task_id, args, kwargs,
             on_ack=noop, retries=0, **opts):
         self.task_name = task_name
         self.task_id = task_id
-        self.task_func = task_func
         self.retries = retries
         self.args = args
         self.kwargs = kwargs
@@ -104,6 +97,9 @@ class TaskWrapper(object):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
         if not self.logger:
             self.logger = get_default_logger()
+        if self.task_name not in tasks:
+            raise NotRegistered(self.task_name)
+        self.task = tasks[self.task_name]
 
     def __repr__(self):
         return '<%s: {name:"%s", id:"%s", args:"%s", kwargs:"%s"}>' % (
@@ -132,10 +128,7 @@ class TaskWrapper(object):
         kwargs = dict((key.encode("utf-8"), value)
                         for key, value in kwargs.items())
 
-        if task_name not in tasks:
-            raise NotRegistered(task_name)
-        task_func = tasks[task_name]
-        return cls(task_name, task_id, task_func, args, kwargs,
+        return cls(task_name, task_id, args, kwargs,
                     retries=retries, on_ack=message.ack, logger=logger)
 
     def extend_with_default_kwargs(self, loglevel, logfile):
@@ -153,7 +146,7 @@ class TaskWrapper(object):
                             "task_id": self.task_id,
                             "task_name": self.task_name,
                             "task_retries": self.retries}
-        fun = getattr(self.task_func, "run", self.task_func)
+        fun = self.task.run
         supported_keys = fun_takes_kwargs(fun, default_kwargs)
         extend_with = dict((key, val) for key, val in default_kwargs.items()
                                 if key in supported_keys)
@@ -163,7 +156,7 @@ class TaskWrapper(object):
     def _executeable(self, loglevel=None, logfile=None):
         """Get the :class:`celery.execute.ExecuteWrapper` for this task."""
         task_func_kwargs = self.extend_with_default_kwargs(loglevel, logfile)
-        return ExecuteWrapper(self.task_func, self.task_id, self.task_name,
+        return ExecuteWrapper(self.task_name, self.task_id,
                               self.args, task_func_kwargs)
 
     def _set_executed_bit(self):

+ 5 - 3
docs/configuration.rst

@@ -270,9 +270,10 @@ Task execution settings
 
 * CELERY_ALWAYS_EAGER
     If this is ``True``, all tasks will be executed locally by blocking
-    until it is finished. ``apply_async`` and ``delay_task`` will return
+    until it is finished. ``apply_async`` and ``Task.delay`` will return
     a :class:`celery.result.EagerResult` which emulates the behaviour of
-    an :class:`celery.result.AsyncResult`.
+    :class:`celery.result.AsyncResult`, except the result has already
+    been evaluated.
 
     Tasks will never be sent to the queue, but executed locally
     instead.
@@ -282,7 +283,8 @@ Task execution settings
     stored task tombstones are deleted.
 
     **NOTE**: For the moment this only works for the database and MongoDB
-    backends.
+    backends., except the result has already
+    been evaluated.
 
 * CELERY_TASK_SERIALIZER
     A string identifying the default serialization

+ 0 - 2
docs/cookbook/tasks.rst

@@ -23,7 +23,6 @@ The cache key expires after some time in case something unexpected happens
 .. code-block:: python
 
     from celery.task import Task
-    from celery.registry import tasks
     from django.core.cache import cache
     from django.utils.hashcompat import md5_constructor as md5
     from djangofeeds.models import Feed
@@ -61,4 +60,3 @@ The cache key expires after some time in case something unexpected happens
                 release_lock()
 
             return feed.url
-    tasks.register(FeedImporter)

+ 0 - 2
docs/slidesource/slide-example1.py

@@ -1,9 +1,7 @@
 from celery.task import Task
-from celery.registry import tasks
 
 
 class MyTask(Task):
 
     def run(self, x, y):
         return x * y
-tasks.register(MyTask)

+ 1 - 4
docs/tutorials/clickcounter.rst

@@ -208,7 +208,6 @@ Processing the clicks every 30 minutes is easy using celery periodic tasks.
 .. code-block:: python
 
     from celery.task import PeriodicTask
-    from celery.registry import tasks
     from clickmuncher.messaging import process_clicks
     from datetime import timedelta
 
@@ -218,12 +217,10 @@ Processing the clicks every 30 minutes is easy using celery periodic tasks.
     
         def run(self, \*\*kwargs):
             process_clicks()
-    tasks.register(ProcessClicksTask)
 
 We subclass from :class:`celery.task.base.PeriodicTask`, set the ``run_every``
 attribute and in the body of the task just call the ``process_clicks``
-function we wrote earlier. Finally, we register the task in the task registry
-so the celery workers is able to recognize and find it.
+function we wrote earlier. 
 
 
 Finishing

+ 6 - 6
setup.py

@@ -12,7 +12,7 @@ except ImportError:
     use_setuptools()
     from setuptools import setup, find_packages, Command
 
-import celery
+from celery import distmeta
 
 
 class RunTests(Command):
@@ -69,11 +69,11 @@ else:
 
 setup(
     name='celery',
-    version=celery.__version__,
-    description=celery.__doc__,
-    author=celery.__author__,
-    author_email=celery.__contact__,
-    url=celery.__homepage__,
+    version=distmeta.__version__,
+    description=distmeta.__doc__,
+    author=distmeta.__author__,
+    author_email=distmeta.__contact__,
+    url=distmeta.__homepage__,
     platforms=["any"],
     license="BSD",
     packages=find_packages(exclude=['ez_setup']),

+ 0 - 1
testproj/someapp/tasks.py

@@ -6,4 +6,3 @@ class SomeAppTask(Task):
 
     def run(self, **kwargs):
         return 42
-tasks.register(SomeAppTask)