Sfoglia il codice sorgente

Ability to set serializer to pickle, json or yaml. globally, per task, or per execution.

Ask Solem 15 anni fa
parent
commit
f67de4a516
4 ha cambiato i file con 42 aggiunte e 9 eliminazioni
  1. 15 0
      celery/conf.py
  2. 10 2
      celery/execute.py
  3. 3 2
      celery/messaging.py
  4. 14 5
      celery/task/base.py

+ 15 - 0
celery/conf.py

@@ -20,6 +20,7 @@ DEFAULT_ALWAYS_EAGER = False
 DEFAULT_TASK_RESULT_EXPIRES = timedelta(days=5)
 DEFAULT_AMQP_CONNECTION_RETRY = True
 DEFAULT_AMQP_CONNECTION_MAX_RETRIES = 100
+DEFAULT_TASK_SERIALIZER = "pickle"
 
 """
 .. data:: LOG_LEVELS
@@ -241,3 +242,17 @@ Default is ``100`` retries.
 AMQP_CONNECTION_MAX_RETRIES = getattr(settings,
                                       "AMQP_CONNECTION_MAX_RETRIES",
                                       DEFAULT_AMQP_CONNECTION_MAX_RETRIES)
+
+"""
+.. data:: TASK_SERIALIZER
+
+A string identifying the default serialization
+method to use. Can be ``pickle`` (default),
+``json``, ``yaml``, or any custom serialization methods that have
+been registered with :mod:`carrot.serialization.registry`.
+
+Default is ``pickle``.
+
+"""
+TASK_SERIALIZER = getattr(settings, "CELERY_TASK_SERIALIZER",
+                          DEFAULT_TASK_SERIALIZER)

+ 10 - 2
celery/execute.py

@@ -21,7 +21,7 @@ import inspect
 def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
         routing_key=None, exchange=None, task_id=None,
         immediate=None, mandatory=None, priority=None, connection=None,
-        connect_timeout=AMQP_CONNECTION_TIMEOUT, **opts):
+        connect_timeout=AMQP_CONNECTION_TIMEOUT, serializer=None, **opts):
     """Run a task asynchronously by the celery daemon(s).
 
     :param task: The task to run (a callable object, or a :class:`Task`
@@ -62,6 +62,12 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
 
     :keyword priority: The task priority, a number between ``0`` and ``9``.
 
+    :keyword serializer: A string identifying the default serialization
+        method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
+        Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
+        methods that have been registered with
+        :mod:`carrot.serialization.registry`.
+
     """
     args = args or []
     kwargs = kwargs or {}
@@ -70,6 +76,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     immediate = immediate or getattr(task, "immediate", None)
     mandatory = mandatory or getattr(task, "mandatory", None)
     priority = priority or getattr(task, "priority", None)
+    serializer = serializer or getattr(task, "serializer", None)
     taskset_id = opts.get("taskset_id")
     publisher = opts.get("publisher")
     retries = opts.get("retries")
@@ -96,7 +103,8 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
                          task_id=task_id, retries=retries,
                          routing_key=routing_key, exchange=exchange,
                          mandatory=mandatory, immediate=immediate,
-                         priority=priority, eta=eta)
+                         serializer=serializer, priority=priority,
+                         eta=eta)
 
     if need_to_close_connection:
         publisher.close()

+ 3 - 2
celery/messaging.py

@@ -11,7 +11,8 @@ from celery.serialization import pickle
 
 
 MSG_OPTIONS = ("mandatory", "priority",
-               "immediate", "routing_key")
+               "immediate", "routing_key",
+               "serializer")
 
 get_msg_options = mitemgetter(*MSG_OPTIONS)
 
@@ -23,7 +24,7 @@ class TaskPublisher(Publisher):
     exchange = conf.AMQP_EXCHANGE
     exchange_type = conf.AMQP_EXCHANGE_TYPE
     routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
-    serializer = "pickle"
+    serializer = conf.TASK_SERIALIZER
     encoder = pickle.dumps
 
     def delay_task(self, task_name, task_args, task_kwargs, **kwargs):

+ 14 - 5
celery/task/base.py

@@ -1,14 +1,14 @@
 from carrot.connection import DjangoBrokerConnection
-from celery.conf import AMQP_CONNECTION_TIMEOUT
+from celery import conf
 from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.result import TaskSetResult, EagerResult
 from celery.execute import apply_async, delay_task, apply
 from celery.utils import gen_unique_id, get_full_cls_name
-from datetime import timedelta
 from celery.registry import tasks
 from celery.serialization import pickle
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
+from datetime import timedelta
 
 
 class Task(object):
@@ -83,6 +83,14 @@ class Task(object):
         Disable all error e-mails for this task (only applicable if
         ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
 
+    .. attribute:: serializer
+
+        A string identifying the default serialization
+        method to use. Defaults to the ``CELERY_TASK_SERIALIZER`` setting.
+        Can be ``pickle`` ``json``, ``yaml``, or any custom serialization
+        methods that have been registered with
+        :mod:`carrot.serialization.registry`.
+
     :raises NotImplementedError: if the :attr:`name` attribute is not set.
 
     The resulting class is callable, which if called will apply the
@@ -129,6 +137,7 @@ class Task(object):
     disable_error_emails = False
     max_retries = 3
     default_retry_delay = 3 * 60
+    serializer = conf.TASK_SERIALIZER
 
     MaxRetriesExceededError = MaxRetriesExceededError
 
@@ -206,7 +215,7 @@ class Task(object):
         loglevel = kwargs.get("loglevel")
         return setup_logger(loglevel=loglevel, logfile=logfile)
 
-    def get_publisher(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
+    def get_publisher(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
         """Get a celery task message publisher.
 
         :rtype: :class:`celery.messaging.TaskPublisher`.
@@ -225,7 +234,7 @@ class Task(object):
                              exchange=self.exchange,
                              routing_key=self.routing_key)
 
-    def get_consumer(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
+    def get_consumer(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
         """Get a celery task message consumer.
 
         :rtype: :class:`celery.messaging.TaskConsumer`.
@@ -485,7 +494,7 @@ class TaskSet(object):
         self.arguments = args
         self.total = len(args)
 
-    def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
+    def run(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
         """Run all tasks in the taskset.
 
         :returns: A :class:`celery.result.TaskSetResult` instance.