Browse Source

Moved celery.serialization + celery.utils.functional -> billiard.*

Ask Solem 15 years ago
parent
commit
0208ec9f8b

+ 4 - 3
celery/backends/base.py

@@ -1,10 +1,11 @@
 """celery.backends.base"""
 """celery.backends.base"""
 import time
 import time
 
 
+from billiard.serialization import pickle
+from billiard.serialization import get_pickled_exception
+from billiard.serialization import get_pickleable_exception
+
 from celery.exceptions import TimeoutError
 from celery.exceptions import TimeoutError
-from celery.serialization import pickle
-from celery.serialization import get_pickled_exception
-from celery.serialization import get_pickleable_exception
 
 
 EXCEPTION_STATES = frozenset(["RETRY", "FAILURE"])
 EXCEPTION_STATES = frozenset(["RETRY", "FAILURE"])
 
 

+ 1 - 1
celery/backends/mongodb.py

@@ -2,12 +2,12 @@
 from datetime import datetime
 from datetime import datetime
 
 
 from django.core.exceptions import ImproperlyConfigured
 from django.core.exceptions import ImproperlyConfigured
+from billiard.serialization import pickle
 try:
 try:
     import pymongo
     import pymongo
 except ImportError:
 except ImportError:
     pymongo = None
     pymongo = None
 
 
-from celery.serialization import pickle
 from celery.backends.base import BaseBackend
 from celery.backends.base import BaseBackend
 from celery.loaders import settings
 from celery.loaders import settings
 from celery.conf import TASK_RESULT_EXPIRES
 from celery.conf import TASK_RESULT_EXPIRES

+ 1 - 1
celery/execute.py

@@ -4,11 +4,11 @@ import traceback
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 
 
 from carrot.connection import DjangoBrokerConnection
 from carrot.connection import DjangoBrokerConnection
+from billiard.utils.functional import curry
 
 
 from celery import signals
 from celery import signals
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.utils import gen_unique_id, noop, fun_takes_kwargs
 from celery.utils import gen_unique_id, noop, fun_takes_kwargs
-from celery.utils.functional import curry
 from celery.result import AsyncResult, EagerResult
 from celery.result import AsyncResult, EagerResult
 from celery.registry import tasks
 from celery.registry import tasks
 from celery.messaging import TaskPublisher
 from celery.messaging import TaskPublisher

+ 0 - 6
celery/messaging.py

@@ -9,8 +9,6 @@ from celery import conf
 from celery import signals
 from celery import signals
 from celery.utils import gen_unique_id
 from celery.utils import gen_unique_id
 from celery.utils import mitemgetter
 from celery.utils import mitemgetter
-from celery.serialization import pickle
-
 
 
 MSG_OPTIONS = ("mandatory", "priority",
 MSG_OPTIONS = ("mandatory", "priority",
                "immediate", "routing_key",
                "immediate", "routing_key",
@@ -27,7 +25,6 @@ class TaskPublisher(Publisher):
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
     routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
     serializer = conf.TASK_SERIALIZER
     serializer = conf.TASK_SERIALIZER
-    encoder = pickle.dumps
 
 
     def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
     def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
         """Delay task for execution by the celery nodes."""
         """Delay task for execution by the celery nodes."""
@@ -77,7 +74,6 @@ class TaskConsumer(Consumer):
     exchange = conf.AMQP_EXCHANGE
     exchange = conf.AMQP_EXCHANGE
     routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
     routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     exchange_type = conf.AMQP_EXCHANGE_TYPE
-    decoder = pickle.loads
     auto_ack = False
     auto_ack = False
     no_ack = False
     no_ack = False
 
 
@@ -85,7 +81,6 @@ class TaskConsumer(Consumer):
 class StatsPublisher(Publisher):
 class StatsPublisher(Publisher):
     exchange = "celerygraph"
     exchange = "celerygraph"
     routing_key = "stats"
     routing_key = "stats"
-    encoder = pickle.dumps
 
 
 
 
 class StatsConsumer(Consumer):
 class StatsConsumer(Consumer):
@@ -93,5 +88,4 @@ class StatsConsumer(Consumer):
     exchange = "celerygraph"
     exchange = "celerygraph"
     routing_key = "stats"
     routing_key = "stats"
     exchange_type = "direct"
     exchange_type = "direct"
-    decoder = pickle.loads
     no_ack=True
     no_ack=True

+ 0 - 117
celery/serialization.py

@@ -1,117 +0,0 @@
-import operator
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
-from celery.utils.functional import curry
-
-
-def find_nearest_pickleable_exception(exc):
-    """With an exception instance, iterate over its super classes (by mro)
-    and find the first super exception that is pickleable. It does
-    not go below :exc:`Exception` (i.e. it skips :exc:`Exception`,
-    :class:`BaseException` and :class:`object`). If that happens
-    you should use :exc:`UnpickleableException` instead.
-
-    :param exc: An exception instance.
-
-    :returns: the nearest exception if it's not :exc:`Exception` or below,
-        if it is it returns ``None``.
-
-    :rtype: :exc:`Exception`
-
-    """
-
-    unwanted = (Exception, BaseException, object)
-    is_unwanted = lambda exc: any(map(curry(operator.is_, exc), unwanted))
-
-    mro_ = getattr(exc.__class__, "mro", lambda: [])
-    for supercls in mro_():
-        if is_unwanted(supercls):
-            # only BaseException and object, from here on down,
-            # we don't care about these.
-            return None
-        try:
-            exc_args = getattr(exc, "args", [])
-            superexc = supercls(*exc_args)
-            pickle.dumps(superexc)
-        except:
-            pass
-        else:
-            return superexc
-    return None
-
-
-def create_exception_cls(name, module, parent=None):
-    """Dynamically create an exception class."""
-    if not parent:
-        parent = Exception
-    return type(name, (parent, ), {"__module__": module})
-
-
-class UnpickleableExceptionWrapper(Exception):
-    """Wraps unpickleable exceptions.
-
-    :param exc_module: see :attr:`exc_module`.
-
-    :param exc_cls_name: see :attr:`exc_cls_name`.
-
-    :param exc_args: see :attr:`exc_args`
-
-    .. attribute:: exc_module
-
-        The module of the original exception.
-
-    .. attribute:: exc_cls_name
-
-        The name of the original exception class.
-
-    .. attribute:: exc_args
-
-        The arguments for the original exception.
-
-    Example
-
-        >>> try:
-        ...     something_raising_unpickleable_exc()
-        >>> except Exception, e:
-        ...     exc = UnpickleableException(e.__class__.__module__,
-        ...                                 e.__class__.__name__,
-        ...                                 e.args)
-        ...     pickle.dumps(exc) # Works fine.
-
-    """
-
-    def __init__(self, exc_module, exc_cls_name, exc_args):
-        self.exc_module = exc_module
-        self.exc_cls_name = exc_cls_name
-        self.exc_args = exc_args
-        super(Exception, self).__init__(exc_module, exc_cls_name, exc_args)
-
-
-def get_pickleable_exception(exc):
-    """Make sure exception is pickleable."""
-    nearest = find_nearest_pickleable_exception(exc)
-    if nearest:
-        return nearest
-
-    try:
-        pickle.dumps(exc)
-    except pickle.PickleError:
-        excwrapper = UnpickleableExceptionWrapper(
-                        exc.__class__.__module__,
-                        exc.__class__.__name__,
-                        getattr(exc, "args", []))
-        return excwrapper
-    return exc
-
-
-def get_pickled_exception(exc):
-    """Get original exception from exception pickled using
-    :meth:`get_pickleable_exception`."""
-    if isinstance(exc, UnpickleableExceptionWrapper):
-        exc_cls = create_exception_cls(exc.exc_cls_name,
-                                       exc.exc_module)
-        return exc_cls(*exc.exc_args)
-    return exc

+ 1 - 1
celery/task/__init__.py

@@ -4,6 +4,7 @@ Working with tasks and task sets.
 
 
 """
 """
 from carrot.connection import DjangoBrokerConnection
 from carrot.connection import DjangoBrokerConnection
+from billiard.serialization import pickle
 
 
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.conf import AMQP_CONNECTION_TIMEOUT
 from celery.execute import apply_async
 from celery.execute import apply_async
@@ -14,7 +15,6 @@ from celery.task.base import Task, TaskSet, PeriodicTask
 from celery.task.base import ExecuteRemoteTask, AsynchronousMapTask
 from celery.task.base import ExecuteRemoteTask, AsynchronousMapTask
 from celery.task.rest import RESTProxyTask
 from celery.task.rest import RESTProxyTask
 from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
 from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
-from celery.serialization import pickle
 
 
 
 
 def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
 def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):

+ 1 - 1
celery/task/base.py

@@ -3,6 +3,7 @@ from datetime import timedelta
 from Queue import Queue
 from Queue import Queue
 
 
 from carrot.connection import DjangoBrokerConnection
 from carrot.connection import DjangoBrokerConnection
+from billiard.serialization import pickle
 
 
 from celery import conf
 from celery import conf
 from celery.log import setup_logger
 from celery.log import setup_logger
@@ -13,7 +14,6 @@ from celery.registry import tasks
 from celery.backends import default_backend
 from celery.backends import default_backend
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
-from celery.serialization import pickle
 
 
 
 
 class TaskType(type):
 class TaskType(type):

+ 6 - 4
celery/tests/test_backends/test_base.py

@@ -1,10 +1,12 @@
 import unittest
 import unittest
 import types
 import types
-from celery.backends.base import BaseBackend, KeyValueStoreBackend
-from celery.serialization import find_nearest_pickleable_exception as fnpe
-from celery.serialization import UnpickleableExceptionWrapper
-from celery.serialization import get_pickleable_exception as gpe
+
 from django.db.models.base import subclass_exception
 from django.db.models.base import subclass_exception
+from billiard.serialization import find_nearest_pickleable_exception as fnpe
+from billiard.serialization import UnpickleableExceptionWrapper
+from billiard.serialization import get_pickleable_exception as gpe
+
+from celery.backends.base import BaseBackend, KeyValueStoreBackend
 
 
 
 
 class wrapobject(object):
 class wrapobject(object):

+ 2 - 1
celery/tests/test_pickle.py

@@ -1,5 +1,6 @@
 import unittest
 import unittest
-from celery.serialization import pickle
+
+from billiard.serialization import pickle
 
 
 
 
 class RegularException(Exception):
 class RegularException(Exception):

+ 3 - 3
celery/tests/test_serialization.py

@@ -7,9 +7,9 @@ class TestAAPickle(unittest.TestCase):
 
 
     def test_no_cpickle(self):
     def test_no_cpickle(self):
         from celery.tests.utils import mask_modules
         from celery.tests.utils import mask_modules
-        prev = sys.modules.pop("celery.serialization")
+        prev = sys.modules.pop("billiard.serialization")
         with mask_modules("cPickle"):
         with mask_modules("cPickle"):
-            from celery.serialization import pickle
+            from billiard.serialization import pickle
             import pickle as orig_pickle
             import pickle as orig_pickle
             self.assertTrue(pickle.dumps is orig_pickle.dumps)
             self.assertTrue(pickle.dumps is orig_pickle.dumps)
-        sys.modules["celery.serialization"] = prev
+        sys.modules["billiard.serialization"] = prev

+ 4 - 2
celery/tests/test_task_builtins.py

@@ -1,7 +1,9 @@
 import unittest
 import unittest
-from celery.task.builtins import PingTask, DeleteExpiredTaskMetaTask
+
+from billiard.serialization import pickle
+
 from celery.task.base import ExecuteRemoteTask
 from celery.task.base import ExecuteRemoteTask
-from celery.serialization import pickle
+from celery.task.builtins import PingTask, DeleteExpiredTaskMetaTask
 
 
 
 
 def some_func(i):
 def some_func(i):

+ 7 - 5
celery/tests/test_worker.py

@@ -1,15 +1,17 @@
 import unittest
 import unittest
 from Queue import Queue, Empty
 from Queue import Queue, Empty
-from carrot.connection import BrokerConnection
-from celery.worker.job import TaskWrapper
-from celery.worker import CarrotListener, WorkController
+from datetime import datetime, timedelta
 from multiprocessing import get_logger
 from multiprocessing import get_logger
+
+from carrot.connection import BrokerConnection
 from carrot.backends.base import BaseMessage
 from carrot.backends.base import BaseMessage
+from billiard.serialization import pickle
+
 from celery import registry
 from celery import registry
-from celery.serialization import pickle
 from celery.utils import gen_unique_id
 from celery.utils import gen_unique_id
+from celery.worker import CarrotListener, WorkController
+from celery.worker.job import TaskWrapper
 from celery.worker.scheduler import Scheduler
 from celery.worker.scheduler import Scheduler
-from datetime import datetime, timedelta
 from celery.decorators import task as task_dec
 from celery.decorators import task as task_dec
 
 
 
 

+ 1 - 1
celery/utils/__init__.py → celery/utils.py

@@ -13,7 +13,7 @@ from uuid import UUID, uuid4, _uuid_generate_random
 from inspect import getargspec
 from inspect import getargspec
 from itertools import repeat
 from itertools import repeat
 
 
-from celery.utils.functional import curry
+from billiard.utils.functional import curry
 
 
 noop = lambda *args, **kwargs: None
 noop = lambda *args, **kwargs: None
 
 

+ 0 - 16
celery/utils/functional.py

@@ -1,16 +0,0 @@
-"""Functional utilities for Python 2.4 compatibility."""
-
-
-def _compat_curry(fun, *args, **kwargs):
-    """New function with partial application of the given arguments
-    and keywords."""
-
-    def _curried(*addargs, **addkwargs):
-        return fun(*(args+addargs), **dict(kwargs, **addkwargs))
-    return _curried
-
-
-try:
-    from functools import partial as curry
-except ImportError:
-    curry = _compat_curry

+ 1 - 1
celery/worker/pool.py

@@ -6,9 +6,9 @@ Process Pools.
 from multiprocessing.util import get_logger
 from multiprocessing.util import get_logger
 
 
 from billiard.pool import DynamicPool
 from billiard.pool import DynamicPool
+from billiard.utils.functional import curry
 
 
 from celery.utils import noop
 from celery.utils import noop
-from celery.utils.functional import curry
 from celery.datastructures import ExceptionInfo
 from celery.datastructures import ExceptionInfo
 
 
 
 

+ 1 - 1
contrib/doc4allmods

@@ -2,7 +2,7 @@
 
 
 PACKAGE="$1"
 PACKAGE="$1"
 SKIP_PACKAGES="$PACKAGE tests management urls"
 SKIP_PACKAGES="$PACKAGE tests management urls"
-SKIP_FILES="celery.bin.rst celery.contrib.rst celery.serialization.rst"
+SKIP_FILES="celery.bin.rst celery.contrib.rst"
 
 
 modules=$(find "$PACKAGE" -name "*.py")
 modules=$(find "$PACKAGE" -name "*.py")
 
 

+ 0 - 8
docs/internals/reference/celery.utils.functional.rst

@@ -1,8 +0,0 @@
-===================================================
- Compat Functional Utils - celery.utils.functional
-===================================================
-
-.. currentmodule:: celery.utils.functional
-
-.. automodule:: celery.utils.functional
-    :members:

+ 0 - 1
docs/internals/reference/index.rst

@@ -27,7 +27,6 @@
     celery.log
     celery.log
     celery.utils
     celery.utils
     celery.discovery
     celery.discovery
-    celery.utils.functional
     celery.platform
     celery.platform
     celery.managers
     celery.managers
     celery.models
     celery.models