Browse Source

Removes the metaclass for the new task base class (tasks are no longer automatically registered, the @task decorator binds and registers the task)

Ask Solem 11 years ago
parent
commit
c016714be8

+ 13 - 6
celery/app/base.py

@@ -33,6 +33,7 @@ from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
 from celery.five import items, values
 from celery.loaders import get_loader_cls
 from celery.local import PromiseProxy, maybe_evaluate
+from celery.utils import gen_task_name
 from celery.utils.dispatch import Signal
 from celery.utils.functional import first, maybe_list
 from celery.utils.imports import instantiate, symbol_by_name
@@ -224,16 +225,16 @@ class Celery(object):
 
     def task(self, *args, **opts):
         """Creates new task class from any callable."""
-        if _EXECV and not opts.get('_force_evaluate'):
+        if _EXECV and opts.get('lazy', True):
             # When using execv the task in the original module will point to a
             # different app, so doing things like 'add.request' will point to
-            # a differnt task instance.  This makes sure it will always use
+            # a different task instance.  This makes sure it will always use
             # the task instance from the current app.
             # Really need a better solution for this :(
             from . import shared_task
-            return shared_task(*args, _force_evaluate=True, **opts)
+            return shared_task(*args, lazy=False, **opts)
 
-        def inner_create_task_cls(shared=True, filter=None, **opts):
+        def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
             _filt = filter  # stupid 2to3
 
             def _create_task_cls(fun):
@@ -241,7 +242,7 @@ class Celery(object):
                     cons = lambda app: app._task_from_fun(fun, **opts)
                     cons.__name__ = fun.__name__
                     connect_on_app_finalize(cons)
-                if self.finalized or opts.get('_force_evaluate'):
+                if not lazy or self.finalized:
                     ret = self._task_from_fun(fun, **opts)
                 else:
                     # return a proxy object that evaluates on first use
@@ -264,19 +265,25 @@ class Celery(object):
                     sum([len(args), len(opts)])))
         return inner_create_task_cls(**opts)
 
-    def _task_from_fun(self, fun, **options):
+    def _task_from_fun(self, fun, name=None, **options):
         if not self.finalized and not self.autofinalize:
             raise RuntimeError('Contract breach: app not finalized')
         base = options.pop('base', None) or self.Task
         bind = options.pop('bind', False)
 
+        name = name or gen_task_name(self, fun.__name__, fun.__module__)
+
         T = type(fun.__name__, (base, ), dict({
             'app': self,
+            'name': name,
             'run': fun if bind else staticmethod(fun),
             '_decorated': True,
             '__doc__': fun.__doc__,
             '__module__': fun.__module__,
             '__wrapped__': fun}, **options))()
+        if T.name not in self._tasks:
+            self._tasks.register(T)
+            T.bind(self)  # connects task to this app
         task = self._tasks[T.name]  # return global instance.
         return task
 

+ 40 - 53
celery/app/builtins.py

@@ -27,8 +27,7 @@ def add_backend_cleanup_task(app):
     :program:`celery beat` to be running).
 
     """
-    @app.task(name='celery.backend_cleanup',
-              shared=False, _force_evaluate=True)
+    @app.task(name='celery.backend_cleanup', shared=False, lazy=False)
     def backend_cleanup():
         app.backend.cleanup()
     return backend_cleanup
@@ -48,7 +47,7 @@ def add_unlock_chord_task(app):
     default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
     @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
-              default_retry_delay=1, ignore_result=True, _force_evaluate=True)
+              default_retry_delay=1, ignore_result=True, lazy=False)
     def unlock_chord(group_id, callback, interval=None, propagate=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
@@ -106,7 +105,7 @@ def add_unlock_chord_task(app):
 def add_map_task(app):
     from celery.canvas import signature
 
-    @app.task(name='celery.map', shared=False, _force_evaluate=True)
+    @app.task(name='celery.map', shared=False, lazy=False)
     def xmap(task, it):
         task = signature(task, app=app).type
         return [task(item) for item in it]
@@ -117,7 +116,7 @@ def add_map_task(app):
 def add_starmap_task(app):
     from celery.canvas import signature
 
-    @app.task(name='celery.starmap', shared=False, _force_evaluate=True)
+    @app.task(name='celery.starmap', shared=False, lazy=False)
     def xstarmap(task, it):
         task = signature(task, app=app).type
         return [task(*item) for item in it]
@@ -128,7 +127,7 @@ def add_starmap_task(app):
 def add_chunk_task(app):
     from celery.canvas import chunks as _chunks
 
-    @app.task(name='celery.chunks', shared=False, _force_evaluate=True)
+    @app.task(name='celery.chunks', shared=False, lazy=False)
     def chunks(task, it, n):
         return _chunks.apply_chunks(task, it, n)
     return chunks
@@ -137,43 +136,35 @@ def add_chunk_task(app):
 @connect_on_app_finalize
 def add_group_task(app):
     """No longer used, but here for backwards compatibility."""
-    _app = app
     from celery.canvas import maybe_signature
     from celery.result import result_from_tuple
 
-    class Group(app.Task):
-        app = _app
-        name = 'celery.group'
-        _decorated = True
-
-        def run(self, tasks, result, group_id, partial_args,
-                add_to_parent=True):
-            app = self.app
-            result = result_from_tuple(result, app)
-            # any partial args are added to all tasks in the group
-            taskit = (maybe_signature(task, app=app).clone(partial_args)
-                      for i, task in enumerate(tasks))
-            with app.producer_or_acquire() as pub:
-                [stask.apply_async(group_id=group_id, producer=pub,
-                                   add_to_parent=False) for stask in taskit]
-            parent = get_current_worker_task()
-            if add_to_parent and parent:
-                parent.add_trail(result)
-            return result
-    return Group
+    @app.task(name='celery.group', bind=True, shared=False, lazy=False)
+    def group(self, tasks, result, group_id, partial_args, add_to_parent=True):
+        app = self.app
+        result = result_from_tuple(result, app)
+        # any partial args are added to all tasks in the group
+        taskit = (maybe_signature(task, app=app).clone(partial_args)
+                  for i, task in enumerate(tasks))
+        with app.producer_or_acquire() as producer:
+            [stask.apply_async(group_id=group_id, producer=producer,
+                               add_to_parent=False) for stask in taskit]
+        parent = get_current_worker_task()
+        if add_to_parent and parent:
+            parent.add_trail(result)
+        return result
+    return group
 
 
 @connect_on_app_finalize
 def add_chain_task(app):
     """No longer used, but here for backwards compatibility."""
-    _app = app
 
-    class Chain(app.Task):
-        app = _app
-        name = 'celery.chain'
-        _decorated = True
+    @app.task(name='celery.chain', shared=False, lazy=False)
+    def chain(*args, **kwargs):
+        raise NotImplementedError('chain is not a real task')
+    return chain
 
-    return Chain
 
 
 @connect_on_app_finalize
@@ -183,23 +174,19 @@ def add_chord_task(app):
     from celery.canvas import maybe_signature
     _app = app
 
-    class Chord(app.Task):
-        app = _app
-        name = 'celery.chord'
-        ignore_result = False
-        _decorated = True
-
-        def run(self, header, body, partial_args=(), interval=None,
-                countdown=1, max_retries=None, propagate=None,
-                eager=False, **kwargs):
-            app = self.app
-            # - convert back to group if serialized
-            tasks = header.tasks if isinstance(header, group) else header
-            header = group([
-                maybe_signature(s, app=app) for s in tasks
-            ], app=self.app)
-            body = maybe_signature(body, app=app)
-            ch = _chord(header, body)
-            return ch.run(header, body, partial_args, app, interval,
-                          countdown, max_retries, propagate, **kwargs)
-    return Chord
+    @app.task(name='celery.chord', bind=True, ignore_result=False,
+              shared=False, lazy=False)
+    def chord(self, header, body, partial_args=(), interval=None,
+              countdown=1, max_retries=None, propagate=None,
+              eager=False, **kwargs):
+        app = self.app
+        # - convert back to group if serialized
+        tasks = header.tasks if isinstance(header, group) else header
+        header = group([
+            maybe_signature(s, app=app) for s in tasks
+        ], app=self.app)
+        body = maybe_signature(body, app=app)
+        ch = _chord(header, body)
+        return ch.run(header, body, partial_args, app, interval,
+                      countdown, max_retries, propagate, **kwargs)
+    return chord

+ 2 - 97
celery/app/task.py

@@ -17,10 +17,9 @@ from celery import states
 from celery._state import _task_stack
 from celery.canvas import signature
 from celery.exceptions import MaxRetriesExceededError, Reject, Retry
-from celery.five import class_property, items, with_metaclass
-from celery.local import Proxy
+from celery.five import class_property, items
 from celery.result import EagerResult
-from celery.utils import gen_task_name, uuid, maybe_reraise
+from celery.utils import uuid, maybe_reraise
 from celery.utils.functional import mattrgetter, maybe_list
 from celery.utils.imports import instantiate
 from celery.utils.mail import ErrorMail
@@ -45,22 +44,6 @@ R_SELF_TASK = '<@task {0.name} bound to other {0.__self__}>'
 R_INSTANCE = '<@task: {0.name} of {app}{flags}>'
 
 
-class _CompatShared(object):
-
-    def __init__(self, name, cons):
-        self.name = name
-        self.cons = cons
-
-    def __hash__(self):
-        return hash(self.name)
-
-    def __repr__(self):
-        return '<OldTask: %r>' % (self.name, )
-
-    def __call__(self, app):
-        return self.cons(app)
-
-
 def _strflags(flags, default=''):
     if flags:
         return ' ({0})'.format(', '.join(flags))
@@ -130,84 +113,6 @@ class Context(object):
         return self._children
 
 
-class TaskType(type):
-    """Meta class for tasks.
-
-    Automatically registers the task in the task registry (except
-    if the :attr:`Task.abstract`` attribute is set).
-
-    If no :attr:`Task.name` attribute is provided, then the name is generated
-    from the module and class name.
-
-    """
-    _creation_count = {}  # used by old non-abstract task classes
-
-    def __new__(cls, name, bases, attrs):
-        new = super(TaskType, cls).__new__
-        task_module = attrs.get('__module__') or '__main__'
-
-        # - Abstract class: abstract attribute should not be inherited.
-        abstract = attrs.pop('abstract', None)
-        if abstract or not attrs.get('autoregister', True):
-            return new(cls, name, bases, attrs)
-
-        # The 'app' attribute is now a property, with the real app located
-        # in the '_app' attribute.  Previously this was a regular attribute,
-        # so we should support classes defining it.
-        app = attrs.pop('_app', None) or attrs.pop('app', None)
-
-        # Attempt to inherit app from one the bases
-        if not isinstance(app, Proxy) and app is None:
-            for base in bases:
-                if getattr(base, '_app', None):
-                    app = base._app
-                    break
-            else:
-                app = current_app._get_current_object()
-        attrs['_app'] = app
-
-        # - Automatically generate missing/empty name.
-        task_name = attrs.get('name')
-        if not task_name:
-            attrs['name'] = task_name = gen_task_name(app, name, task_module)
-
-        if not attrs.get('_decorated'):
-            # non decorated tasks must also be shared in case
-            # an app is created multiple times due to modules
-            # imported under multiple names.
-            # Hairy stuff,  here to be compatible with 2.x.
-            # People should not use non-abstract task classes anymore,
-            # use the task decorator.
-            from celery._state import connect_on_app_finalize
-            unique_name = '.'.join([task_module, name])
-            if unique_name not in cls._creation_count:
-                # the creation count is used as a safety
-                # so that the same task is not added recursively
-                # to the set of constructors.
-                cls._creation_count[unique_name] = 1
-                connect_on_app_finalize(_CompatShared(
-                    unique_name,
-                    lambda app: TaskType.__new__(cls, name, bases,
-                                                 dict(attrs, _app=app)),
-                ))
-
-        # - Create and register class.
-        # Because of the way import happens (recursively)
-        # we may or may not be the first time the task tries to register
-        # with the framework.  There should only be one class for each task
-        # name, so we always return the registered version.
-        tasks = app._tasks
-        if task_name not in tasks:
-            tasks.register(new(cls, name, bases, attrs))
-        instance = tasks[task_name]
-        instance.bind(app)
-        return instance.__class__
-
-    def __repr__(cls):
-        return _reprtask(cls)
-
-
-@with_metaclass(TaskType)
 class Task(object):
     """Task base class.
 

+ 99 - 3
celery/task/base.py

@@ -14,12 +14,14 @@ from __future__ import absolute_import
 from kombu import Exchange
 
 from celery import current_app
-from celery.app.task import Context, TaskType, Task as BaseTask  # noqa
-from celery.five import class_property, reclassmethod
+from celery.app.task import Context, Task as BaseTask, _reprtask
+from celery.five import class_property, reclassmethod, with_metaclass
+from celery.local import Proxy
 from celery.schedules import maybe_schedule
+from celery.utils import gen_task_name
 from celery.utils.log import get_task_logger
 
-__all__ = ['Task', 'PeriodicTask', 'task']
+__all__ = ['Context', 'Task', 'TaskType', 'PeriodicTask', 'task']
 
 #: list of methods that must be classmethods in the old API.
 _COMPAT_CLASSMETHODS = (
@@ -29,6 +31,100 @@ _COMPAT_CLASSMETHODS = (
 )
 
 
+class _CompatShared(object):
+
+    def __init__(self, name, cons):
+        self.name = name
+        self.cons = cons
+
+    def __hash__(self):
+        return hash(self.name)
+
+    def __repr__(self):
+        return '<OldTask: %r>' % (self.name, )
+
+    def __call__(self, app):
+        return self.cons(app)
+
+
+class TaskType(type):
+    """Meta class for tasks.
+
+    Automatically registers the task in the task registry (except
+    if the :attr:`Task.abstract`` attribute is set).
+
+    If no :attr:`Task.name` attribute is provided, then the name is generated
+    from the module and class name.
+
+    """
+    _creation_count = {}  # used by old non-abstract task classes
+
+    def __new__(cls, name, bases, attrs):
+        new = super(TaskType, cls).__new__
+        task_module = attrs.get('__module__') or '__main__'
+
+        # - Abstract class: abstract attribute should not be inherited.
+        abstract = attrs.pop('abstract', None)
+        if abstract or not attrs.get('autoregister', True):
+            return new(cls, name, bases, attrs)
+
+        # The 'app' attribute is now a property, with the real app located
+        # in the '_app' attribute.  Previously this was a regular attribute,
+        # so we should support classes defining it.
+        app = attrs.pop('_app', None) or attrs.pop('app', None)
+
+        # Attempt to inherit app from one the bases
+        if not isinstance(app, Proxy) and app is None:
+            for base in bases:
+                if getattr(base, '_app', None):
+                    app = base._app
+                    break
+            else:
+                app = current_app._get_current_object()
+        attrs['_app'] = app
+
+        # - Automatically generate missing/empty name.
+        task_name = attrs.get('name')
+        if not task_name:
+            attrs['name'] = task_name = gen_task_name(app, name, task_module)
+
+        if not attrs.get('_decorated'):
+            # non decorated tasks must also be shared in case
+            # an app is created multiple times due to modules
+            # imported under multiple names.
+            # Hairy stuff,  here to be compatible with 2.x.
+            # People should not use non-abstract task classes anymore,
+            # use the task decorator.
+            from celery._state import connect_on_app_finalize
+            unique_name = '.'.join([task_module, name])
+            if unique_name not in cls._creation_count:
+                # the creation count is used as a safety
+                # so that the same task is not added recursively
+                # to the set of constructors.
+                cls._creation_count[unique_name] = 1
+                connect_on_app_finalize(_CompatShared(
+                    unique_name,
+                    lambda app: TaskType.__new__(cls, name, bases,
+                                                 dict(attrs, _app=app)),
+                ))
+
+        # - Create and register class.
+        # Because of the way import happens (recursively)
+        # we may or may not be the first time the task tries to register
+        # with the framework.  There should only be one class for each task
+        # name, so we always return the registered version.
+        tasks = app._tasks
+        if task_name not in tasks:
+            tasks.register(new(cls, name, bases, attrs))
+        instance = tasks[task_name]
+        instance.bind(app)
+        return instance.__class__
+
+    def __repr__(cls):
+        return _reprtask(cls)
+
+
+@with_metaclass(TaskType)
 class Task(BaseTask):
     """Deprecated Task base class.
 

+ 3 - 3
celery/tests/app/test_builtins.py

@@ -78,7 +78,7 @@ class test_chunks(BuiltinsCase):
 class test_group(BuiltinsCase):
 
     def setup(self):
-        self.task = builtins.add_group_task(self.app)()
+        self.task = builtins.add_group_task(self.app)
         super(test_group, self).setup()
 
     def test_apply_async_eager(self):
@@ -125,7 +125,7 @@ class test_chain(BuiltinsCase):
 
     def setup(self):
         BuiltinsCase.setup(self)
-        self.task = builtins.add_chain_task(self.app)()
+        self.task = builtins.add_chain_task(self.app)
 
     def test_apply_async(self):
         c = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
@@ -181,7 +181,7 @@ class test_chain(BuiltinsCase):
 class test_chord(BuiltinsCase):
 
     def setup(self):
-        self.task = builtins.add_chord_task(self.app)()
+        self.task = builtins.add_chord_task(self.app)
         super(test_chord, self).setup()
 
     def test_apply_async(self):

+ 0 - 5
celery/tests/tasks/test_tasks.py

@@ -358,11 +358,6 @@ class test_tasks(TasksCase):
         finally:
             self.mytask.pop_request()
 
-    def test_task_class_repr(self):
-        self.assertIn('class Task of', repr(self.mytask.app.Task))
-        self.mytask.app.Task._app = None
-        self.assertIn('unbound', repr(self.mytask.app.Task, ))
-
     def test_annotate(self):
         with patch('celery.app.task.resolve_all_annotations') as anno:
             anno.return_value = [{'FOO': 'BAR'}]

+ 1 - 0
celery/utils/__init__.py

@@ -298,6 +298,7 @@ def jsonify(obj,
 
 def gen_task_name(app, name, module_name):
     """Generate task name from name/module pair."""
+    module_name = module_name or '__main__'
     try:
         module = sys.modules[module_name]
     except KeyError: