Selaa lähdekoodia

Tests passing

Ask Solem 12 vuotta sitten
vanhempi
commit
09ab407067

+ 4 - 4
celery/app/base.py

@@ -183,10 +183,10 @@ class Celery(object):
                            self.conf.CELERY_MESSAGE_COMPRESSION)
         options = router.route(options, name, args, kwargs)
         with self.default_producer(publisher) as producer:
-            return result_cls(producer.delay_task(name, args, kwargs,
-                                                  task_id=task_id,
-                                                  countdown=countdown, eta=eta,
-                                                  expires=expires, **options))
+            return result_cls(producer.publish_task(name, args, kwargs,
+                        task_id=task_id,
+                        countdown=countdown, eta=eta,
+                        expires=expires, **options))
 
     def connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,

+ 1 - 1
celery/app/task.py

@@ -583,7 +583,7 @@ class Task(object):
             self.apply(args=args, kwargs=kwargs, **options).get()
         else:
             self.apply_async(args=args, kwargs=kwargs, **options)
-        ret = RetryTaskError(exc, eta or countdown)
+        ret = RetryTaskError(exc=exc, when=eta or countdown)
         if throw:
             raise ret
         return ret

+ 5 - 2
celery/exceptions.py

@@ -61,7 +61,8 @@ class MaxRetriesExceededError(Exception):
 class RetryTaskError(Exception):
     """The task is to be retried later."""
 
-    def __init__(self, exc=None, when=None, **kwargs):
+    def __init__(self, message=None, exc=None, when=None, **kwargs):
+        self.message = message
         if isinstance(exc, basestring):
             self.exc, self.excs = None, exc
         else:
@@ -75,12 +76,14 @@ class RetryTaskError(Exception):
         return 'at %s' % (self.when, )
 
     def __str__(self):
+        if self.message:
+            return self.message
         if self.excs:
             return 'Retry %s: %r' % (self.humanize(), self.excs)
         return 'Retry %s' % self.humanize()
 
     def __reduce__(self):
-        return self.__class__, (self.excs, self.when)
+        return self.__class__, (self.message, self.excs, self.when)
 
 
 class TaskRevokedError(Exception):

+ 8 - 8
celery/tests/app/test_amqp.py

@@ -25,16 +25,16 @@ class test_TaskProducer(AppCase):
         publisher.declare()
 
     def test_retry_policy(self):
-        pub = self.app.amqp.TaskProducer(Mock())
-        pub.channel.connection.client.declared_entities = set()
-        pub.delay_task('tasks.add', (2, 2), {},
-                       retry_policy={'frobulate': 32.4})
+        prod = self.app.amqp.TaskProducer(Mock())
+        prod.channel.connection.client.declared_entities = set()
+        prod.publish_task('tasks.add', (2, 2), {},
+                          retry_policy={'frobulate': 32.4})
 
     def test_publish_no_retry(self):
-        pub = self.app.amqp.TaskProducer(Mock())
-        pub.channel.connection.client.declared_entities = set()
-        pub.delay_task('tasks.add', (2, 2), {}, retry=False, chord=123)
-        self.assertFalse(pub.connection.ensure.call_count)
+        prod = self.app.amqp.TaskProducer(Mock())
+        prod.channel.connection.client.declared_entities = set()
+        prod.publish_task('tasks.add', (2, 2), {}, retry=False, chord=123)
+        self.assertFalse(prod.connection.ensure.call_count)
 
 
 class test_compat_TaskPublisher(AppCase):

+ 10 - 10
celery/tests/app/test_app.py

@@ -205,7 +205,7 @@ class test_App(Case):
         def aawsX():
             pass
 
-        with patch('celery.app.amqp.TaskProducer.delay_task') as dt:
+        with patch('celery.app.amqp.TaskProducer.publish_task') as dt:
             aawsX.apply_async((4, 5))
             args = dt.call_args[0][1]
             self.assertEqual(args, ('hello', 4, 5))
@@ -433,20 +433,20 @@ class test_App(Case):
             chan.close()
         assert conn.transport_cls == 'memory'
 
-        pub = self.app.amqp.TaskProducer(conn,
+        prod = self.app.amqp.TaskProducer(conn,
                 exchange=Exchange('foo_exchange'))
 
         dispatcher = Dispatcher()
-        self.assertTrue(pub.delay_task('footask', (), {},
-                                       exchange='moo_exchange',
-                                       routing_key='moo_exchange',
-                                       event_dispatcher=dispatcher))
+        self.assertTrue(prod.publish_task('footask', (), {},
+                                          exchange='moo_exchange',
+                                          routing_key='moo_exchange',
+                                          event_dispatcher=dispatcher))
         self.assertTrue(dispatcher.sent)
         self.assertEqual(dispatcher.sent[0][0], 'task-sent')
-        self.assertTrue(pub.delay_task('footask', (), {},
-                                       event_dispatcher=dispatcher,
-                                       exchange='bar_exchange',
-                                       routing_key='bar_exchange'))
+        self.assertTrue(prod.publish_task('footask', (), {},
+                                          event_dispatcher=dispatcher,
+                                          exchange='bar_exchange',
+                                          routing_key='bar_exchange'))
 
     def test_error_mail_sender(self):
         x = ErrorMail.subject % {'name': 'task_name',

+ 3 - 3
celery/tests/tasks/test_tasks.py

@@ -350,14 +350,14 @@ class test_tasks(Case):
         app.conf.CELERY_SEND_TASK_SENT_EVENT = True
         dispatcher = [None]
 
-        class Pub(object):
+        class Prod(object):
             channel = chan
 
-            def delay_task(self, *args, **kwargs):
+            def publish_task(self, *args, **kwargs):
                 dispatcher[0] = kwargs.get('event_dispatcher')
 
         try:
-            T1.apply_async(publisher=Pub())
+            T1.apply_async(producer=Prod())
         finally:
             app.conf.CELERY_SEND_TASK_SENT_EVENT = False
             chan.close()

+ 4 - 2
docs/userguide/calling.rst

@@ -28,9 +28,11 @@ The API defines a standard set of execution options, as well as three methods:
         Shortcut to send a task message, but does not support execution
         options.
 
-    - ``apply()``
+    - *calling* (``__call__``)
 
-        Does not send a message but executes the task inline instead.
+        Applying an object supporting the calling API (e.g. ``add(2, 2)``)
+        means that the task will be executed in the current process, and
+        not by a worker (a message will not be sent).
 
 .. _calling-cheat: