Преглед изворни кода

New App pickling strategy: Use keywords instead of args

New keyword arguments can be added by extending app.__reduce_keys__:

    class CustomApp(Celery):

        def __init__(self, my_arg=True, **kwargs):
            self.my_arg = True
            super(CustomApp, self).__init__()

        def __reduce_keys__(self):
            return dict(super(CustomApp, self).__reduce_keys__(),
                        my_arg=self.my_arg)
Ask Solem пре 12 година
родитељ
комит
ef5d4ed17c
2 измењених фајлова са 44 додато и 3 уклоњено
  1. 36 2
      celery/app/base.py
  2. 8 1
      celery/app/utils.py

+ 36 - 2
celery/app/base.py

@@ -29,18 +29,26 @@ from celery.loaders import get_loader_cls
 from celery.local import PromiseProxy, maybe_evaluate
 from celery.utils.functional import first
 from celery.utils.imports import instantiate, symbol_by_name
+from celery.utils.objects import mro_lookup
 
 from .annotations import prepare as prepare_annotations
 from .builtins import shared_task, load_shared_tasks
 from .defaults import DEFAULTS, find_deprecated_settings
 from .registry import TaskRegistry
-from .utils import AppPickler, Settings, bugreport, _unpickle_app
+from .utils import (
+    AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2,
+)
 
 DEFAULT_FIXUPS = (
     'celery.fixups.django:DjangoFixup',
 )
 
 
+def app_has_custom(app, attr):
+    return mro_lookup(app.__class__, attr, stop=(Celery, object),
+                      monkey_patched=[__name__])
+
+
 def _unpickle_appattr(reverse_name, args):
     """Given an attribute name and a list of args, gets
     the attribute from the current app and calls it."""
@@ -90,9 +98,14 @@ class Celery(object):
         if not isinstance(self._tasks, TaskRegistry):
             self._tasks = TaskRegistry(self._tasks or {})
 
+        # If the class defins a custom __reduce_args__ we need to use
+        # the old way of pickling apps, which is pickling a list of
+        # args instead of the new way that pickles a dict of keywords.
+        self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
+
         # these options are moved to the config to
         # simplify pickling of the app object.
-        self._preconf = {}
+        self._preconf = changes or {}
         if broker:
             self._preconf['BROKER_URL'] = broker
         if include:
@@ -387,13 +400,34 @@ class Celery(object):
             type(self).__name__, self.main or '__main__', id(self))
 
     def __reduce__(self):
+        if self._using_v1_reduce:
+            return self.__reduce_v1__()
+        return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
+
+    def __reduce_v1__(self):
         # Reduce only pickles the configuration changes,
         # so the default configuration doesn't have to be passed
         # between processes.
         return (_unpickle_app, (self.__class__, self.Pickler)
                               + self.__reduce_args__())
 
+    def __reduce_keys__(self):
+        """Returns keyword arguments used to reconstruct the object
+        when unpickling."""
+        return {
+            'main': self.main,
+            'changes': self.conf.changes,
+            'loader': self.loader_cls,
+            'backend': self.backend_cls,
+            'amqp': self.amqp_cls,
+            'events': self.events_cls,
+            'log': self.log_cls,
+            'control': self.control_cls,
+            'accept_magic_kwargs': self.accept_magic_kwargs,
+        }
+
     def __reduce_args__(self):
+        """Deprecated method, please use :meth:`__reduce_keys__` instead."""
         return (self.main, self.conf.changes, self.loader_cls,
                 self.backend_cls, self.amqp_cls, self.events_cls,
                 self.log_cls, self.control_cls, self.accept_magic_kwargs)

+ 8 - 1
celery/app/utils.py

@@ -101,7 +101,7 @@ class Settings(datastructures.ConfigurationView):
 
 
 class AppPickler(object):
-    """Default application pickler/unpickler."""
+    """Old application pickler/unpickler (<= 3.0)."""
 
     def __call__(self, cls, *args):
         kwargs = self.build_kwargs(*args)
@@ -127,9 +127,16 @@ class AppPickler(object):
 
 
 def _unpickle_app(cls, pickler, *args):
+    """Rebuild app for versions 2.5+"""
     return pickler()(cls, *args)
 
 
+def _unpickle_app_v2(cls, kwargs):
+    """Rebuild app for versions 3.1+"""
+    kwargs['set_as_current'] = False
+    return cls(**kwargs)
+
+
 def bugreport(app):
     """Returns a string containing information useful in bug reports."""
     import billiard