Prechádzať zdrojové kódy

Added support for Kombu message compression using the CELERY_MESSAGE_COMPRESSION setting, or the compression argument to apply_async. Can also be set using routers.

Ask Solem 14 rokov pred
rodič
commit
c268470505
2 zmenil súbory, kde vykonal 11 pridanie a 1 odobranie
  1. 1 0
      celery/app/defaults.py
  2. 10 1
      celery/task/base.py

+ 1 - 0
celery/app/defaults.py

@@ -77,6 +77,7 @@ NAMESPACES = {
         "IMPORTS": Option((), type="tuple"),
         "IGNORE_RESULT": Option(False, type="bool"),
         "MAX_CACHED_RESULTS": Option(5000, type="int"),
+        "MESSAGE_COMPRESSION": Option(None, type="string"),
         "RESULT_BACKEND": Option("amqp"),
         "RESULT_DBURI": Option(),
         "RESULT_ENGINE_OPTIONS": Option(None, type="dict"),

+ 10 - 1
celery/task/base.py

@@ -32,7 +32,8 @@ Please use the CELERYBEAT_SCHEDULE setting instead:
 extract_exec_options = mattrgetter("queue", "routing_key",
                                    "exchange", "immediate",
                                    "mandatory", "priority",
-                                   "serializer", "delivery_mode")
+                                   "serializer", "delivery_mode",
+                                   "compression")
 _default_context = {"logfile": None,
                     "loglevel": None,
                     "id": None,
@@ -461,6 +462,12 @@ class BaseTask(object):
             :mod:`carrot.serialization.registry`. Defaults to the tasks
             :attr:`serializer` attribute.
 
+        :keyword compression: A string identifying the compression method
+            to use.  Defaults to the :setting:`CELERY_MESSAGE_COMPRESSION`
+            setting.  Can be one of ``zlib``, ``bzip2``, or any custom
+            compression methods registered with
+            :func:`kombu.compression.register`.  **Only supported by Kombu.**
+
         **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will
             be replaced by a local :func:`apply` call instead.
 
@@ -470,6 +477,8 @@ class BaseTask(object):
         if self.app.conf.CELERY_ALWAYS_EAGER:
             return self.apply(args, kwargs, task_id=task_id)
 
+        options.setdefault("compression",
+                           self.app.conf.CELERY_MESSAGE_COMPRESSION)
         options = dict(extract_exec_options(self), **options)
         options = router.route(options, self.name, args, kwargs)
         exchange = options.get("exchange")