Browse Source

celery.contrib.methods: Task decorator for methods

To use::

    from celery.contrib.methods import task

    class X(object):

        @task
        def add(self, x, y):
            return x + y

or with any task decorator:

    from celery.contrib.methods import task_method

    class X(object):

        @celery.task(filter=task_method)
        def add(self, x, y):
            return x + y

Caveats:

- Autmatic naming won't be able to know what the class name is.

The name will still be module_name + task_name,
so two methods with the same name in the same module will collide
so that only one task can run::

    class A(object):
        @task
        def add(self, x, y):
            return x + y

    class B(object):
        @task
        def add(self, x, y):
            return x + y

would have to be written as:

    class A(object):
        @task(name="A.add")
        def add(self, x, y):
            return x + y

    class B(object):
        @task(name="B.add")
        def add(self, x, y):
            return x + y
Ask Solem 13 years ago
parent
commit
b10d2336f6
3 changed files with 36 additions and 3 deletions
  1. 7 2
      celery/app/base.py
  2. 8 1
      celery/app/task.py
  3. 21 0
      celery/contrib/methods.py

+ 7 - 2
celery/app/base.py

@@ -111,7 +111,7 @@ class Celery(object):
     def task(self, *args, **opts):
         """Creates new task class from any callable."""
 
-        def inner_create_task_cls(shared=True, **opts):
+        def inner_create_task_cls(shared=True, filter=None, **opts):
 
             def _create_task_cls(fun):
                 if shared:
@@ -119,11 +119,16 @@ class Celery(object):
                     cons.__name__ = fun.__name__
                     shared_task(cons)
                 if self.accept_magic_kwargs:  # compat mode
-                    return self._task_from_fun(fun, **opts)
+                    task = self._task_from_fun(fun, **opts)
+                    if filter:
+                        task = filter(task)
+                    return task
 
                 # return a proxy object that is only evaluated when first used
                 promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
                 self._pending.append(promise)
+                if filter:
+                    return filter(promise)
                 return promise
 
             return _create_task_cls

+ 8 - 1
celery/app/task.py

@@ -120,7 +120,7 @@ class TaskType(type):
             except KeyError:  # pragma: no cover
                 # Fix for manage.py shell_plus (Issue #366).
                 module_name = task_module
-            attrs["name"] = '.'.join([module_name, name])
+            attrs["name"] = '.'.join(filter(None, [module_name, name]))
             autoname = True
 
         # - Create and register class.
@@ -162,6 +162,9 @@ class BaseTask(object):
     #: Execution strategy used, or the qualified name of one.
     Strategy = "celery.worker.strategy:default"
 
+    #: This is the instance bound to if the task is a method of a class.
+    __self__ = None
+
     #: The application instance associated with this task class.
     _app = None
 
@@ -545,6 +548,10 @@ class BaseTask(object):
         router = router or self.app.amqp.router
         conf = app.conf
 
+        # add 'self' if this is a bound method.
+        if self.__self__ is not None:
+            args = (self.__self__, ) + tuple(args)
+
         if conf.CELERY_ALWAYS_EAGER:
             return self.apply(args, kwargs, task_id=task_id, **options)
         options = dict(extract_exec_options(self), **options)

+ 21 - 0
celery/contrib/methods.py

@@ -0,0 +1,21 @@
+from __future__ import absolute_import
+
+from functools import partial
+
+from celery import task as _task
+
+
+class task_method(object):
+
+    def __init__(self, task, *args, **kwargs):
+        self.task = task
+
+    def __get__(self, obj, type=None):
+        if obj is None:
+            return self.task
+        task = self.task.__class__()
+        task.__self__ = obj
+        return task
+
+
+task = partial(_task, filter=task_method)