Browse Source

More efficient pickling of App objects by e.g. only pickling configuration changes

Ask Solem 14 years ago
parent
commit
0694993e93
5 changed files with 38 additions and 34 deletions
  1. 20 4
      celery/app/__init__.py
  2. 2 2
      celery/app/base.py
  3. 3 3
      celery/bin/base.py
  4. 12 25
      celery/datastructures.py
  5. 1 0
      celery/worker/__init__.py

+ 20 - 4
celery/app/__init__.py

@@ -40,9 +40,12 @@ class App(base.BaseApp):
 
     """
 
+    def set_current(self):
+        _tls.current_app = self
+
     def on_init(self):
         if self.set_as_current:
-            _tls.current_app = self
+            self.set_current()
 
     def create_task_cls(self):
         """Creates a base task class using default configuration
@@ -129,6 +132,20 @@ class App(base.BaseApp):
             return inner_create_task_cls()(*args)
         return inner_create_task_cls(**options)
 
+    def __reduce__(self):
+        return (_unpickle_app, (self.__class__,
+                                self.main,
+                                self.conf.changes,
+                                self.loader_cls,
+                                self.backend_cls))
+
+
+def _unpickle_app(cls, main, changes, loader, backend, set_as_current):
+    app = cls(main, loader=loader, backend=backend,
+                    set_as_current=False)
+    app.conf.update(changes)
+    return app
+
 
 #: The "default" loader is the default loader used by old applications.
 default_loader = os.environ.get("CELERY_LOADER") or "default"
@@ -141,12 +158,11 @@ if os.environ.get("CELERY_TRACE_APP"):
     def app_or_default(app=None):
         from traceback import print_stack
         from multiprocessing import current_process
-        global _current_app
         if app is None:
-            if _current_app:
+            if _tls.current_app:
                 print("-- RETURNING TO CURRENT APP --")
                 print_stack()
-                return _current_app
+                return _tls.current_app
             if current_process()._name == "MainProcess":
                 raise Exception("DEFAULT APP")
             print("-- RETURNING TO DEFAULT APP --")

+ 2 - 2
celery/app/base.py

@@ -15,7 +15,7 @@ from datetime import timedelta
 
 from celery import routes
 from celery.app.defaults import DEFAULTS
-from celery.datastructures import MultiDictView
+from celery.datastructures import ConfigurationView
 from celery.utils import noop, isatty
 from celery.utils.functional import wraps
 
@@ -252,7 +252,7 @@ class BaseApp(object):
         return backend_cls(app=self)
 
     def _get_config(self):
-        return self.post_config_merge(MultiDictView(
+        return self.post_config_merge(ConfigurationView(
                     self.pre_config_merge(self.loader.conf), DEFAULTS))
 
     @property

+ 3 - 3
celery/bin/base.py

@@ -30,8 +30,8 @@ class Command(object):
     Parser = OptionParser
 
     def __init__(self, app=None, get_app=None):
-        from celery.app import app_or_default
-        self.app = app_or_default(app)
+        #from celery.app import app_or_default
+        #self.app = app_or_default(app)
         self.get_app = get_app or self._get_default_app
 
     def usage(self):
@@ -79,7 +79,7 @@ class Command(object):
             os.environ["CELERY_CONFIG_MODULE"] = config_module
         if app:
             self.app = self.get_cls_by_name(app)
-        elif not self.app:
+        else:
             self.app = self.get_app(loader=loader)
         if self.enable_config_from_cmdline:
             argv = self.process_cmdline_config(argv)

+ 12 - 25
celery/datastructures.py

@@ -63,30 +63,16 @@ class DictAttribute(object):
         return vars(self.obj).iteritems()
 
 
-class MultiDictView(AttributeDictMixin):
-    """View for one more more dicts.
+class ConfigurationView(AttributeDictMixin):
+    changes = None
+    defaults = None
 
-    * When getting a key, the dicts are searched in order.
-    * When setting a key, the key is added to the first dict.
-
-    >>> d1 = {"x": 3"}
-    >>> d2 = {"x": 1, "y": 2, "z": 3}
-    >>> x = MultiDictView([d1, d2])
-
-    >>> x["x"]
-    3
-
-    >>>  x["y"]
-    2
-
-    """
-    dicts = None
-
-    def __init__(self, *dicts):
-        self.__dict__["dicts"] = dicts
+    def __init__(self, changes, defaults):
+        self.__dict__["changes"] = changes
+        self.__dict__["defaults"] = defaults
 
     def __getitem__(self, key):
-        for d in self.__dict__["dicts"]:
+        for d in self.__dict__["changes"], self.__dict__["defaults"]:
             try:
                 return d[key]
             except KeyError:
@@ -94,7 +80,7 @@ class MultiDictView(AttributeDictMixin):
         raise KeyError(key)
 
     def __setitem__(self, key, value):
-        self.__dict__["dicts"][0][key] = value
+        self.__dict__["changes"][key] = value
 
     def get(self, key, default=None):
         try:
@@ -110,10 +96,10 @@ class MultiDictView(AttributeDictMixin):
             return default
 
     def update(self, *args, **kwargs):
-        return self.__dict__["dicts"][0].update(*args, **kwargs)
+        return self.__dict__["changes"].update(*args, **kwargs)
 
     def __contains__(self, key):
-        for d in self.__dict__["dicts"]:
+        for d in self.__dict__["changes"], self.__dict__["defaults"]:
             if key in d:
                 return True
         return False
@@ -122,7 +108,8 @@ class MultiDictView(AttributeDictMixin):
         return repr(dict(iter(self)))
 
     def __iter__(self):
-        return chain(*[d.iteritems() for d in self.__dict__["dicts"]])
+        return chain(*[d.iteritems() for d in (self.__dict__["changes"],
+                                               self.__dict__["defaults"])])
 
 
 class PositionQueue(UserList):

+ 1 - 0
celery/worker/__init__.py

@@ -35,6 +35,7 @@ def process_initializer(app, hostname):
 
     """
     app = app_or_default(app)
+    app.set_current()
     [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
     [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
     platforms.set_mp_process_title("celeryd", hostname=hostname)