فهرست منبع

Modern Task class no longer supports: establish_connection(), get_publisher(), get_consumer() and get_logger()

These are now only available on the compat Task class:

    from celery.task import Task

The new Task class is now avalable as:

    from celery import Task

The methods deprecated above are not very useful, it is far better
to use the Celery app methods (broker_connection, amqp.TaskProducer,
amqp.TaskConsumer instead).

(Also remember to write in changelog that the new modern Task class
no longer uses classmethods).
Ask Solem 13 سال پیش
والد
کامیت
b4a3f9f67e

+ 1 - 0
celery/__init__.py

@@ -20,6 +20,7 @@ from .__compat__ import recreate_module
 old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
         "celery.app":       ["Celery", "bugreport"],
+        "celery.app.task":  ["Task"],
         "celery.state":     ["current_app", "current_task"],
         "celery.canvas":    ["chain", "chord", "chunks",
                              "group", "subtask", "xmap", "xstarmap"],

+ 1 - 1
celery/app/base.py

@@ -300,7 +300,7 @@ class Celery(object):
     def create_task_cls(self):
         """Creates a base task class using default configuration
         taken from this app."""
-        return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
+        return self.subclass_with_self("celery.app.task:Task", name="Task",
                                        attribute="_app", abstract=True)
 
     def subclass_with_self(self, Class, name=None, attribute="app",

+ 10 - 64
celery/app/task.py

@@ -159,7 +159,7 @@ class TaskType(type):
         return "<unbound %s>" % (cls.__name__, )
 
 
-class BaseTask(object):
+class Task(object):
     """Task base class.
 
     When called tasks apply the :meth:`run` method.  This method must
@@ -402,65 +402,6 @@ class BaseTask(object):
     def start_strategy(self, app, consumer):
         return instantiate(self.Strategy, self, app, consumer)
 
-    def get_logger(self, **kwargs):
-        """Get task-aware logger object."""
-        logger = get_logger(self.name)
-        if logger.parent is logging.root:
-            logger.parent = get_logger("celery.task")
-        return logger
-
-    def establish_connection(self, connect_timeout=None):
-        """Establish a connection to the message broker."""
-        return self._get_app().broker_connection(
-                connect_timeout=connect_timeout)
-
-    def get_publisher(self, connection=None, exchange=None,
-            connect_timeout=None, exchange_type=None, **options):
-        """Get a celery task message publisher.
-
-        :rtype :class:`~celery.app.amqp.TaskProducer`:
-
-        .. warning::
-
-            If you don't specify a connection, one will automatically
-            be established for you, in that case you need to close this
-            connection after use::
-
-                >>> publisher = self.get_publisher()
-                >>> # ... do something with publisher
-                >>> publisher.connection.close()
-
-        """
-        exchange = self.exchange if exchange is None else exchange
-        if exchange_type is None:
-            exchange_type = self.exchange_type
-        connection = connection or self.establish_connection(connect_timeout)
-        return self._get_app().amqp.TaskProducer(connection,
-                exchange=exchange and Exchange(exchange, exchange_type),
-                routing_key=self.routing_key, **options)
-
-    def get_consumer(self, connection=None, queues=None, **kwargs):
-        """Get message consumer.
-
-        :rtype :class:`kombu.messaging.Consumer`:
-
-        .. warning::
-
-            If you don't specify a connection, one will automatically
-            be established for you, in that case you need to close this
-            connection after use::
-
-                >>> consumer = self.get_consumer()
-                >>> # do something with consumer
-                >>> consumer.close()
-                >>> consumer.connection.close()
-
-        """
-        app = self._get_app()
-        connection = connection or self.establish_connection()
-        return app.amqp.TaskConsumer(connection,
-            queues or app.amqp.queue_or_default(self.queue), **kwargs)
-
     def delay(self, *args, **kwargs):
         """Star argument version of :meth:`apply_async`.
 
@@ -700,7 +641,7 @@ class BaseTask(object):
         :rtype :class:`celery.result.EagerResult`:
 
         """
-        # trace imports BaseTask, so need to import inline.
+        # trace imports Task, so need to import inline.
         from celery.task.trace import eager_trace_task
 
         app = self._get_app()
@@ -893,9 +834,13 @@ class BaseTask(object):
         """`repr(task)`"""
         return "<@task: %s>" % (self.name, )
 
-    @cached_property
-    def logger(self):
-        return self.get_logger()
+    def _get_logger(self, **kwargs):
+        """Get task-aware logger object."""
+        logger = get_logger(self.name)
+        if logger.parent is logging.root:
+            logger.parent = get_logger("celery.task")
+        return logger
+    logger = cached_property(_get_logger)
 
     @property
     def request(self):
@@ -904,3 +849,4 @@ class BaseTask(object):
     @property
     def __name__(self):
         return self.__class__.__name__
+BaseTask = Task  # compat alias

+ 1 - 1
celery/bin/celery.py

@@ -433,7 +433,7 @@ class shell(Command):  # pragma: no cover
         import celery.task.base
         self.app.loader.import_default_modules()
         self.locals = {"celery": self.app,
-                       "BaseTask": celery.task.base.BaseTask,
+                       "Task": celery.Task,
                        "chord": celery.chord,
                        "group": celery.group,
                        "chain": celery.chain,

+ 69 - 4
celery/task/base.py

@@ -15,17 +15,21 @@ from __future__ import absolute_import
 
 from celery import current_app
 from celery.__compat__ import class_property, reclassmethod
-from celery.app.task import Context, TaskType, BaseTask  # noqa
+from celery.app.task import Context, TaskType, Task as BaseTask  # noqa
 from celery.schedules import maybe_schedule
 
 #: list of methods that must be classmethods in the old API.
 _COMPAT_CLASSMETHODS = (
-    "get_logger", "establish_connection", "get_publisher", "get_consumer",
-    "delay", "apply_async", "retry", "apply", "AsyncResult", "subtask",
-    "push_request", "pop_request")
+    "delay", "apply_async", "retry", "apply",
+    "AsyncResult", "subtask", "push_request", "pop_request")
 
 
 class Task(BaseTask):
+    """Deprecated Task base class.
+
+    Modern applications should use :class:`celery.Task` instead.
+
+    """
     abstract = True
     __bound__ = False
 
@@ -42,6 +46,67 @@ class Task(BaseTask):
         return self.request_stack.top
     request = class_property(_get_request)
 
+    #: Deprecated alias to :attr:`logger``.
+    get_logger = reclassmethod(BaseTask._get_logger)
+
+    @classmethod
+    def establish_connection(self, connect_timeout=None):
+        """Deprecated method used to get a broker connection.
+
+        Should be replaced with :meth:`@Celery.broker_connection`
+        instead, or by acquiring connections from the connection pool:
+
+        .. code-block:: python
+
+            # using the connection pool
+            with celery.pool.acquire(block=True) as conn:
+                ...
+
+            # establish fresh connection
+            with celery.broker_connection() as conn:
+                ...
+        """
+        return self._get_app().broker_connection(
+                connect_timeout=connect_timeout)
+
+
+    def get_publisher(self, connection=None, exchange=None,
+            connect_timeout=None, exchange_type=None, **options):
+        """Deprecated method to get the task publisher (now called producer).
+
+        Should be replaced with :class:`@amqp.TaskProducer`:
+
+        .. code-block:: python
+
+            with celery.broker_connection() as conn:
+                with celery.amqp.TaskProducer(conn) as prod:
+                    my_task.apply_async(producer=prod)
+
+        """
+        exchange = self.exchange if exchange is None else exchange
+        if exchange_type is None:
+            exchange_type = self.exchange_type
+        connection = connection or self.establish_connection(connect_timeout)
+        return self._get_app().amqp.TaskProducer(connection,
+                exchange=exchange and Exchange(exchange, exchange_type),
+                routing_key=self.routing_key, **options)
+
+
+    @classmethod
+    def get_consumer(self, connection=None, queues=None, **kwargs):
+        """Deprecated method used to get consumer for the queue
+        this task is sent to.
+
+        Should be replaced with :class:`@amqp.TaskConsumer` instead:
+
+        """
+        Q = self._get_app().amqp
+        connection = connection or self.establish_connection()
+        if queues is None:
+            queues = Q.queues[self.queue] if self.queue else Q.default_queue
+        return Q.TaskConsumer(connection, queues, **kwargs)
+
+
 
 class PeriodicTask(Task):
     """A periodic task is a task that adds itself to the

+ 1 - 1
celery/task/trace.py

@@ -29,7 +29,7 @@ from kombu.utils import kwdict
 from celery import current_app
 from celery import states, signals
 from celery.state import _task_stack, default_app
-from celery.app.task import BaseTask, Context
+from celery.app.task import Task as BaseTask, Context
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import RetryTaskError
 from celery.utils.serialization import get_pickleable_exception

+ 5 - 5
celery/tests/app/test_amqp.py

@@ -7,31 +7,31 @@ from celery.app.amqp import Queues
 from celery.tests.utils import AppCase
 
 
-class test_TaskPublisher(AppCase):
+class test_TaskProducer(AppCase):
 
     def test__exit__(self):
 
-        publisher = self.app.amqp.TaskPublisher(self.app.broker_connection())
+        publisher = self.app.amqp.TaskProducer(self.app.broker_connection())
         publisher.release = Mock()
         with publisher:
             pass
         publisher.release.assert_called_with()
 
     def test_declare(self):
-        publisher = self.app.amqp.TaskPublisher(self.app.broker_connection())
+        publisher = self.app.amqp.TaskProducer(self.app.broker_connection())
         publisher.exchange.name = "foo"
         publisher.declare()
         publisher.exchange.name = None
         publisher.declare()
 
     def test_retry_policy(self):
-        pub = self.app.amqp.TaskPublisher(Mock())
+        pub = self.app.amqp.TaskProducer(Mock())
         pub.channel.connection.client.declared_entities = set()
         pub.delay_task("tasks.add", (2, 2), {},
                        retry_policy={"frobulate": 32.4})
 
     def test_publish_no_retry(self):
-        pub = self.app.amqp.TaskPublisher(Mock())
+        pub = self.app.amqp.TaskProducer(Mock())
         pub.channel.connection.client.declared_entities = set()
         pub.delay_task("tasks.add", (2, 2), {}, retry=False, chord=123)
         self.assertFalse(pub.connection.ensure.call_count)

+ 1 - 1
celery/tests/app/test_app.py

@@ -261,7 +261,7 @@ class test_App(Case):
             chan.close()
         assert conn.transport_cls == "memory"
 
-        pub = self.app.amqp.TaskPublisher(conn,
+        pub = self.app.amqp.TaskProducer(conn,
                 exchange=Exchange("foo_exchange"))
 
         dispatcher = Dispatcher()

+ 7 - 7
celery/tests/utilities/test_compat.py

@@ -2,8 +2,8 @@ from __future__ import absolute_import
 
 
 import celery
-from celery.app.task import BaseTask
-from celery.task.base import Task
+from celery.app.task import Task as ModernTask
+from celery.task.base import Task as CompatTask
 
 from celery.tests.utils import Case
 
@@ -11,19 +11,19 @@ from celery.tests.utils import Case
 class test_MagicModule(Case):
 
     def test_class_property_set_without_type(self):
-        self.assertTrue(BaseTask.__dict__["app"].__get__(Task()))
+        self.assertTrue(ModernTask.__dict__["app"].__get__(CompatTask()))
 
     def test_class_property_set_on_class(self):
-        self.assertIs(BaseTask.__dict__["app"].__set__(None, None),
-                      BaseTask.__dict__["app"])
+        self.assertIs(ModernTask.__dict__["app"].__set__(None, None),
+                      ModernTask.__dict__["app"])
 
     def test_class_property_set(self):
 
-        class X(Task):
+        class X(CompatTask):
             pass
 
         app = celery.Celery(set_as_current=False)
-        BaseTask.__dict__["app"].__set__(X(), app)
+        ModernTask.__dict__["app"].__set__(X(), app)
         self.assertEqual(X.app, app)
 
     def test_dir(self):