浏览代码

celery.task.Task is no longer bound to an app by default, so configuration of the task is lazy

Ask Solem 13 年之前
父节点
当前提交
38361c839e
共有 3 个文件被更改,包括 100 次插入127 次删除
  1. 3 16
      celery/app/__init__.py
  2. 84 21
      celery/app/task/__init__.py
  3. 13 90
      celery/task/base.py

+ 3 - 16
celery/app/__init__.py

@@ -91,27 +91,14 @@ class App(base.BaseApp):
     def create_task_cls(self):
         """Creates a base task class using default configuration
         taken from this app."""
-        conf = self.conf
-
         from .task import BaseTask
 
         class Task(BaseTask):
-            abstract = True
             app = self
-            backend = self.backend
-            exchange_type = conf.CELERY_DEFAULT_EXCHANGE_TYPE
-            delivery_mode = conf.CELERY_DEFAULT_DELIVERY_MODE
-            send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
-            error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
-            serializer = conf.CELERY_TASK_SERIALIZER
-            rate_limit = conf.CELERY_DEFAULT_RATE_LIMIT
-            track_started = conf.CELERY_TRACK_STARTED
-            acks_late = conf.CELERY_ACKS_LATE
-            ignore_result = conf.CELERY_IGNORE_RESULT
-            store_errors_even_if_ignored = \
-                conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
-            accept_magic_kwargs = self.accept_magic_kwargs
+            abstract = True
+
         Task.__doc__ = BaseTask.__doc__
+        Task.bind(self)
 
         return Task
 

+ 84 - 21
celery/app/task/__init__.py

@@ -78,7 +78,6 @@ class TaskType(type):
 
     def __new__(cls, name, bases, attrs):
         new = super(TaskType, cls).__new__
-        app = attrs.get("app") or current_app
         task_module = attrs.get("__module__") or "__main__"
 
         if "__call__" in attrs:
@@ -89,6 +88,13 @@ class TaskType(type):
         if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
             return new(cls, name, bases, attrs)
 
+        # The 'app' attribute is now a property, with the real app located
+        # in the '_app' attribute.  Previously this was a regular attribute,
+        # so we should support classes defining it.
+        app = attrs["_app"] = (attrs.pop("_app", None) or
+                               attrs.pop("app", None) or
+                               current_app)
+
         # - Automatically generate missing/empty name.
         autoname = False
         if not attrs.get("name"):
@@ -125,9 +131,10 @@ class TaskType(type):
         task_name = attrs["name"]
         if task_name not in tasks:
             task_cls = new(cls, name, bases, attrs)
-            if autoname and task_module == "__main__" and task_cls.app.main:
-                task_name = task_cls.name = '.'.join([task_cls.app.main, name])
+            if autoname and task_module == "__main__" and app.main:
+                task_name = task_cls.name = '.'.join([app.main, name])
             tasks.register(task_cls)
+            task_cls.bind(app)
         task = tasks[task_name].__class__
 
         # decorate with annotations from config.
@@ -135,7 +142,9 @@ class TaskType(type):
         return task
 
     def __repr__(cls):
-        return "<class Task of %s>" % (cls.app, )
+        if cls._app:
+            return "<class %s of %s>" % (cls.__name__, cls._app, )
+        return "<unbound %s>" % (cls.__name__, )
 
 
 class BaseTask(object):
@@ -152,8 +161,11 @@ class BaseTask(object):
     ErrorMail = ErrorMail
     MaxRetriesExceededError = MaxRetriesExceededError
 
+    #: Execution strategy used, or the qualified name of one.
+    Strategy = "celery.worker.strategy:default"
+
     #: The application instance associated with this task class.
-    app = None
+    _app = None
 
     #: Name of the task.
     name = None
@@ -274,8 +286,56 @@ class BaseTask(object):
     #: Default task expiry time.
     expires = None
 
-    #: Execution strategy used, or the qualified name of one.
-    Strategy = "celery.worker.strategy:default"
+    from_config = (
+        ("exchange_type", "CELERY_DEFAULT_EXCHANGE_TYPE"),
+        ("delivery_mode", "CELERY_DEFAULT_DELIVERY_MODE"),
+        ("send_error_emails", "CELERY_SEND_TASK_ERROR_EMAILS"),
+        ("error_whitelist", "CELERY_TASK_ERROR_WHITELIST"),
+        ("serializer", "CELERY_TASK_SERIALIZER"),
+        ("rate_limit", "CELERY_DEFAULT_RATE_LIMIT"),
+        ("track_started", "CELERY_TRACK_STARTED"),
+        ("acks_late", "CELERY_ACKS_LATE"),
+        ("ignore_result", "CELERY_IGNORE_RESULT"),
+        ("store_errors_even_if_ignored",
+            "CELERY_STORE_ERRORS_EVEN_IF_IGNORED"),
+    )
+
+    @classmethod
+    def bind(cls, app):
+        cls._app = app
+        conf = app.conf
+
+        for attr_name, config_name in cls.from_config:
+            if getattr(cls, attr_name, None) is None:
+                setattr(cls, attr_name, conf[config_name])
+        cls.accept_magic_kwargs = app.accept_magic_kwargs
+        if cls.accept_magic_kwargs is None:
+            cls.accept_magic_kwargs = app.accept_magic_kwargs
+        if cls.backend is None:
+            cls.backend = app.backend
+
+        # e.g. PeriodicTask uses this to add itself to the PeriodicTask
+        # schedule.
+        cls.on_bound(app)
+
+        return app
+
+    @classmethod
+    def on_bound(self, app):
+        """This method can be defined to do additional actions when the
+        task class is bound to an app."""
+        pass
+
+    @classmethod
+    def _get_app(cls):
+        if cls._app is None:
+            cls.bind(current_app)
+        return cls._app
+
+    @classmethod
+    def _set_app(cls, app):
+        cls.bind(app)
+    app = property(_get_app, _set_app)
 
     def __reduce__(self):
         return (_unpickle_task, (self.name, ), None)
@@ -291,7 +351,7 @@ class BaseTask(object):
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
             **kwargs):
         """Get task-aware logger object."""
-        return self.app.log.setup_task_logger(
+        return self._get_app().log.setup_task_logger(
             loglevel=self.request.loglevel if loglevel is None else loglevel,
             logfile=self.request.logfile if logfile is None else logfile,
             propagate=propagate, task_name=self.name, task_id=self.request.id)
@@ -299,7 +359,8 @@ class BaseTask(object):
     @classmethod
     def establish_connection(self, connect_timeout=None):
         """Establish a connection to the message broker."""
-        return self.app.broker_connection(connect_timeout=connect_timeout)
+        return self._get_app().broker_connection(
+                connect_timeout=connect_timeout)
 
     @classmethod
     def get_publisher(self, connection=None, exchange=None,
@@ -328,7 +389,7 @@ class BaseTask(object):
         if exchange_type is None:
             exchange_type = self.exchange_type
         connection = connection or self.establish_connection(connect_timeout)
-        return self.app.amqp.TaskPublisher(connection=connection,
+        return self._get_app().amqp.TaskPublisher(connection=connection,
                                            exchange=exchange,
                                            exchange_type=exchange_type,
                                            routing_key=self.routing_key,
@@ -353,7 +414,7 @@ class BaseTask(object):
 
         """
         connection = connection or self.establish_connection(connect_timeout)
-        return self.app.amqp.TaskConsumer(connection=connection,
+        return self._get_app().amqp.TaskConsumer(connection=connection,
                                           exchange=self.exchange,
                                           routing_key=self.routing_key)
 
@@ -457,19 +518,20 @@ class BaseTask(object):
             be replaced by a local :func:`apply` call instead.
 
         """
-        router = self.app.amqp.Router(queues)
-        conf = self.app.conf
+        app = self._get_app()
+        router = app.amqp.Router(queues)
+        conf = app.conf
 
         if conf.CELERY_ALWAYS_EAGER:
             return self.apply(args, kwargs, task_id=task_id, **options)
         options = dict(extract_exec_options(self), **options)
         options = router.route(options, self.name, args, kwargs)
 
-        publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
+        publish = publisher or app.amqp.publisher_pool.acquire(block=True)
         evd = None
         if conf.CELERY_SEND_TASK_SENT_EVENT:
-            evd = self.app.events.Dispatcher(channel=publish.channel,
-                                             buffer_while_offline=False)
+            evd = app.events.Dispatcher(channel=publish.channel,
+                                        buffer_while_offline=False)
 
         try:
             task_id = publish.delay_task(self.name, args, kwargs,
@@ -583,15 +645,16 @@ class BaseTask(object):
         :rtype :class:`celery.result.EagerResult`:
 
         """
+        app = self._get_app()
         args = args or []
         kwargs = kwargs or {}
         task_id = options.get("task_id") or uuid()
         retries = options.get("retries", 0)
-        throw = self.app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
-                                options.pop("throw", None))
+        throw = app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
+                           options.pop("throw", None))
 
         # Make sure we get the task instance, not class.
-        task = self.app._tasks[self.name]
+        task = app._tasks[self.name]
 
         request = {"id": task_id,
                    "retries": retries,
@@ -629,8 +692,8 @@ class BaseTask(object):
         :param task_id: Task id to get result for.
 
         """
-        return self.app.AsyncResult(task_id, backend=self.backend,
-                                             task_name=self.name)
+        return self._get_app().AsyncResult(task_id, backend=self.backend,
+                                                    task_name=self.name)
 
     def update_state(self, task_id=None, state=None, meta=None):
         """Update task state.

+ 13 - 90
celery/task/base.py

@@ -11,113 +11,36 @@
 """
 from __future__ import absolute_import
 
-from .. import current_app
 from ..app.task import Context, TaskType, BaseTask  # noqa
 from ..schedules import maybe_schedule
-from ..utils import timeutils
 
-Task = current_app.Task
 
+class Task(BaseTask):
+    abstract = True
 
-class PeriodicTask(Task):
-    """A periodic task is a task that behaves like a :manpage:`cron` job.
-
-    Results of periodic tasks are not stored by default.
-
-    .. attribute:: run_every
-
-        *REQUIRED* Defines how often the task is run (its interval),
-        it can be a :class:`~datetime.timedelta` object, a
-        :class:`~celery.schedules.crontab` object or an integer
-        specifying the time in seconds.
-
-    .. attribute:: relative
-
-        If set to :const:`True`, run times are relative to the time when the
-        server was started. This was the previous behaviour, periodic tasks
-        are now scheduled by the clock.
-
-    :raises NotImplementedError: if the :attr:`run_every` attribute is
-        not defined.
-
-    Example
-
-        >>> from celery.task import tasks, PeriodicTask
-        >>> from datetime import timedelta
-        >>> class EveryThirtySecondsTask(PeriodicTask):
-        ...     run_every = timedelta(seconds=30)
-        ...
-        ...     def run(self, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Execute every 30 seconds")
-
-        >>> from celery.task import PeriodicTask
-        >>> from celery.schedules import crontab
-
-        >>> class EveryMondayMorningTask(PeriodicTask):
-        ...     run_every = crontab(hour=7, minute=30, day_of_week=1)
-        ...
-        ...     def run(self, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Execute every Monday at 7:30AM.")
-
-        >>> class EveryMorningTask(PeriodicTask):
-        ...     run_every = crontab(hours=7, minute=30)
-        ...
-        ...     def run(self, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Execute every day at 7:30AM.")
-
-        >>> class EveryQuarterPastTheHourTask(PeriodicTask):
-        ...     run_every = crontab(minute=15)
-        ...
-        ...     def run(self, **kwargs):
-        ...         logger = self.get_logger(**kwargs)
-        ...         logger.info("Execute every 0:15 past the hour every day.")
 
-    """
+class PeriodicTask(BaseTask):
+    """A periodic task is a task that adds itself to the
+    :setting:`CELERYBEAT_SCHEDULE` setting."""
     abstract = True
     ignore_result = True
     relative = False
     options = None
 
     def __init__(self):
-        app = current_app
         if not hasattr(self, "run_every"):
             raise NotImplementedError(
                     "Periodic tasks must have a run_every attribute")
         self.run_every = maybe_schedule(self.run_every, self.relative)
+        super(PeriodicTask, self).__init__()
 
-        # For backward compatibility, add the periodic task to the
-        # configuration schedule instead.
-        app.conf.CELERYBEAT_SCHEDULE[self.name] = {
-                "task": self.name,
-                "schedule": self.run_every,
+    @classmethod
+    def on_bound(cls, app):
+        app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
+                "task": cls.name,
+                "schedule": cls.run_every,
                 "args": (),
                 "kwargs": {},
-                "options": self.options or {},
-                "relative": self.relative,
+                "options": cls.options or {},
+                "relative": cls.relative,
         }
-
-        super(PeriodicTask, self).__init__()
-
-    def timedelta_seconds(self, delta):
-        """Convert :class:`~datetime.timedelta` to seconds.
-
-        Doesn't account for negative timedeltas.
-
-        """
-        return timeutils.timedelta_seconds(delta)
-
-    def is_due(self, last_run_at):
-        """Returns tuple of two items `(is_due, next_time_to_run)`,
-        where next time to run is in seconds.
-
-        See :meth:`celery.schedules.schedule.is_due` for more information.
-
-        """
-        return self.run_every.is_due(last_run_at)
-
-    def remaining_estimate(self, last_run_at):
-        """Returns when the periodic task should run next as a timedelta."""
-        return self.run_every.remaining_estimate(last_run_at)