|
@@ -108,12 +108,50 @@ def _ensure_after_fork():
|
|
|
|
|
|
|
|
|
class Celery(object):
|
|
|
+ """Celery application.
|
|
|
+
|
|
|
+ :param main: Name of the main module if running as `__main__`.
|
|
|
+ This is used as a prefix for task names.
|
|
|
+ :keyword broker: URL of the default broker used.
|
|
|
+ :keyword loader: The loader class, or the name of the loader class to use.
|
|
|
+ Default is :class:`celery.loaders.app.AppLoader`.
|
|
|
+ :keyword backend: The result store backend class, or the name of the
|
|
|
+ backend class to use. Default is the value of the
|
|
|
+ :setting:`CELERY_RESULT_BACKEND` setting.
|
|
|
+ :keyword amqp: AMQP object or class name.
|
|
|
+ :keyword events: Events object or class name.
|
|
|
+ :keyword log: Log object or class name.
|
|
|
+ :keyword control: Control object or class name.
|
|
|
+ :keyword set_as_current: Make this the global current app.
|
|
|
+ :keyword tasks: A task registry or the name of a registry class.
|
|
|
+ :keyword include: List of modules every worker should import.
|
|
|
+ :keyword fixups: List of fixup plug-ins (see e.g.
|
|
|
+ :mod:`celery.fixups.django`).
|
|
|
+ :keyword autofinalize: If set to False a :exc:`RuntimeError`
|
|
|
+ will be raised if the task registry or tasks are used before
|
|
|
+ the app is finalized.
|
|
|
+
|
|
|
+ """
|
|
|
#: This is deprecated, use :meth:`reduce_keys` instead
|
|
|
Pickler = AppPickler
|
|
|
|
|
|
SYSTEM = platforms.SYSTEM
|
|
|
IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
|
|
|
|
|
|
+ #: Name of the `__main__` module. Required for standalone scripts.
|
|
|
+ #:
|
|
|
+ #: If set this will be used instead of `__main__` when automatically
|
|
|
+ #: generating task names.
|
|
|
+ main = None
|
|
|
+
|
|
|
+ #: Custom options for command-line programs.
|
|
|
+ #: See :ref:`extending-commandoptions`
|
|
|
+ user_options = None
|
|
|
+
|
|
|
+ #: Custom bootsteps to extend and modify the worker.
|
|
|
+ #: See :ref:`extending-bootsteps`.
|
|
|
+ steps = None
|
|
|
+
|
|
|
amqp_cls = 'celery.app.amqp:AMQP'
|
|
|
backend_cls = None
|
|
|
events_cls = 'celery.events:Events'
|
|
@@ -204,9 +242,11 @@ class Celery(object):
|
|
|
_register_app(self)
|
|
|
|
|
|
def set_current(self):
|
|
|
+ """Makes this the current app for this thread."""
|
|
|
_set_current_app(self)
|
|
|
|
|
|
def set_default(self):
|
|
|
+ """Makes this the default app for all threads."""
|
|
|
set_default_app(self)
|
|
|
|
|
|
def __enter__(self):
|
|
@@ -216,6 +256,16 @@ class Celery(object):
|
|
|
self.close()
|
|
|
|
|
|
def close(self):
|
|
|
+ """Close any open pool connections and do any other steps necessary
|
|
|
+ to clean up after the application.
|
|
|
+
|
|
|
+ Only necessary for dynamically created apps for which you can
|
|
|
+ use the with statement instead::
|
|
|
+
|
|
|
+ with Celery(set_as_current=False) as app:
|
|
|
+ with app.connection() as conn:
|
|
|
+ pass
|
|
|
+ """
|
|
|
self._maybe_close_pool()
|
|
|
|
|
|
def on_init(self):
|
|
@@ -223,17 +273,55 @@ class Celery(object):
|
|
|
pass
|
|
|
|
|
|
def start(self, argv=None):
|
|
|
+ """Run :program:`celery` using `argv`.
|
|
|
+
|
|
|
+ Uses :data:`sys.argv` if `argv` is not specified.
|
|
|
+
|
|
|
+ """
|
|
|
return instantiate(
|
|
|
'celery.bin.celery:CeleryCommand',
|
|
|
app=self).execute_from_commandline(argv)
|
|
|
|
|
|
def worker_main(self, argv=None):
|
|
|
+ """Run :program:`celery worker` using `argv`.
|
|
|
+
|
|
|
+ Uses :data:`sys.argv` if `argv` is not specified.
|
|
|
+
|
|
|
+ """
|
|
|
return instantiate(
|
|
|
'celery.bin.worker:worker',
|
|
|
app=self).execute_from_commandline(argv)
|
|
|
|
|
|
def task(self, *args, **opts):
|
|
|
- """Creates new task class from any callable."""
|
|
|
+ """Decorator to create a task class out of any callable.
|
|
|
+
|
|
|
+ Examples:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ @app.task
|
|
|
+ def refresh_feed(url):
|
|
|
+ return …
|
|
|
+
|
|
|
+ with setting extra options:
|
|
|
+
|
|
|
+ .. code-block:: python
|
|
|
+
|
|
|
+ @app.task(exchange="feeds")
|
|
|
+ def refresh_feed(url):
|
|
|
+ return …
|
|
|
+
|
|
|
+ .. admonition:: App Binding
|
|
|
+
|
|
|
+ For custom apps the task decorator will return a proxy
|
|
|
+ object, so that the act of creating the task is not performed
|
|
|
+ until the task is used or the task registry is accessed.
|
|
|
+
|
|
|
+ If you are depending on binding to be deferred, then you must
|
|
|
+ not access any attributes on the returned object until the
|
|
|
+ application is fully set up (finalized).
|
|
|
+
|
|
|
+ """
|
|
|
if _EXECV and opts.get('lazy', True):
|
|
|
# When using execv the task in the original module will point to a
|
|
|
# different app, so doing things like 'add.request' will point to
|
|
@@ -316,6 +404,8 @@ class Celery(object):
|
|
|
return gen_task_name(self, name, module)
|
|
|
|
|
|
def finalize(self, auto=False):
|
|
|
+ """Finalizes the app by loading built-in tasks,
|
|
|
+ and evaluating pending task decorators."""
|
|
|
with self._finalize_mutex:
|
|
|
if not self.finalized:
|
|
|
if auto and not self.autofinalize:
|
|
@@ -333,6 +423,22 @@ class Celery(object):
|
|
|
self.on_after_finalize.send(sender=self)
|
|
|
|
|
|
def add_defaults(self, fun):
|
|
|
+ """Add default configuration from dict ``d``.
|
|
|
+
|
|
|
+ If the argument is a callable function then it will be regarded
|
|
|
+ as a promise, and it won't be loaded until the configuration is
|
|
|
+ actually needed.
|
|
|
+
|
|
|
+ This method can be compared to::
|
|
|
+
|
|
|
+ >>> celery.conf.update(d)
|
|
|
+
|
|
|
+ with a difference that 1) no copy will be made and 2) the dict will
|
|
|
+ not be transferred when the worker spawns child processes, so
|
|
|
+ it's important that the same configuration happens at import time
|
|
|
+ when pickle restores the object on the other side.
|
|
|
+
|
|
|
+ """
|
|
|
if not callable(fun):
|
|
|
d, fun = fun, lambda: d
|
|
|
if self.configured:
|
|
@@ -340,12 +446,39 @@ class Celery(object):
|
|
|
self._pending_defaults.append(fun)
|
|
|
|
|
|
def config_from_object(self, obj, silent=False, force=False):
|
|
|
+ """Reads configuration from object, where object is either
|
|
|
+ an object or the name of a module to import.
|
|
|
+
|
|
|
+ :keyword silent: If true then import errors will be ignored.
|
|
|
+
|
|
|
+ :keyword force: Force reading configuration immediately.
|
|
|
+ By default the configuration will be read only when required.
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
+
|
|
|
+ >>> celery.config_from_object("myapp.celeryconfig")
|
|
|
+
|
|
|
+ >>> from myapp import celeryconfig
|
|
|
+ >>> celery.config_from_object(celeryconfig)
|
|
|
+
|
|
|
+ """
|
|
|
self._config_source = obj
|
|
|
if force or self.configured:
|
|
|
self._conf = None
|
|
|
return self.loader.config_from_object(obj, silent=silent)
|
|
|
|
|
|
def config_from_envvar(self, variable_name, silent=False, force=False):
|
|
|
+ """Read configuration from environment variable.
|
|
|
+
|
|
|
+ The value of the environment variable must be the name
|
|
|
+ of a module to import.
|
|
|
+
|
|
|
+ .. code-block:: pycon
|
|
|
+
|
|
|
+ >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
|
|
|
+ >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
|
|
|
+
|
|
|
+ """
|
|
|
module_name = os.environ.get(variable_name)
|
|
|
if not module_name:
|
|
|
if silent:
|
|
@@ -361,12 +494,69 @@ class Celery(object):
|
|
|
|
|
|
def setup_security(self, allowed_serializers=None, key=None, cert=None,
|
|
|
store=None, digest='sha1', serializer='json'):
|
|
|
+ """Setup the message-signing serializer.
|
|
|
+
|
|
|
+ This will affect all application instances (a global operation).
|
|
|
+
|
|
|
+ Disables untrusted serializers and if configured to use the ``auth``
|
|
|
+ serializer will register the auth serializer with the provided settings
|
|
|
+ into the Kombu serializer registry.
|
|
|
+
|
|
|
+ :keyword allowed_serializers: List of serializer names, or content_types
|
|
|
+ that should be exempt from being disabled.
|
|
|
+ :keyword key: Name of private key file to use.
|
|
|
+ Defaults to the :setting:`CELERY_SECURITY_KEY` setting.
|
|
|
+ :keyword cert: Name of certificate file to use.
|
|
|
+ Defaults to the :setting:`CELERY_SECURITY_CERTIFICATE` setting.
|
|
|
+ :keyword store: Directory containing certificates.
|
|
|
+ Defaults to the :setting:`CELERY_SECURITY_CERT_STORE` setting.
|
|
|
+ :keyword digest: Digest algorithm used when signing messages.
|
|
|
+ Default is ``sha1``.
|
|
|
+ :keyword serializer: Serializer used to encode messages after
|
|
|
+ they have been signed. See :setting:`CELERY_TASK_SERIALIZER` for
|
|
|
+ the serializers supported.
|
|
|
+ Default is ``json``.
|
|
|
+
|
|
|
+ """
|
|
|
from celery.security import setup_security
|
|
|
return setup_security(allowed_serializers, key, cert,
|
|
|
store, digest, serializer, app=self)
|
|
|
|
|
|
def autodiscover_tasks(self, packages=None,
|
|
|
related_name='tasks', force=False):
|
|
|
+ """Try to autodiscover and import modules with a specific name (by
|
|
|
+ default 'tasks').
|
|
|
+
|
|
|
+ If the name is empty, this will be delegated to fixups (e.g. Django).
|
|
|
+
|
|
|
+ For example if you have an (imagined) directory tree like this::
|
|
|
+
|
|
|
+ foo/__init__.py
|
|
|
+ tasks.py
|
|
|
+ models.py
|
|
|
+
|
|
|
+ bar/__init__.py
|
|
|
+ tasks.py
|
|
|
+ models.py
|
|
|
+
|
|
|
+ baz/__init__.py
|
|
|
+ models.py
|
|
|
+
|
|
|
+ Then calling ``app.autodiscover_tasks(['foo', bar', 'baz'])`` will
|
|
|
+ result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.
|
|
|
+
|
|
|
+ :param packages: List of packages to search.
|
|
|
+ This argument may also be a callable, in which case the
|
|
|
+ value returned is used (for lazy evaluation).
|
|
|
+ :keyword related_name: The name of the module to find. Defaults
|
|
|
+ to "tasks", which means it look for "module.tasks" for every
|
|
|
+ module in ``packages``.
|
|
|
+ :keyword force: By default this call is lazy so that the actual
|
|
|
+ autodiscovery will not happen until an application imports the
|
|
|
+ default modules. Forcing will cause the autodiscovery to happen
|
|
|
+ immediately.
|
|
|
+
|
|
|
+ """
|
|
|
if force:
|
|
|
return self._autodiscover_tasks(packages, related_name)
|
|
|
signals.import_modules.connect(promise(
|
|
@@ -399,6 +589,15 @@ class Celery(object):
|
|
|
reply_to=None, time_limit=None, soft_time_limit=None,
|
|
|
root_id=None, parent_id=None, route_name=None,
|
|
|
shadow=None, **options):
|
|
|
+ """Send task by name.
|
|
|
+
|
|
|
+ :param name: Name of task to call (e.g. `"tasks.add"`).
|
|
|
+ :keyword result_cls: Specify custom result class. Default is
|
|
|
+ using :meth:`AsyncResult`.
|
|
|
+
|
|
|
+ Otherwise supports the same arguments as :meth:`@-Task.apply_async`.
|
|
|
+
|
|
|
+ """
|
|
|
amqp = self.amqp
|
|
|
task_id = task_id or uuid()
|
|
|
producer = producer or publisher # XXX compat
|
|
@@ -436,6 +635,24 @@ class Celery(object):
|
|
|
connect_timeout=None, transport=None,
|
|
|
transport_options=None, heartbeat=None,
|
|
|
login_method=None, failover_strategy=None, **kwargs):
|
|
|
+ """Establish a connection to the message broker.
|
|
|
+
|
|
|
+ :param url: Either the URL or the hostname of the broker to use.
|
|
|
+
|
|
|
+ :keyword hostname: URL, Hostname/IP-address of the broker.
|
|
|
+ If an URL is used, then the other argument below will
|
|
|
+ be taken from the URL instead.
|
|
|
+ :keyword userid: Username to authenticate as.
|
|
|
+ :keyword password: Password to authenticate with
|
|
|
+ :keyword virtual_host: Virtual host to use (domain).
|
|
|
+ :keyword port: Port to connect to.
|
|
|
+ :keyword ssl: Defaults to the :setting:`BROKER_USE_SSL` setting.
|
|
|
+ :keyword transport: defaults to the :setting:`BROKER_TRANSPORT`
|
|
|
+ setting.
|
|
|
+
|
|
|
+ :returns :class:`kombu.Connection`:
|
|
|
+
|
|
|
+ """
|
|
|
conf = self.conf
|
|
|
return self.amqp.Connection(
|
|
|
hostname or conf.BROKER_URL,
|
|
@@ -466,10 +683,23 @@ class Celery(object):
|
|
|
return self.connection()
|
|
|
|
|
|
def connection_or_acquire(self, connection=None, pool=True, *_, **__):
|
|
|
+ """For use within a with-statement to get a connection from the pool
|
|
|
+ if one is not already provided.
|
|
|
+
|
|
|
+ :keyword connection: If not provided, then a connection will be
|
|
|
+ acquired from the connection pool.
|
|
|
+ """
|
|
|
return FallbackContext(connection, self._acquire_connection, pool=pool)
|
|
|
default_connection = connection_or_acquire # XXX compat
|
|
|
|
|
|
def producer_or_acquire(self, producer=None):
|
|
|
+ """For use within a with-statement to get a producer from the pool
|
|
|
+ if one is not already provided
|
|
|
+
|
|
|
+ :keyword producer: If not provided, then a producer will be
|
|
|
+ acquired from the producer pool.
|
|
|
+
|
|
|
+ """
|
|
|
return FallbackContext(
|
|
|
producer, self.amqp.producer_pool.acquire, block=True,
|
|
|
)
|
|
@@ -480,9 +710,12 @@ class Celery(object):
|
|
|
return find_deprecated_settings(c)
|
|
|
|
|
|
def now(self):
|
|
|
+ """Return the current time and date as a
|
|
|
+ :class:`~datetime.datetime` object."""
|
|
|
return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
|
|
|
|
|
|
def mail_admins(self, subject, body, fail_silently=False):
|
|
|
+ """Sends an email to the admins in the :setting:`ADMINS` setting."""
|
|
|
conf = self.conf
|
|
|
if conf.ADMINS:
|
|
|
to = [admin_email for _, admin_email in conf.ADMINS]
|
|
@@ -500,6 +733,9 @@ class Celery(object):
|
|
|
)
|
|
|
|
|
|
def select_queues(self, queues=None):
|
|
|
+ """Select a subset of queues, where queues must be a list of queue
|
|
|
+ names to keep."""
|
|
|
+
|
|
|
return self.amqp.queues.select(queues)
|
|
|
|
|
|
def either(self, default_key, *values):
|
|
@@ -508,6 +744,8 @@ class Celery(object):
|
|
|
return first(None, values) or self.conf.get(default_key)
|
|
|
|
|
|
def bugreport(self):
|
|
|
+ """Return a string with information useful for the Celery core
|
|
|
+ developers when reporting a bug."""
|
|
|
return bugreport(self)
|
|
|
|
|
|
def _get_backend(self):
|
|
@@ -560,6 +798,11 @@ class Celery(object):
|
|
|
amqp._producer_pool = None
|
|
|
|
|
|
def signature(self, *args, **kwargs):
|
|
|
+ """Return a new :class:`~celery.canvas.Signature` bound to this app.
|
|
|
+
|
|
|
+ See :meth:`~celery.signature`
|
|
|
+
|
|
|
+ """
|
|
|
kwargs['app'] = self
|
|
|
return self.canvas.signature(*args, **kwargs)
|
|
|
|
|
@@ -671,18 +914,26 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def Worker(self):
|
|
|
+ """Worker application. See :class:`~@Worker`."""
|
|
|
return self.subclass_with_self('celery.apps.worker:Worker')
|
|
|
|
|
|
@cached_property
|
|
|
def WorkController(self, **kwargs):
|
|
|
+ """Embeddable worker. See :class:`~@WorkController`."""
|
|
|
return self.subclass_with_self('celery.worker:WorkController')
|
|
|
|
|
|
@cached_property
|
|
|
def Beat(self, **kwargs):
|
|
|
+ """Celerybeat scheduler application.
|
|
|
+
|
|
|
+ See :class:`~@Beat`.
|
|
|
+
|
|
|
+ """
|
|
|
return self.subclass_with_self('celery.apps.beat:Beat')
|
|
|
|
|
|
@cached_property
|
|
|
def Task(self):
|
|
|
+ """Base task class for this app."""
|
|
|
return self.create_task_cls()
|
|
|
|
|
|
@cached_property
|
|
@@ -691,6 +942,11 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def AsyncResult(self):
|
|
|
+ """Create new result instance.
|
|
|
+
|
|
|
+ See :class:`celery.result.AsyncResult`.
|
|
|
+
|
|
|
+ """
|
|
|
return self.subclass_with_self('celery.result:AsyncResult')
|
|
|
|
|
|
@cached_property
|
|
@@ -699,6 +955,11 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def GroupResult(self):
|
|
|
+ """Create new group result instance.
|
|
|
+
|
|
|
+ See :class:`celery.result.GroupResult`.
|
|
|
+
|
|
|
+ """
|
|
|
return self.subclass_with_self('celery.result:GroupResult')
|
|
|
|
|
|
@cached_property
|
|
@@ -713,6 +974,11 @@ class Celery(object):
|
|
|
|
|
|
@property
|
|
|
def pool(self):
|
|
|
+ """Broker connection pool: :class:`~@pool`.
|
|
|
+
|
|
|
+ This attribute is not related to the workers concurrency pool.
|
|
|
+
|
|
|
+ """
|
|
|
if self._pool is None:
|
|
|
_ensure_after_fork()
|
|
|
limit = self.conf.BROKER_POOL_LIMIT
|
|
@@ -721,6 +987,8 @@ class Celery(object):
|
|
|
|
|
|
@property
|
|
|
def current_task(self):
|
|
|
+ """The instance of the task that is being executed, or
|
|
|
+ :const:`None`."""
|
|
|
return _task_stack.top
|
|
|
|
|
|
@cached_property
|
|
@@ -729,14 +997,17 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def amqp(self):
|
|
|
+ """AMQP related functionality: :class:`~@amqp`."""
|
|
|
return instantiate(self.amqp_cls, app=self)
|
|
|
|
|
|
@cached_property
|
|
|
def backend(self):
|
|
|
+ """Current backend instance."""
|
|
|
return self._get_backend()
|
|
|
|
|
|
@property
|
|
|
def conf(self):
|
|
|
+ """Current configuration."""
|
|
|
if self._conf is None:
|
|
|
self._load_config()
|
|
|
return self._conf
|
|
@@ -747,18 +1018,22 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def control(self):
|
|
|
+ """Remote control: :class:`~@control`."""
|
|
|
return instantiate(self.control_cls, app=self)
|
|
|
|
|
|
@cached_property
|
|
|
def events(self):
|
|
|
+ """Consuming and sending events: :class:`~@events`."""
|
|
|
return instantiate(self.events_cls, app=self)
|
|
|
|
|
|
@cached_property
|
|
|
def loader(self):
|
|
|
+ """Current loader instance."""
|
|
|
return get_loader_cls(self.loader_cls)(app=self)
|
|
|
|
|
|
@cached_property
|
|
|
def log(self):
|
|
|
+ """Logging: :class:`~@log`."""
|
|
|
return instantiate(self.log_cls, app=self)
|
|
|
|
|
|
@cached_property
|
|
@@ -768,11 +1043,22 @@ class Celery(object):
|
|
|
|
|
|
@cached_property
|
|
|
def tasks(self):
|
|
|
+ """Task registry.
|
|
|
+
|
|
|
+ Accessing this attribute will also finalize the app.
|
|
|
+
|
|
|
+ """
|
|
|
self.finalize(auto=True)
|
|
|
return self._tasks
|
|
|
|
|
|
@cached_property
|
|
|
def timezone(self):
|
|
|
+ """Current timezone for this app.
|
|
|
+
|
|
|
+ This is a cached property taking the time zone from the
|
|
|
+ :setting:`CELERY_TIMEZONE` setting.
|
|
|
+
|
|
|
+ """
|
|
|
from celery.utils.timeutils import timezone
|
|
|
conf = self.conf
|
|
|
tz = conf.CELERY_TIMEZONE
|