Explorar el Código

create_task_message for task protocol v2

Ask Solem hace 11 años
padre
commit
663e4d3a0b
Se han modificado 2 ficheros con 81 adiciones y 12 borrados
  1. 80 12
      celery/app/amqp.py
  2. 1 0
      celery/app/defaults.py

+ 80 - 12
celery/app/amqp.py

@@ -213,6 +213,14 @@ class AMQP(object):
 
     def __init__(self, app):
         self.app = app
+        self.task_protocols = {
+            1: self.as_task_v1,
+            2: self.as_task_v2,
+        }
+
+    @cached_property
+    def create_task_message(self):
+        return self.task_protocols[self.app.conf.CELERY_TASK_PROTOCOL]
 
     @cached_property
     def _task_retry(self):
@@ -303,12 +311,70 @@ class AMQP(object):
         return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
                         self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
 
-    def create_task_message(self, task_id, name, args=None, kwargs=None,
-                            countdown=None, eta=None, group_id=None,
-                            expires=None, now=None, retries=0, chord=None,
-                            callbacks=None, errbacks=None, reply_to=None,
-                            time_limit=None, soft_time_limit=None,
-                            create_sent_event=False):
+    def as_task_v2(self, task_id, name, args=None, kwargs=None,
+                   countdown=None, eta=None, group_id=None,
+                   expires=None, now=None, retries=0, chord=None,
+                   callbacks=None, errbacks=None, reply_to=None,
+                   time_limit=None, soft_time_limit=None,
+                   create_sent_event=False, timezone=None):
+        args = args or ()
+        kwargs = kwargs or {}
+        utc = self.utc
+        if not isinstance(args, (list, tuple)):
+            raise ValueError('task args must be a list or tuple')
+        if not isinstance(kwargs, Mapping):
+            raise ValueError('task keyword arguments must be a mapping')
+        if countdown:  # convert countdown to ETA
+            now = now or self.app.now()
+            timezone = timezone or self.app.timezone
+            eta = now + timedelta(seconds=countdown)
+            if utc:
+                eta = to_utc(eta).astimezone(timezone)
+        if isinstance(expires, numbers.Real):
+            now = now or self.app.now()
+            timezone = timezone or self.app.timezone
+            expires = now + timedelta(seconds=expires)
+            if utc:
+                expires = to_utc(expires).astimezone(timezone)
+        eta = eta and eta.isoformat()
+        expires = expires and expires.isoformat()
+
+        return task_message(
+            headers={
+                'lang': 'py',
+                'c_type': name,
+                'eta': eta,
+                'expires': expires,
+                'callbacks': callbacks,
+                'errbacks': errbacks,
+                'chain': None,  # TODO
+                'group': group_id,
+                'chord': chord,
+                'retries': retries,
+                'timelimit': (time_limit, soft_time_limit),
+            },
+            properties={
+                'correlation_id': task_id,
+                'reply_to': reply_to,
+            },
+            body=(args, kwargs),
+            sent_event={
+                'uuid': task_id,
+                'name': name,
+                'args': safe_repr(args),
+                'kwargs': safe_repr(kwargs),
+                'retries': retries,
+                'eta': eta,
+                'expires': expires,
+            } if create_sent_event else None,
+        )
+
+    def as_task_v1(self, task_id, name, args=None, kwargs=None,
+                   countdown=None, eta=None, group_id=None,
+                   expires=None, now=None, timezone=None, retries=0,
+                   chord=None, callbacks=None, errbacks=None, reply_to=None,
+                   time_limit=None, soft_time_limit=None,
+                   create_sent_event=False):
         args = args or ()
         kwargs = kwargs or {}
         utc = self.utc
@@ -318,24 +384,26 @@ class AMQP(object):
             raise ValueError('task keyword arguments must be a mapping')
         if countdown:  # convert countdown to ETA
             now = now or self.app.now()
+            timezone = timezone or self.app.timezone
             eta = now + timedelta(seconds=countdown)
             if utc:
-                eta = to_utc(eta).astimezone(self.app.timezone)
+                eta = to_utc(eta).astimezone(timezone)
         if isinstance(expires, numbers.Real):
             now = now or self.app.now()
+            timezone = timezone or self.app.timezone
             expires = now + timedelta(seconds=expires)
             if utc:
-                expires = to_utc(expires).astimezone(self.app.timezone)
+                expires = to_utc(expires).astimezone(timezone)
         eta = eta and eta.isoformat()
         expires = expires and expires.isoformat()
 
         return task_message(
-            {},
-            {
+            headers={},
+            properties={
                 'correlation_id': task_id,
                 'reply_to': reply_to,
             },
-            {
+            body={
                 'task': name,
                 'id': task_id,
                 'args': args,
@@ -350,7 +418,7 @@ class AMQP(object):
                 'taskset': group_id,
                 'chord': chord,
             },
-            {
+            sent_event={
                 'uuid': task_id,
                 'name': name,
                 'args': safe_repr(args),

+ 1 - 0
celery/app/defaults.py

@@ -146,6 +146,7 @@ NAMESPACES = {
         'SEND_TASK_ERROR_EMAILS': Option(False, type='bool'),
         'SEND_TASK_SENT_EVENT': Option(False, type='bool'),
         'STORE_ERRORS_EVEN_IF_IGNORED': Option(False, type='bool'),
+        'TASK_PROTOCOL': Option(1, type='int'),
         'TASK_PUBLISH_RETRY': Option(True, type='bool'),
         'TASK_PUBLISH_RETRY_POLICY': Option({
             'max_retries': 3,