소스 검색

Fixes docs and test issues

Ask Solem 10 년 전
부모
커밋
100168871a

+ 2 - 2
README.rst

@@ -234,9 +234,9 @@ by using brackets.  Multiple bundles can be specified by separating them by
 commas.
 ::
 
-    $ pip install celery[librabbitmq]
+    $ pip install "celery[librabbitmq]"
 
-    $ pip install celery[librabbitmq,redis,auth,msgpack]
+    $ pip install "celery[librabbitmq,redis,auth,msgpack]"
 
 The following bundles are available:
 

+ 3 - 1
celery/app/trace.py

@@ -491,9 +491,11 @@ def _fast_trace_task(task, uuid, request, body, content_type,
     embed = None
     tasks, accept, hostname = _loc
     if content_type:
-        args, kwargs, embed = loads(
+        X = loads(
             body, content_type, content_encoding, accept=accept,
         )
+        print(X)
+        args, kwargs, embed = X
     else:
         args, kwargs = body
     request.update({

+ 1 - 1
celery/beat.py

@@ -478,7 +478,7 @@ class Service(object):
                 interval = self.scheduler.tick()
                 if interval:
                     debug('beat: Waking up %s.',
-                        humanize_seconds(interval, prefix='in '))
+                          humanize_seconds(interval, prefix='in '))
                     time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
             self._is_shutdown.set()

+ 5 - 3
celery/tests/case.py

@@ -869,7 +869,8 @@ def restore_logging():
         root.handlers[:] = handlers
 
 
-def TaskMessage(name, id=None, args=(), kwargs={}, **options):
+def TaskMessage(name, id=None, args=(), kwargs={}, callbacks=None,
+                errbacks=None, chain=None, **options):
     from celery import uuid
     from kombu.serialization import dumps
     id = id or uuid()
@@ -878,9 +879,10 @@ def TaskMessage(name, id=None, args=(), kwargs={}, **options):
         'id': id,
         'task': name,
     }
+    embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain}
     message.headers.update(options)
     message.content_type, message.content_encoding, message.body = dumps(
-        (args, kwargs), serializer='json',
+        (args, kwargs, embed), serializer='json',
     )
-    message.payload = (args, kwargs)
+    message.payload = (args, kwargs, embed)
     return message

+ 1 - 1
celery/tests/worker/test_loops.py

@@ -158,7 +158,7 @@ class test_asynloop(AppCase):
         x, on_task, msg, strategy = self.task_context(self.add.s(2, 2))
         msg.headers.pop('task')
         on_task(msg)
-        x.on_unknown_message.assert_called_with(((2, 2), {}), msg)
+        x.on_unknown_message.assert_called_with(msg.payload, msg)
 
     def test_on_task_not_registered(self):
         x, on_task, msg, strategy = self.task_context(self.add.s(2, 2))

+ 1 - 0
celery/tests/worker/test_request.py

@@ -619,6 +619,7 @@ class test_Request(AppCase):
         self.assertIs(trace.trace_task_ret, trace._fast_trace_task)
         tid = uuid()
         message = TaskMessage(self.mytask.name, tid, args=[4])
+        assert len(message.payload) == 3
         try:
             self.mytask.__trace__ = build_tracer(
                 self.mytask.name, self.mytask, self.app.loader, 'test',

+ 1 - 1
celery/tests/worker/test_worker.py

@@ -912,7 +912,7 @@ class test_WorkController(AppCase):
                     os.environ['FORKED_BY_MULTIPROCESSING'] = "1"
                     try:
                         process_initializer(app, 'luke.worker.com')
-                        S.assert_called_with(app)
+                        S.assert_called_with(app, 'luke.worker.com')
                     finally:
                         os.environ.pop('FORKED_BY_MULTIPROCESSING', None)
 

+ 2 - 2
celery/worker/request.py

@@ -207,10 +207,10 @@ class Request(object):
             self.acknowledge()
 
         request = self.request_dict
-        args, kwargs = self.message.payload
+        args, kwargs, embed = self.message.payload
         request.update({'loglevel': loglevel, 'logfile': logfile,
                         'hostname': self.hostname, 'is_eager': False,
-                        'args': args, 'kwargs': kwargs})
+                        'args': args, 'kwargs': kwargs}, **embed or {})
         retval = trace_task(self.task, self.id, args, kwargs, request,
                             hostname=self.hostname, loader=self.app.loader,
                             app=self.app)[0]

+ 8 - 0
docs/configuration.rst

@@ -1110,6 +1110,14 @@ compression schemes registered in the Kombu compression registry.
 
 The default is to send uncompressed messages.
 
+.. setting:: CELERY_TASK_PROTOCOL
+
+CELERY_TASK_PROTOCOL
+~~~~~~~~~~~~~~~~~~~~
+
+Default task message protocol version.
+Supports protocols: 1 and 2 (default is 1 for backwards compatibility).
+
 .. setting:: CELERY_TASK_RESULT_EXPIRES
 
 CELERY_TASK_RESULT_EXPIRES

BIN
docs/images/worker_graph_full.png