Преглед изворни кода

create task message using additional v2 parameters (#4260)

* create task message using additional v2 parameters

* removing conditional and adding compat_kwargs to as_task methods

* Update amqp.py

* Adding tests for kwargsrepr/argsrepr, upgrading test_tasks to use task protocol v2

* Adding properties argument to allow checking for task properties values
James M. Allen пре 7 година
родитељ
комит
6bdf5aea3a
3 измењених фајлова са 42 додато и 11 уклоњено
  1. 2 1
      celery/app/amqp.py
  2. 2 0
      celery/app/base.py
  3. 38 10
      t/unit/tasks/test_tasks.py

+ 2 - 1
celery/app/amqp.py

@@ -398,7 +398,8 @@ class AMQP(object):
                    chord=None, callbacks=None, errbacks=None, reply_to=None,
                    time_limit=None, soft_time_limit=None,
                    create_sent_event=False, root_id=None, parent_id=None,
-                   shadow=None, now=None, timezone=None):
+                   shadow=None, now=None, timezone=None,
+                   **compat_kwargs):
         args = args or ()
         kwargs = kwargs or {}
         utc = self.utc

+ 2 - 0
celery/app/base.py

@@ -739,6 +739,8 @@ class Celery(object):
             reply_to or self.oid, time_limit, soft_time_limit,
             self.conf.task_send_sent_event,
             root_id, parent_id, shadow, chain,
+            argsrepr=options.get('argsrepr'),
+            kwargsrepr=options.get('kwargsrepr'),
         )
 
         if connection:

+ 38 - 10
t/unit/tasks/test_tasks.py

@@ -41,7 +41,6 @@ class MockApplyTask(Task):
 class TasksCase:
 
     def setup(self):
-        self.app.conf.task_protocol = 1  # XXX  Still using proto1
         self.mytask = self.app.task(shared=False)(return_True)
 
         @self.app.task(bind=True, count=0, shared=False)
@@ -412,20 +411,28 @@ class test_tasks(TasksCase):
 
     def assert_next_task_data_equal(self, consumer, presult, task_name,
                                     test_eta=False, test_expires=False,
-                                    **kwargs):
+                                    properties=None, headers=None, **kwargs):
         next_task = consumer.queues[0].get(accept=['pickle', 'json'])
-        task_data = next_task.decode()
-        assert task_data['id'] == presult.id
-        assert task_data['task'] == task_name
-        task_kwargs = task_data.get('kwargs', {})
+        task_properties = next_task.properties
+        task_headers = next_task.headers
+        task_body = next_task.decode()
+        task_args, task_kwargs, embed = task_body
+        assert task_headers['id'] == presult.id
+        assert task_headers['task'] == task_name
         if test_eta:
-            assert isinstance(task_data.get('eta'), string_t)
-            to_datetime = parse_iso8601(task_data.get('eta'))
+            assert isinstance(task_headers.get('eta'), string_t)
+            to_datetime = parse_iso8601(task_headers.get('eta'))
             assert isinstance(to_datetime, datetime)
         if test_expires:
-            assert isinstance(task_data.get('expires'), string_t)
-            to_datetime = parse_iso8601(task_data.get('expires'))
+            assert isinstance(task_headers.get('expires'), string_t)
+            to_datetime = parse_iso8601(task_headers.get('expires'))
             assert isinstance(to_datetime, datetime)
+        properties = properties or {}
+        for arg_name, arg_value in items(properties):
+            assert task_properties.get(arg_name) == arg_value
+        headers = headers or {}
+        for arg_name, arg_value in items(headers):
+            assert task_headers.get(arg_name) == arg_value
         for arg_name, arg_value in items(kwargs):
             assert task_kwargs.get(arg_name) == arg_value
 
@@ -500,6 +507,27 @@ class test_tasks(TasksCase):
                 name='George Costanza', test_eta=True, test_expires=True,
             )
 
+            # Default argsrepr/kwargsrepr behavior
+            presult2 = self.mytask.apply_async(
+                args=('spam',), kwargs={'name': 'Jerry Seinfeld'}
+            )
+            self.assert_next_task_data_equal(
+                consumer, presult2, self.mytask.name,
+                headers={'argsrepr': "('spam',)",
+                         'kwargsrepr': "{'name': 'Jerry Seinfeld'}"},
+            )
+
+            # With argsrepr/kwargsrepr
+            presult2 = self.mytask.apply_async(
+                args=('secret',), argsrepr="'***'",
+                kwargs={'password': 'foo'}, kwargsrepr="{'password': '***'}",
+            )
+            self.assert_next_task_data_equal(
+                consumer, presult2, self.mytask.name,
+                headers={'argsrepr': "'***'",
+                         'kwargsrepr': "{'password': '***'}"},
+            )
+
             # Discarding all tasks.
             consumer.purge()
             self.mytask.apply_async()