Pārlūkot izejas kodu

apply_async now uses send_task

Ask Solem 11 gadi atpakaļ
vecāks
revīzija
e67f0eb351
4 mainītis faili ar 48 papildinājumiem un 57 dzēšanām
  1. 27 20
      celery/app/base.py
  2. 19 34
      celery/app/task.py
  3. 1 1
      celery/task/base.py
  4. 1 2
      celery/tests/compat_modules/test_sets.py

+ 27 - 20
celery/app/base.py

@@ -20,10 +20,12 @@ from operator import attrgetter
 from billiard.util import register_after_fork
 from kombu.clocks import LamportClock
 from kombu.common import oid_from
-from kombu.utils import cached_property
+from kombu.utils import cached_property, uuid
 
 from celery import platforms
-from celery._state import _task_stack, _tls, get_current_app, _register_app
+from celery._state import (
+    _task_stack, _tls, get_current_app, _register_app, get_current_worker_task,
+)
 from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
 from celery.five import items, values
 from celery.loaders import get_loader_cls
@@ -278,28 +280,33 @@ class Celery(object):
 
     def send_task(self, name, args=None, kwargs=None, countdown=None,
                   eta=None, task_id=None, producer=None, connection=None,
-                  result_cls=None, expires=None, queues=None, publisher=None,
-                  link=None, link_error=None,
-                  **options):
+                  router=None, result_cls=None, expires=None,
+                  publisher=None, link=None, link_error=None,
+                  add_to_parent=True, reply_to=None, **options):
+        task_id = task_id or uuid()
         producer = producer or publisher  # XXX compat
-        if self.conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
+        router = router or self.amqp.router
+        conf = self.conf
+        if conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
             warnings.warn(AlwaysEagerIgnored(
                 'CELERY_ALWAYS_EAGER has no effect on send_task'))
-
-        result_cls = result_cls or self.AsyncResult
-        router = self.amqp.Router(queues)
-        options.setdefault('compression',
-                           self.conf.CELERY_MESSAGE_COMPRESSION)
         options = router.route(options, name, args, kwargs)
-        with self.producer_or_acquire(producer) as producer:
-            return result_cls(producer.publish_task(
-                name, args, kwargs,
-                task_id=task_id,
-                countdown=countdown, eta=eta,
-                callbacks=maybe_list(link),
-                errbacks=maybe_list(link_error),
-                expires=expires, **options
-            ))
+        if connection:
+            producer = self.amqp.TaskProducer(connection)
+        with self.producer_or_acquire(producer) as P:
+            self.backend.on_task_call(P, task_id)
+            task_id = P.publish_task(
+                name, args, kwargs, countdown=countdown, eta=eta,
+                task_id=task_id, expires=expires,
+                callbacks=maybe_list(link), errbacks=maybe_list(link_error),
+                reply_to=reply_to or self.oid, **options
+            )
+        result = (result_cls or self.AsyncResult)(task_id)
+        if add_to_parent:
+            parent = get_current_worker_task()
+            if parent:
+                parent.add_trail(result)
+        return result
 
     def connection(self, hostname=None, userid=None, password=None,
                    virtual_host=None, port=None, ssl=None,

+ 19 - 34
celery/app/task.py

@@ -11,10 +11,11 @@ from __future__ import absolute_import
 import sys
 
 from billiard.einfo import ExceptionInfo
+from kombu.utils import cached_property
 
 from celery import current_app
 from celery import states
-from celery._state import get_current_worker_task, _task_stack
+from celery._state import _task_stack
 from celery.canvas import subtask
 from celery.exceptions import MaxRetriesExceededError, RetryTaskError
 from celery.five import class_property, items, with_metaclass
@@ -269,6 +270,8 @@ class Task(object):
     #: called.  This should probably be deprecated.
     _default_request = None
 
+    _exec_options = None
+
     __bound__ = False
 
     from_config = (
@@ -388,10 +391,8 @@ class Task(object):
         """
         return self.apply_async(args, kwargs)
 
-    def apply_async(self, args=None, kwargs=None,
-                    task_id=None, producer=None, connection=None, router=None,
-                    link=None, link_error=None, publisher=None,
-                    add_to_parent=True, reply_to=None, **options):
+    def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
+                    link=None, link_error=None, **options):
         """Apply tasks asynchronously by sending a message.
 
         :keyword args: The positional arguments to pass on to the
@@ -479,42 +480,21 @@ class Task(object):
             be replaced by a local :func:`apply` call instead.
 
         """
-        task_id = task_id or uuid()
-        producer = producer or publisher
-        app = self._get_app()
-        router = router or self.app.amqp.router
-        conf = app.conf
-
         # add 'self' if this is a bound method.
         if self.__self__ is not None:
             args = (self.__self__, ) + tuple(args)
-
-        if conf.CELERY_ALWAYS_EAGER:
-            return self.apply(args, kwargs, task_id=task_id,
+        app = self._get_app()
+        if app.conf.CELERY_ALWAYS_EAGER:
+            return self.apply(args, kwargs, task_id=task_id or uuid(),
                               link=link, link_error=link_error, **options)
-        options = dict(extract_exec_options(self), **options)
-        options = router.route(options, self.name, args, kwargs)
-
-        if connection:
-            producer = app.amqp.TaskProducer(connection)
-        with app.producer_or_acquire(producer) as P:
-            self.backend.on_task_call(P, task_id)
-            task_id = P.publish_task(self.name, args, kwargs,
-                                     task_id=task_id,
-                                     callbacks=maybe_list(link),
-                                     errbacks=maybe_list(link_error),
-                                     reply_to=reply_to or self.app.oid,
-                                     **options)
-        result = self.AsyncResult(task_id)
-        if add_to_parent:
-            parent = get_current_worker_task()
-            if parent:
-                parent.add_trail(result)
-        return result
+        return app.send_task(
+            self.name, args, kwargs, task_id=task_id, producer=producer,
+            link=link, link_error=link_error, result_cls=self.AsyncResult,
+            **dict(self._get_exec_options(), **options)
+        )
 
     def subtask_from_request(self, request=None, args=None, kwargs=None,
                              **extra_options):
-
         request = self.request if request is None else request
         args = request.args if args is None else args
         kwargs = request.kwargs if kwargs is None else kwargs
@@ -834,6 +814,11 @@ class Task(object):
         return req
     request = property(_get_request)
 
+    def _get_exec_options(self):
+        if self._exec_options is None:
+            self._exec_options = extract_exec_options(self)
+        return self._exec_options
+
     @property
     def __name__(self):
         return self.__class__.__name__

+ 1 - 1
celery/task/base.py

@@ -24,7 +24,7 @@ __all__ = ['Task', 'PeriodicTask', 'task']
 #: list of methods that must be classmethods in the old API.
 _COMPAT_CLASSMETHODS = (
     'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
-    'AsyncResult', 'subtask', '_get_request',
+    'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
 )
 
 

+ 1 - 2
celery/tests/compat_modules/test_sets.py

@@ -132,9 +132,8 @@ class test_TaskSet(AppCase):
 
         with patch('celery.task.sets.get_current_worker_task') as gwt:
             parent = gwt.return_value = Mock()
-            parent.request.children = []
             ts.apply_async()
-            self.assertTrue(parent.request.children)
+            self.assertTrue(parent.add_trail.called)
 
     def test_apply_async(self):
         applied = [0]