Browse Source

Tasks are now shared between apps by default, can be private by @task(shared=False)

Ask Solem 13 years ago
parent
commit
a1cc46a44b
3 changed files with 27 additions and 24 deletions
  1. 11 8
      celery/app/base.py
  2. 15 15
      celery/app/builtins.py
  3. 1 1
      celery/app/registry.py

+ 11 - 8
celery/app/base.py

@@ -31,8 +31,9 @@ from celery.utils.functional import first
 from celery.utils.imports import instantiate, symbol_by_name
 
 from .annotations import prepare as prepare_annotations
-from .builtins import builtin_task, load_builtin_tasks
+from .builtins import shared_task, load_shared_tasks
 from .defaults import DEFAULTS, find_deprecated_settings
+from .registry import TaskRegistry
 from .state import _tls, get_current_app
 from .utils import AppPickler, Settings, bugreport, _unpickle_app
 
@@ -55,7 +56,7 @@ class Celery(object):
     loader_cls = "celery.loaders.app:AppLoader"
     log_cls = "celery.app.log:Logging"
     control_cls = "celery.app.control:Control"
-    registry_cls = "celery.app.registry:TaskRegistry"
+    registry_cls = TaskRegistry
     _pool = None
 
     def __init__(self, main=None, loader=None, backend=None,
@@ -71,12 +72,14 @@ class Celery(object):
         self.log_cls = log or self.log_cls
         self.control_cls = control or self.control_cls
         self.set_as_current = set_as_current
-        self.registry_cls = self.registry_cls if tasks is None else tasks
+        self.registry_cls = symbol_by_name(self.registry_cls)
         self.accept_magic_kwargs = accept_magic_kwargs
 
         self.finalized = False
         self._pending = deque()
-        self._tasks = instantiate(self.registry_cls)
+        self._tasks = tasks
+        if not isinstance(self._tasks, TaskRegistry):
+            self._tasks = TaskRegistry(self._tasks or {})
 
         # these options are moved to the config to
         # simplify pickling of the app object.
@@ -106,13 +109,13 @@ class Celery(object):
     def task(self, *args, **opts):
         """Creates new task class from any callable."""
 
-        def inner_create_task_cls(builtin=False, **opts):
+        def inner_create_task_cls(shared=True, **opts):
 
             def _create_task_cls(fun):
-                if builtin:
+                if shared:
                     cons = lambda app: app._task_from_fun(fun, **opts)
                     cons.__name__ = fun.__name__
-                    builtin_task(cons)
+                    shared_task(cons)
                 if self.accept_magic_kwargs:  # compat mode
                     return self._task_from_fun(fun, **opts)
 
@@ -142,7 +145,7 @@ class Celery(object):
 
     def finalize(self):
         if not self.finalized:
-            load_builtin_tasks(self)
+            load_shared_tasks(self)
 
             pending = self._pending
             while pending:

+ 15 - 15
celery/app/builtins.py

@@ -6,28 +6,28 @@ from itertools import starmap
 
 from celery.utils import uuid
 
-#: global list of functions defining a built-in task.
-#: these are called for every app instance to setup built-in task.
-_builtin_tasks = []
+#: global list of functions defining tasks that should be
+#: added to all apps.
+_shared_tasks = []
 
 
-def builtin_task(constructor):
+def shared_task(constructor):
     """Decorator that specifies that the decorated function is a function
     that generates a built-in task.
 
     The function will then be called for every new app instance created
     (lazily, so more exactly when the task registry for that app is needed).
     """
-    _builtin_tasks.append(constructor)
+    _shared_tasks.append(constructor)
     return constructor
 
 
-def load_builtin_tasks(app):
+def load_shared_tasks(app):
     """Loads the built-in tasks for an app instance."""
-    [constructor(app) for constructor in _builtin_tasks]
+    [constructor(app) for constructor in _shared_tasks]
 
 
-@builtin_task
+@shared_task
 def add_backend_cleanup_task(app):
     """The backend cleanup task can be used to clean up the default result
     backend.
@@ -48,7 +48,7 @@ def add_backend_cleanup_task(app):
     return backend_cleanup
 
 
-@builtin_task
+@shared_task
 def add_unlock_chord_task(app):
     """The unlock chord task is used by result backends that doesn't
     have native chord support.
@@ -71,7 +71,7 @@ def add_unlock_chord_task(app):
     return unlock_chord
 
 
-@builtin_task
+@shared_task
 def add_map_task(app):
     from celery.canvas import subtask
 
@@ -81,7 +81,7 @@ def add_map_task(app):
         return list(map(task, it))
 
 
-@builtin_task
+@shared_task
 def add_starmap_task(app):
     from celery.canvas import subtask
 
@@ -91,7 +91,7 @@ def add_starmap_task(app):
         return list(starmap(task, it))
 
 
-@builtin_task
+@shared_task
 def add_chunk_task(app):
     from celery.canvas import chunks as _chunks
 
@@ -100,7 +100,7 @@ def add_chunk_task(app):
         return _chunks.apply_chunks(task, it, n)
 
 
-@builtin_task
+@shared_task
 def add_group_task(app):
     from celery.canvas import subtask
     from celery.app.state import get_current_task
@@ -154,7 +154,7 @@ def add_group_task(app):
     return Group
 
 
-@builtin_task
+@shared_task
 def add_chain_task(app):
     from celery.canvas import maybe_subtask
 
@@ -185,7 +185,7 @@ def add_chain_task(app):
     return Chain
 
 
-@builtin_task
+@shared_task
 def add_chord_task(app):
     """Every chord is executed in a dedicated task, so that the chord
     can be used as a subtask, and this generates the task

+ 1 - 1
celery/app/registry.py

@@ -13,7 +13,6 @@ from __future__ import absolute_import
 
 import inspect
 
-from celery import current_app
 from celery.exceptions import NotRegistered
 
 
@@ -60,4 +59,5 @@ class TaskRegistry(dict):
 
 
 def _unpickle_task(name):
+    from celery import current_app
     return current_app.tasks[name]