瀏覽代碼

Merge branch 'heyman/master'

Conflicts:
	celery/bin/celeryd.py
Ask Solem 16 年之前
父節點
當前提交
f9147e45d1
共有 6 個文件被更改,包括 193 次插入52 次删除
  1. 31 1
      celery/backends/cache.py
  2. 13 8
      celery/bin/celeryd.py
  3. 126 40
      celery/fields.py
  4. 12 0
      celery/loaders/__init__.py
  5. 1 1
      celery/utils.py
  6. 10 2
      setup.py

+ 31 - 1
celery/backends/cache.py

@@ -1,7 +1,37 @@
 """celery.backends.cache"""
 """celery.backends.cache"""
 from django.core.cache import cache
 from django.core.cache import cache
+from django.core.cache.backends.base import InvalidCacheBackendError
+from django.utils.encoding import smart_str
 from celery.backends.base import KeyValueStoreBackend
 from celery.backends.base import KeyValueStoreBackend
 
 
+class DjangoMemcacheWrapper(object):
+    """
+    Wrapper class to django's memcache backend class, that overrides the get method in
+    order to remove the forcing of unicode strings since it may cause binary or pickled data
+    to break.
+    """
+    def __init__(self, cache):
+        self.cache = cache
+    
+    def get(self, key, default=None):
+        val = self.cache._cache.get(smart_str(key))
+        if val is None:
+            return default
+        else:
+            return val
+    
+    def set(self, key, value, timeout=0):
+        self.cache.set(key, value, timeout)
+
+# Check if django is using memcache as the cache backend. If so, wrap the cache object in
+# our DjangoMemcacheWrapper that fixes a bug with retrieving pickled data
+try:
+    from django.core.cache.backends.memcached import CacheClass
+    if isinstance(cache, CacheClass):
+        cache = DjangoMemcacheWrapper(cache)
+except InvalidCacheBackendError:
+    pass
+
 
 
 class Backend(KeyValueStoreBackend):
 class Backend(KeyValueStoreBackend):
     """Backend using the Django cache framework to store task metadata."""
     """Backend using the Django cache framework to store task metadata."""
@@ -10,4 +40,4 @@ class Backend(KeyValueStoreBackend):
         return cache.get(key)
         return cache.get(key)
 
 
     def set(self, key, value):
     def set(self, key, value):
-        cache.set(key, value)
+        cache.set(key, value)

+ 13 - 8
celery/bin/celeryd.py

@@ -80,7 +80,7 @@ from celery import conf
 from celery import discovery
 from celery import discovery
 from celery.task import discard_all
 from celery.task import discard_all
 from celery.worker import WorkController
 from celery.worker import WorkController
-from signal import signal, SIGHUP, SIGCLD, SIG_DFL
+import signal
 import multiprocessing
 import multiprocessing
 import traceback
 import traceback
 import optparse
 import optparse
@@ -183,14 +183,16 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         statistics=None, **kwargs):
         statistics=None, **kwargs):
     """Starts the celery worker server."""
     """Starts the celery worker server."""
 
 
-    # set SIGCLD back to the default SIG_DFL (before python-daemon overrode it)
-    # lets the parent wait() for the terminated child process and stops
-    # 'OSError: [Errno 10] No child processes' problem.
-    signal(SIGCLD, SIG_DFL)
+    # Setting SIGCLD back to the default SIG_DFL (python-daemon overrides
+    # it) lets the parent wait() for the terminated child process and stops
+    # the 'OSError: [Errno 10] No child processes' problem.
+
+    if hasattr(signal, "SIGCLD"): # Make sure the platform supports signals.
+        signal.signal(signal.SIGCLD, signal.SIG_DFL)
     
     
     print("Celery %s is starting." % __version__)
     print("Celery %s is starting." % __version__)
 
 
-    if statistics:
+    if statistics is not None:
         settings.CELERY_STATISTICS = statistics
         settings.CELERY_STATISTICS = statistics
 
 
     if not concurrency:
     if not concurrency:
@@ -271,7 +273,8 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
                                 logfile=logfile,
                                 logfile=logfile,
                                 is_detached=detach)
                                 is_detached=detach)
 
 
-        # Install signal handler that restarts celeryd on SIGHUP
+        # Install signal handler that restarts celeryd on SIGHUP,
+        # (only on POSIX systems)
         install_restart_signal_handler(worker)
         install_restart_signal_handler(worker)
 
 
         try:
         try:
@@ -295,6 +298,8 @@ def install_restart_signal_handler(worker):
     """Installs a signal handler that restarts the current program
     """Installs a signal handler that restarts the current program
     when it receives the ``SIGHUP`` signal.
     when it receives the ``SIGHUP`` signal.
     """
     """
+    if not hasattr(signal, "SIGHUP"):
+        return  # platform is not POSIX
 
 
     def restart_self(signum, frame):
     def restart_self(signum, frame):
         """Signal handler restarting the current python program."""
         """Signal handler restarting the current python program."""
@@ -309,7 +314,7 @@ def install_restart_signal_handler(worker):
             worker.stop()
             worker.stop()
         os.execv(sys.executable, [sys.executable] + sys.argv)
         os.execv(sys.executable, [sys.executable] + sys.argv)
 
 
-    signal(SIGHUP, restart_self)
+    signal.signal(signal.SIGHUP, restart_self)
 
 
 
 
 def parse_options(arguments):
 def parse_options(arguments):

+ 126 - 40
celery/fields.py

@@ -3,62 +3,148 @@
 Custom Django Model Fields.
 Custom Django Model Fields.
 
 
 """
 """
-from django.db import models
-from django.conf import settings
-from celery.serialization import pickle
 
 
+from copy import deepcopy
+from base64 import b64encode, b64decode
+from zlib import compress, decompress
+try:
+    from cPickle import loads, dumps
+except ImportError:
+    from pickle import loads, dumps
+
+from django.db import models
+from django.utils.encoding import force_unicode
 
 
 class PickledObject(str):
 class PickledObject(str):
-    """A subclass of string so it can be told whether a string is
-       a pickled object or not (if the object is an instance of this class
-       then it must [well, should] be a pickled one)."""
+    """
+    A subclass of string so it can be told whether a string is a pickled
+    object or not (if the object is an instance of this class then it must
+    [well, should] be a pickled one).
+    
+    Only really useful for passing pre-encoded values to ``default``
+    with ``dbsafe_encode``, not that doing so is necessary. If you
+    remove PickledObject and its references, you won't be able to pass
+    in pre-encoded values anymore, but you can always just pass in the
+    python objects themselves.
+    
+    """
     pass
     pass
 
 
+def dbsafe_encode(value, compress_object=False):
+    """
+    We use deepcopy() here to avoid a problem with cPickle, where dumps
+    can generate different character streams for same lookup value if
+    they are referenced differently. 
+    
+    The reason this is important is because we do all of our lookups as
+    simple string matches, thus the character streams must be the same
+    for the lookups to work properly. See tests.py for more information.
+    """
+    if not compress_object:
+        value = b64encode(dumps(deepcopy(value)))
+    else:
+        value = b64encode(compress(dumps(deepcopy(value))))
+    return PickledObject(value)
 
 
-if settings.DATABASE_ENGINE == "postgresql_psycopg2":
-    import psycopg2.extensions
-    # register PickledObject as a QuotedString otherwise we will see
-    # can't adapt errors from psycopg2.
-    psycopg2.extensions.register_adapter(PickledObject,
-            psycopg2.extensions.QuotedString)
-
+def dbsafe_decode(value, compress_object=False):
+    if not compress_object:
+        value = loads(b64decode(value))
+    else:
+        value = loads(decompress(b64decode(value)))
+    return value
 
 
 class PickledObjectField(models.Field):
 class PickledObjectField(models.Field):
-    """A field that automatically pickles/unpickles its value."""
+    """
+    A field that will accept *any* python object and store it in the
+    database. PickledObjectField will optionally compress its values if
+    declared with the keyword argument ``compress=True``.
+    
+    Does not actually encode and compress ``None`` objects (although you
+    can still do lookups using None). This way, it is still possible to
+    use the ``isnull`` lookup type correctly. Because of this, the field
+    defaults to ``null=True``, as otherwise it wouldn't be able to store
+    None values since they aren't pickled and encoded.
+    
+    """
     __metaclass__ = models.SubfieldBase
     __metaclass__ = models.SubfieldBase
+    
+    def __init__(self, *args, **kwargs):
+        self.compress = kwargs.pop('compress', False)
+        self.protocol = kwargs.pop('protocol', 2)
+        kwargs.setdefault('null', True)
+        kwargs.setdefault('editable', False)
+        super(PickledObjectField, self).__init__(*args, **kwargs)
+    
+    def get_default(self):
+        """
+        Returns the default value for this field.
+        
+        The default implementation on models.Field calls force_unicode
+        on the default, which means you can't set arbitrary Python
+        objects as the default. To fix this, we just return the value
+        without calling force_unicode on it. Note that if you set a
+        callable as a default, the field will still call it. It will
+        *not* try to pickle and encode it.
+        
+        """
+        if self.has_default():
+            if callable(self.default):
+                return self.default()
+            return self.default
+        # If the field doesn't have a default, then we punt to models.Field.
+        return super(PickledObjectField, self).get_default()
 
 
     def to_python(self, value):
     def to_python(self, value):
-        """Convert the database value to a python value."""
-        if isinstance(value, PickledObject):
-            # If the value is a definite pickle; and an error is
-            # raised in de-pickling it should be allowed to propogate.
-            return pickle.loads(str(value))
-        else:
+        """
+        B64decode and unpickle the object, optionally decompressing it.
+        
+        If an error is raised in de-pickling and we're sure the value is
+        a definite pickle, the error is allowed to propagate. If we
+        aren't sure if the value is a pickle or not, then we catch the
+        error and return the original value instead.
+        
+        """
+        if value is not None:
             try:
             try:
-                return pickle.loads(str(value))
-            except Exception:
-                # If an error was raised, just return the plain value
-                return value
+                value = dbsafe_decode(value, self.compress)
+            except:
+                # If the value is a definite pickle; and an error is raised in
+                # de-pickling it should be allowed to propagate.
+                if isinstance(value, PickledObject):
+                    raise
+        return value
 
 
-    def get_db_prep_save(self, value):
-        """get_db_prep_save"""
+    def get_db_prep_value(self, value):
+        """
+        Pickle and b64encode the object, optionally compressing it.
+        
+        The pickling protocol is specified explicitly (by default 2),
+        rather than as -1 or HIGHEST_PROTOCOL, because we don't want the
+        protocol to change over time. If it did, ``exact`` and ``in``
+        lookups would likely fail, since pickle would now be generating
+        a different string. 
+        
+        """
         if value is not None and not isinstance(value, PickledObject):
         if value is not None and not isinstance(value, PickledObject):
-            value = PickledObject(pickle.dumps(value))
+            # We call force_unicode here explicitly, so that the encoded string
+            # isn't rejected by the postgresql_psycopg2 backend. Alternatively,
+            # we could have just registered PickledObject with the psycopg
+            # marshaller (telling it to store it like it would a string), but
+            # since both of these methods result in the same value being stored,
+            # doing things this way is much easier.
+            value = force_unicode(dbsafe_encode(value, self.compress))
         return value
         return value
 
 
-    def get_internal_type(self):
-        """The database field type used by this field."""
-        return 'TextField'
+    def value_to_string(self, obj):
+        value = self._get_val_from_obj(obj)
+        return self.get_db_prep_value(value)
 
 
+    def get_internal_type(self): 
+        return 'TextField'
+    
     def get_db_prep_lookup(self, lookup_type, value):
     def get_db_prep_lookup(self, lookup_type, value):
-        """get_db_prep_lookup"""
-        if lookup_type == 'exact':
-            value = self.get_db_prep_save(value)
-            return super(PickledObjectField, self).get_db_prep_lookup(
-                    lookup_type, value)
-        elif lookup_type == 'in':
-            value = [self.get_db_prep_save(v) for v in value]
-            return super(PickledObjectField, self).get_db_prep_lookup(
-                    lookup_type, value)
-        else:
+        if lookup_type not in ['exact', 'in', 'isnull']:
             raise TypeError('Lookup type %s is not supported.' % lookup_type)
             raise TypeError('Lookup type %s is not supported.' % lookup_type)
+        # The Field model already calls get_db_prep_value before doing the
+        # actual lookup, so all we need to do is limit the lookup types.
+        return super(PickledObjectField, self).get_db_prep_lookup(lookup_type, value)

+ 12 - 0
celery/loaders/__init__.py

@@ -1,6 +1,7 @@
 from celery.loaders.djangoapp import Loader as DjangoLoader
 from celery.loaders.djangoapp import Loader as DjangoLoader
 from celery.loaders.default import Loader as DefaultLoader
 from celery.loaders.default import Loader as DefaultLoader
 from django.conf import settings
 from django.conf import settings
+from django.core.management import setup_environ
 
 
 """
 """
 .. class:: Loader
 .. class:: Loader
@@ -11,6 +12,17 @@ The current loader class.
 Loader = DefaultLoader
 Loader = DefaultLoader
 if settings.configured:
 if settings.configured:
     Loader = DjangoLoader
     Loader = DjangoLoader
+else:
+    # We might still be running celery with django, because worker processes
+    # spawned with celery running through manage.py, will not have had their
+    # django environment set up
+    try:
+        # If we can import 'settings', assume we're running celery with django
+        import settings as project_settings
+        setup_environ(project_settings)
+        Loader = DjangoLoader
+    except ImportError:
+        pass
 
 
 """
 """
 .. data:: current_loader
 .. data:: current_loader

+ 1 - 1
celery/utils.py

@@ -50,7 +50,7 @@ def gen_unique_id():
         buffer = ctypes.create_string_buffer(16)
         buffer = ctypes.create_string_buffer(16)
         _uuid_generate_random(buffer)
         _uuid_generate_random(buffer)
         return str(UUID(bytes=buffer.raw))
         return str(UUID(bytes=buffer.raw))
-    return str(uuid.uuid4())
+    return str(uuid4())
 
 
 
 
 def mitemgetter(*keys):
 def mitemgetter(*keys):

+ 10 - 2
setup.py

@@ -42,8 +42,16 @@ class RunTests(Command):
 
 
 install_requires = ["django-unittest-depth",
 install_requires = ["django-unittest-depth",
                     "anyjson",
                     "anyjson",
-                    "carrot>=0.5.2",
-                    "python-daemon"]
+                    "carrot>=0.5.2"]
+
+# python-daemon doesn't run on windows, so check current platform
+if sys.platform == "win32":
+    print
+    print "I see you are using windows. You will not be able to run celery in daemon mode with the --detach parameter."
+    print
+else:
+    install_requires.append("python-daemon")
+
 py_version_info = sys.version_info
 py_version_info = sys.version_info
 py_major_version = py_version_info[0]
 py_major_version = py_version_info[0]
 py_minor_version = py_version_info[1]
 py_minor_version = py_version_info[1]