|
@@ -12,10 +12,6 @@
|
|
|
from __future__ import absolute_import
|
|
|
from __future__ import with_statement
|
|
|
|
|
|
-import celery
|
|
|
-import kombu
|
|
|
-import os
|
|
|
-import platform as _platform
|
|
|
import warnings
|
|
|
|
|
|
from collections import deque
|
|
@@ -25,82 +21,40 @@ from functools import wraps
|
|
|
|
|
|
from kombu.clocks import LamportClock
|
|
|
|
|
|
-from .. import datastructures
|
|
|
from .. import platforms
|
|
|
from ..backends import get_backend_by_url
|
|
|
from ..exceptions import AlwaysEagerIgnored
|
|
|
from ..loaders import get_loader_cls
|
|
|
-from ..local import maybe_evaluate
|
|
|
-from ..utils import cached_property, lpmerge, register_after_fork
|
|
|
+from ..local import PromiseProxy, maybe_evaluate
|
|
|
+from ..utils import cached_property, register_after_fork
|
|
|
from ..utils.functional import first
|
|
|
-from ..utils.imports import instantiate, qualname
|
|
|
-from ..utils.text import pretty
|
|
|
+from ..utils.imports import instantiate
|
|
|
|
|
|
+from . import annotations
|
|
|
from .builtins import load_builtin_tasks
|
|
|
-from .defaults import DEFAULTS, find_deprecated_settings, find
|
|
|
+from .defaults import DEFAULTS, find_deprecated_settings
|
|
|
+from .state import _tls
|
|
|
+from .utils import AppPickler, Settings, bugreport, _unpickle_app
|
|
|
|
|
|
-import kombu
|
|
|
-if kombu.VERSION < (2, 0):
|
|
|
- raise ImportError("Celery requires Kombu version 1.1.0 or higher.")
|
|
|
|
|
|
-SETTINGS_INFO = """%s %s"""
|
|
|
+class App(object):
|
|
|
+ """Celery Application.
|
|
|
|
|
|
-BUGREPORT_INFO = """
|
|
|
-software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
|
|
|
-platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
|
|
|
-loader -> %(loader)s
|
|
|
-settings -> transport:%(transport)s results:%(results)s
|
|
|
+ :param main: Name of the main module if running as `__main__`.
|
|
|
+ :keyword loader: The loader class, or the name of the loader class to use.
|
|
|
+ Default is :class:`celery.loaders.app.AppLoader`.
|
|
|
+ :keyword backend: The result store backend class, or the name of the
|
|
|
+ backend class to use. Default is the value of the
|
|
|
+ :setting:`CELERY_RESULT_BACKEND` setting.
|
|
|
+ :keyword amqp: AMQP object or class name.
|
|
|
+ :keyword events: Events object or class name.
|
|
|
+ :keyword log: Log object or class name.
|
|
|
+ :keyword control: Control object or class name.
|
|
|
+ :keyword set_as_current: Make this the global current app.
|
|
|
|
|
|
-%(human_settings)s
|
|
|
+ """
|
|
|
+ Pickler = AppPickler
|
|
|
|
|
|
-"""
|
|
|
-
|
|
|
-
|
|
|
-class Settings(datastructures.ConfigurationView):
|
|
|
-
|
|
|
- @property
|
|
|
- def CELERY_RESULT_BACKEND(self):
|
|
|
- """Resolves deprecated alias ``CELERY_BACKEND``."""
|
|
|
- return self.first("CELERY_RESULT_BACKEND", "CELERY_BACKEND")
|
|
|
-
|
|
|
- @property
|
|
|
- def BROKER_TRANSPORT(self):
|
|
|
- """Resolves compat aliases :setting:`BROKER_BACKEND`
|
|
|
- and :setting:`CARROT_BACKEND`."""
|
|
|
- return self.first("BROKER_TRANSPORT",
|
|
|
- "BROKER_BACKEND", "CARROT_BACKEND")
|
|
|
-
|
|
|
- @property
|
|
|
- def BROKER_BACKEND(self):
|
|
|
- """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
|
|
|
- return self.BROKER_TRANSPORT
|
|
|
-
|
|
|
- @property
|
|
|
- def BROKER_HOST(self):
|
|
|
- return (os.environ.get("CELERY_BROKER_URL") or
|
|
|
- self.first("BROKER_URL", "BROKER_HOST"))
|
|
|
-
|
|
|
- def without_defaults(self):
|
|
|
- # the last stash is the default settings, so just skip that
|
|
|
- return Settings({}, self._order[:-1])
|
|
|
-
|
|
|
- def find_value_for_key(self, name, namespace="celery"):
|
|
|
- return self.get_by_parts(*self.find_option(name, namespace)[:-1])
|
|
|
-
|
|
|
- def find_option(self, name, namespace="celery"):
|
|
|
- return find(name, namespace)
|
|
|
-
|
|
|
- def get_by_parts(self, *parts):
|
|
|
- return self["_".join(filter(None, parts))]
|
|
|
-
|
|
|
- def humanize(self):
|
|
|
- return "\n".join(SETTINGS_INFO % (key + ':', pretty(value, width=50))
|
|
|
- for key, value in self.without_defaults().iteritems())
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-class BaseApp(object):
|
|
|
- """Base class for apps."""
|
|
|
SYSTEM = platforms.SYSTEM
|
|
|
IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
|
|
|
|
|
@@ -139,12 +93,157 @@ class BaseApp(object):
|
|
|
if broker:
|
|
|
self._preconf["BROKER_URL"] = broker
|
|
|
|
|
|
+ if self.set_as_current:
|
|
|
+ self.set_current()
|
|
|
self.on_init()
|
|
|
|
|
|
+ def set_current(self):
|
|
|
+ """Make this the current app for this thread."""
|
|
|
+ _tls.current_app = self
|
|
|
+
|
|
|
def on_init(self):
|
|
|
- """Called at the end of the constructor."""
|
|
|
pass
|
|
|
|
|
|
+ def create_task_cls(self):
|
|
|
+ """Creates a base task class using default configuration
|
|
|
+ taken from this app."""
|
|
|
+ from .task import BaseTask
|
|
|
+
|
|
|
+ class Task(BaseTask):
|
|
|
+ app = self
|
|
|
+ abstract = True
|
|
|
+
|
|
|
+ Task.__doc__ = BaseTask.__doc__
|
|
|
+ Task.bind(self)
|
|
|
+
|
|
|
+ return Task
|
|
|
+
|
|
|
+ def Worker(self, **kwargs):
|
|
|
+ """Create new :class:`~celery.apps.worker.Worker` instance."""
|
|
|
+ return instantiate("celery.apps.worker:Worker", app=self, **kwargs)
|
|
|
+
|
|
|
+ def WorkController(self, **kwargs):
|
|
|
+ return instantiate("celery.worker:WorkController", app=self, **kwargs)
|
|
|
+
|
|
|
+ def Beat(self, **kwargs):
|
|
|
+ """Create new :class:`~celery.apps.beat.Beat` instance."""
|
|
|
+ return instantiate("celery.apps.beat:Beat", app=self, **kwargs)
|
|
|
+
|
|
|
+ def TaskSet(self, *args, **kwargs):
|
|
|
+ """Create new :class:`~celery.task.sets.TaskSet`."""
|
|
|
+ return instantiate("celery.task.sets:TaskSet",
|
|
|
+ app=self, *args, **kwargs)
|
|
|
+
|
|
|
+ def start(self, argv=None):
|
|
|
+ """Run :program:`celery` using `argv`. Uses :data:`sys.argv`
|
|
|
+ if `argv` is not specified."""
|
|
|
+ return instantiate("celery.bin.celery:CeleryCommand", app=self) \
|
|
|
+ .execute_from_commandline(argv)
|
|
|
+
|
|
|
+ def worker_main(self, argv=None):
|
|
|
+ """Run :program:`celeryd` using `argv`. Uses :data:`sys.argv`
|
|
|
+ if `argv` is not specified."""
|
|
|
+ return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
|
|
|
+ .execute_from_commandline(argv)
|
|
|
+
|
|
|
+ def task(self, *args, **options):
|
|
|
+ """Decorator to create a task class out of any callable.
|
|
|
+
|
|
|
+ **Examples:**
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ @task
|
|
|
+ def refresh_feed(url):
|
|
|
+ return ...
|
|
|
+
|
|
|
+ with setting extra options:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ @task(exchange="feeds")
|
|
|
+ def refresh_feed(url):
|
|
|
+ return ...
|
|
|
+
|
|
|
+ .. admonition:: App Binding
|
|
|
+
|
|
|
+ For custom apps the task decorator returns proxy
|
|
|
+ objects, so that the act of creating the task is not performed
|
|
|
+ until the task is used or the task registry is accessed.
|
|
|
+
|
|
|
+ If you are depending on binding to be deferred, then you must
|
|
|
+ not access any attributes on the returned object until the
|
|
|
+ application is fully set up (finalized).
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ def inner_create_task_cls(**options):
|
|
|
+
|
|
|
+ def _create_task_cls(fun):
|
|
|
+ if self.accept_magic_kwargs: # compat mode
|
|
|
+ return self._task_from_fun(fun, **options)
|
|
|
+
|
|
|
+ # return a proxy object that is only evaluated when first used
|
|
|
+ promise = PromiseProxy(self._task_from_fun, (fun, ), options)
|
|
|
+ self._pending.append(promise)
|
|
|
+ return promise
|
|
|
+
|
|
|
+ return _create_task_cls
|
|
|
+
|
|
|
+ if len(args) == 1 and callable(args[0]):
|
|
|
+ return inner_create_task_cls(**options)(*args)
|
|
|
+ return inner_create_task_cls(**options)
|
|
|
+
|
|
|
+ def _task_from_fun(self, fun, **options):
|
|
|
+ base = options.pop("base", None) or self.Task
|
|
|
+
|
|
|
+ T = type(fun.__name__, (base, ), dict({
|
|
|
+ "app": self,
|
|
|
+ "accept_magic_kwargs": False,
|
|
|
+ "run": staticmethod(fun),
|
|
|
+ "__doc__": fun.__doc__,
|
|
|
+ "__module__": fun.__module__}, **options))()
|
|
|
+ return self._tasks[T.name] # return global instance.
|
|
|
+
|
|
|
+ def annotate_task(self, task):
|
|
|
+ if self.annotations:
|
|
|
+ match = annotations._first_match(self.annotations, task)
|
|
|
+ for attr, value in (match or {}).iteritems():
|
|
|
+ setattr(task, attr, value)
|
|
|
+ match_any = annotations._first_match_any(self.annotations)
|
|
|
+ for attr, value in (match_any or {}).iteritems():
|
|
|
+ setattr(task, attr, value)
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def Task(self):
|
|
|
+ """Default Task base class for this application."""
|
|
|
+ return self.create_task_cls()
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def annotations(self):
|
|
|
+ return annotations.prepare(self.conf.CELERY_ANNOTATIONS)
|
|
|
+
|
|
|
+ def __repr__(self):
|
|
|
+ return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
|
|
|
+
|
|
|
+ def __reduce__(self):
|
|
|
+ # Reduce only pickles the configuration changes,
|
|
|
+ # so the default configuration doesn't have to be passed
|
|
|
+ # between processes.
|
|
|
+ return (_unpickle_app, (self.__class__, self.Pickler)
|
|
|
+ + self.__reduce_args__())
|
|
|
+
|
|
|
+ def __reduce_args__(self):
|
|
|
+ return (self.main,
|
|
|
+ self.conf.changes,
|
|
|
+ self.loader_cls,
|
|
|
+ self.backend_cls,
|
|
|
+ self.amqp_cls,
|
|
|
+ self.events_cls,
|
|
|
+ self.log_cls,
|
|
|
+ self.control_cls,
|
|
|
+ self.accept_magic_kwargs)
|
|
|
+
|
|
|
def finalize(self):
|
|
|
if not self.finalized:
|
|
|
load_builtin_tasks(self)
|
|
@@ -340,6 +439,9 @@ class BaseApp(object):
|
|
|
`*values` are true."""
|
|
|
return first(None, values) or self.conf.get(default_key)
|
|
|
|
|
|
+ def bugreport(self):
|
|
|
+ return bugreport(self)
|
|
|
+
|
|
|
def _get_backend(self):
|
|
|
backend, url = get_backend_by_url(
|
|
|
self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
|
|
@@ -355,18 +457,6 @@ class BaseApp(object):
|
|
|
self._pool.force_close_all()
|
|
|
self._pool = None
|
|
|
|
|
|
- def bugreport(self):
|
|
|
- return BUGREPORT_INFO % {"system": _platform.system(),
|
|
|
- "arch": _platform.architecture(),
|
|
|
- "py_i": platforms.pyimplementation(),
|
|
|
- "celery_v": celery.__version__,
|
|
|
- "kombu_v": kombu.__version__,
|
|
|
- "py_v": _platform.python_version(),
|
|
|
- "transport": self.conf.BROKER_TRANSPORT,
|
|
|
- "results": self.conf.CELERY_RESULT_BACKEND,
|
|
|
- "human_settings": self.conf.humanize(),
|
|
|
- "loader": qualname(self.loader.__class__)}
|
|
|
-
|
|
|
@property
|
|
|
def pool(self):
|
|
|
if self._pool is None:
|