
Removed `celery.task.RemoteExecuteTask` and accompanying functions: `dmap`, `dmap_async`, and `execute_remote`.

Executing arbitrary code using pickle is a potential security issue if
someone gains unrestricted access to the message broker.

If you really need this functionality, then you can simply add this
manually.
Ask Solem · 14 years ago · commit f4aefcdd0d
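
For code that depended on the removed helpers, the deleted implementation can be copied back into an application as an ordinary task. Below is a minimal sketch adapted from the lines this commit removes; the task name `myapp.execute_remote` and the module placement are assumptions, and the old-style `TaskSet(task, args)` call mirrors the removed `_dmap`. It reintroduces the pickle execution risk described above, so it should only run against a broker that untrusted clients cannot reach.

```python
# Hypothetical re-implementation of the removed helpers (not part of this
# commit).  Unpickling and calling arbitrary callables is exactly the risk
# that motivated the removal; keep the broker locked down.
import pickle

from celery.task import Task, TaskSet


class ExecuteRemoteTask(Task):
    """Unpickle a callable and apply it to the given arguments."""
    name = "myapp.execute_remote"  # assumed name; choose one for your project

    def run(self, ser_callable, fargs, fkwargs, **kwargs):
        return pickle.loads(ser_callable)(*fargs, **fkwargs)


def execute_remote(fun, *args, **kwargs):
    """Pickle `fun` and run it on a worker; returns an AsyncResult."""
    return ExecuteRemoteTask.delay(pickle.dumps(fun), args, kwargs)


def dmap(fun, args, timeout=None):
    """Apply `fun` to each argument tuple and join the results."""
    pickled = pickle.dumps(fun)
    arguments = [((pickled, arg, {}), {}) for arg in args]
    # Old-style TaskSet(task, arguments) signature, as in the removed _dmap.
    return TaskSet(ExecuteRemoteTask, arguments).apply_async().join(
            timeout=timeout)
```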

+ 7 - 0
Changelog

@@ -130,7 +130,14 @@ Important Notes
   `celery.management.commands` has now been removed as per the deprecation
   timeline.
 
+* Removed `celery.task.RemoteExecuteTask` and accompanying functions:
+  `dmap`, `dmap_async`, and `execute_remote`.
 
+    Executing arbitrary code using pickle is a potential security issue if
+    someone gains unrestricted access to the message broker.
+
+    If you really need this functionality, then you can simply add this
+    manually.
 
 .. _v220-news:
 

+ 2 - 2
celery/backends/base.py

@@ -3,8 +3,8 @@ import time
 
 from celery import states
 from celery.exceptions import TimeoutError, TaskRevokedError
-from celery.serialization import pickle, get_pickled_exception
-from celery.serialization import get_pickleable_exception
+from celery.utils.serialization import pickle, get_pickled_exception
+from celery.utils.serialization import get_pickleable_exception
 from celery.datastructures import LocalCache
 
 

+ 1 - 1
celery/backends/cassandra.py

@@ -15,7 +15,7 @@ from datetime import datetime
 
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import ImproperlyConfigured
-from celery.serialization import pickle
+from celery.utils.serialization import pickle
 from celery import states
 
 

+ 1 - 1
celery/backends/mongodb.py

@@ -9,7 +9,7 @@ except ImportError:
 from celery import states
 from celery.backends.base import BaseDictBackend
 from celery.exceptions import ImproperlyConfigured
-from celery.serialization import pickle
+from celery.utils.serialization import pickle
 
 
 class Bunch:

+ 3 - 59
celery/task/__init__.py

@@ -5,68 +5,12 @@ Working with tasks and task sets.
 """
 import warnings
 
-from celery.serialization import pickle
 from celery.task.base import Task, PeriodicTask
-from celery.task.sets import TaskSet
-from celery.task.builtins import PingTask, ExecuteRemoteTask
-from celery.task.builtins import AsynchronousMapTask, _dmap
+from celery.task.sets import TaskSet, subtask
+from celery.task.builtins import PingTask
 from celery.task.control import discard_all
-from celery.task.http import HttpDispatchTask
 
-__all__ = ["Task", "TaskSet", "PeriodicTask", "discard_all",
-           "dmap", "dmap_async", "execute_remote", "HttpDispatchTask"]
-
-
-def dmap(fun, args, timeout=None):
-    """Distribute processing of the arguments and collect the results.
-
-    Example
-
-        >>> from celery.task import dmap
-        >>> import operator
-        >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
-        [4, 8, 16]
-
-    """
-    return _dmap(fun, args, timeout)
-
-
-def dmap_async(fun, args, timeout=None):
-    """Distribute processing of the arguments and collect the results
-    asynchronously.
-
-    :returns :class:`celery.result.AsyncResult`:
-
-    Example
-
-        >>> from celery.task import dmap_async
-        >>> import operator
-        >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
-        >>> presult
-        <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
-        >>> presult.status
-        'SUCCESS'
-        >>> presult.result
-        [4, 8, 16]
-
-    """
-    return AsynchronousMapTask.delay(pickle.dumps(fun), args, timeout=timeout)
-
-
-def execute_remote(fun, *args, **kwargs):
-    """Execute arbitrary function/object remotely.
-
-    :param fun: A callable function or object.
-    :param \*args: Positional arguments to apply to the function.
-    :param \*\*kwargs: Keyword arguments to apply to the function.
-
-    The object must be picklable, so you can't use lambdas or functions
-    defined in the REPL (the objects must have an associated module).
-
-    :returns class:`celery.result.AsyncResult`:
-
-    """
-    return ExecuteRemoteTask.delay(pickle.dumps(fun), args, kwargs)
+__all__ = ["Task", "TaskSet", "PeriodicTask", "subtask", "discard_all"]
 
 
 def ping():
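
The new exports (`subtask` alongside `TaskSet`) hint at the non-pickle way to fan work out: register the function as a task and build the set from subtasks. The sketch below assumes the 2.2-era `TaskSet`/`subtask` API; `add` and `dmap_with_taskset` are illustrative names, not part of this commit.

```python
# Sketch: distribute work through a registered task instead of a pickled
# callable.  `add` and `dmap_with_taskset` are hypothetical names.
from celery.decorators import task
from celery.task import TaskSet
from celery.task.sets import subtask


@task
def add(x, y):
    return x + y


def dmap_with_taskset(args, timeout=None):
    """Roughly what dmap() offered, without shipping pickled functions."""
    ts = TaskSet(tasks=[subtask(add, args=list(arg)) for arg in args])
    return ts.apply_async().join(timeout=timeout)

# e.g. dmap_with_taskset([(2, 2), (4, 4), (8, 8)]) should give [4, 8, 16]
```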

+ 0 - 40
celery/task/builtins.py

@@ -1,8 +1,6 @@
 from celery import conf
 from celery.schedules import crontab
-from celery.serialization import pickle
 from celery.task.base import Task
-from celery.task.sets import TaskSet
 
 
 class backend_cleanup(Task):
@@ -28,41 +26,3 @@ class PingTask(Task):
     def run(self, **kwargs):
         """:returns: the string `"pong"`."""
         return "pong"
-
-
-def _dmap(fun, args, timeout=None):
-    pickled = pickle.dumps(fun)
-    arguments = [((pickled, arg, {}), {}) for arg in args]
-    ts = TaskSet(ExecuteRemoteTask, arguments)
-    return ts.apply_async().join(timeout=timeout)
-
-
-class AsynchronousMapTask(Task):
-    """Task used internally by :func:`dmap_async` and
-    :meth:`TaskSet.map_async`.  """
-    name = "celery.map_async"
-
-    def run(self, serfun, args, timeout=None, **kwargs):
-        return _dmap(pickle.loads(serfun), args, timeout=timeout)
-
-
-class ExecuteRemoteTask(Task):
-    """Execute an arbitrary function or object.
-
-    *Note* You probably want :func:`execute_remote` instead, which this
-    is an internal component of.
-
-    The object must be pickleable, so you can't use lambdas or functions
-    defined in the REPL (that is the python shell, or :program:`ipython`).
-
-    """
-    name = "celery.execute_remote"
-
-    def run(self, ser_callable, fargs, fkwargs, **kwargs):
-        """
-        :param ser_callable: A pickled function or callable object.
-        :param fargs: Positional arguments to apply to the function.
-        :param fkwargs: Keyword arguments to apply to the function.
-
-        """
-        return pickle.loads(ser_callable)(*fargs, **fkwargs)

+ 5 - 4
celery/tests/test_backends/test_base.py

@@ -2,10 +2,11 @@ import sys
 import types
 from celery.tests.utils import unittest
 
-from celery.serialization import subclass_exception
-from celery.serialization import find_nearest_pickleable_exception as fnpe
-from celery.serialization import UnpickleableExceptionWrapper
-from celery.serialization import get_pickleable_exception as gpe
+from celery.utils.serialization import subclass_exception
+from celery.utils.serialization import \
+        find_nearest_pickleable_exception as fnpe
+from celery.utils.serialization import UnpickleableExceptionWrapper
+from celery.utils.serialization import get_pickleable_exception as gpe
 
 from celery import states
 from celery.backends.base import BaseBackend, KeyValueStoreBackend

+ 1 - 1
celery/tests/test_pickle.py

@@ -1,6 +1,6 @@
 from celery.tests.utils import unittest
 
-from celery.serialization import pickle
+from celery.utils.serialization import pickle
 
 
 class RegularException(Exception):

+ 3 - 3
celery/tests/test_serialization.py

@@ -7,10 +7,10 @@ from celery.tests.utils import execute_context, mask_modules
 class TestAAPickle(unittest.TestCase):
 
     def test_no_cpickle(self):
-        prev = sys.modules.pop("celery.serialization", None)
+        prev = sys.modules.pop("celery.utils.serialization", None)
         try:
             def with_cPickle_masked(_val):
-                from celery.serialization import pickle
+                from celery.utils.serialization import pickle
                 import pickle as orig_pickle
                 self.assertIs(pickle.dumps, orig_pickle.dumps)
 
@@ -18,4 +18,4 @@ class TestAAPickle(unittest.TestCase):
             execute_context(context, with_cPickle_masked)
 
         finally:
-            sys.modules["celery.serialization"] = prev
+            sys.modules["celery.utils.serialization"] = prev

+ 0 - 19
celery/tests/test_task.py

@@ -239,25 +239,6 @@ class TestCeleryTasks(unittest.TestCase):
     def test_ping(self):
         self.assertEqual(task.ping(), 'pong')
 
-    @with_eager_tasks
-    def test_execute_remote(self):
-        self.assertEqual(task.execute_remote(return_True, ["foo"]).get(),
-                         True)
-
-    @with_eager_tasks
-    def test_dmap(self):
-        import operator
-        res = task.dmap(operator.add, zip(xrange(10), xrange(10)))
-        self.assertEqual(sum(res), sum(operator.add(x, x)
-                                    for x in xrange(10)))
-
-    @with_eager_tasks
-    def test_dmap_async(self):
-        import operator
-        res = task.dmap_async(operator.add, zip(xrange(10), xrange(10)))
-        self.assertEqual(sum(res.get()), sum(operator.add(x, x)
-                                            for x in xrange(10)))
-
     def assertNextTaskDataEqual(self, consumer, presult, task_name,
             test_eta=False, **kwargs):
         next_task = consumer.fetch()

+ 1 - 10
celery/tests/test_task_builtins.py

@@ -1,8 +1,7 @@
 from celery.tests.utils import unittest
 
-from celery.task.builtins import ExecuteRemoteTask
 from celery.task.builtins import PingTask, DeleteExpiredTaskMetaTask
-from celery.serialization import pickle
+from celery.utils.serialization import pickle
 
 
 def some_func(i):
@@ -15,14 +14,6 @@ class TestPingTask(unittest.TestCase):
         self.assertEqual(PingTask.apply().get(), 'pong')
 
 
-class TestRemoteExecuteTask(unittest.TestCase):
-
-    def test_execute_remote(self):
-        self.assertEqual(ExecuteRemoteTask.apply(
-                            args=[pickle.dumps(some_func), [10], {}]).get(),
-                          100)
-
-
 class TestDeleteExpiredTaskMetaTask(unittest.TestCase):
 
     def test_run(self):

+ 1 - 1
celery/tests/test_worker.py

@@ -12,13 +12,13 @@ from celery.app import app_or_default
 from celery.concurrency.base import BasePool
 from celery.decorators import task as task_dec
 from celery.decorators import periodic_task as periodic_task_dec
-from celery.serialization import pickle
 from celery.utils import gen_unique_id
 from celery.worker import WorkController
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
 from celery.worker.consumer import Consumer as MainConsumer
 from celery.worker.consumer import QoS, RUN
+from celery.utils.serialization import pickle
 
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import execute_context

+ 0 - 0
celery/serialization.py → celery/utils/serialization.py
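
The serialization module itself only moved (`celery.serialization` to `celery.utils.serialization`), which is what every import change above tracks. Third-party code that must run on both sides of the move could guard the import; this shim is an illustration, not part of the commit.

```python
# Hypothetical compatibility shim for code spanning both module layouts.
try:
    from celery.utils.serialization import pickle  # new location (this commit)
except ImportError:
    from celery.serialization import pickle        # pre-move location
```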