Sfoglia il codice sorgente

Merge branch 'master' into 5.0-devel

Ask Solem 8 anni fa
parent
commit
d53f998d34

+ 1 - 1
celery/app/amqp.py

@@ -9,8 +9,8 @@ from weakref import WeakValueDictionary
 from kombu import pools
 from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import Broadcast
-from kombu.utils import cached_property
 from kombu.utils.functional import maybe_list
+from kombu.utils.objects import cached_property
 
 from celery import signals
 from celery.utils.nodenames import anon_nodename

+ 35 - 24
celery/app/base.py

@@ -10,7 +10,9 @@ from operator import attrgetter
 from kombu import pools
 from kombu.clocks import LamportClock
 from kombu.common import oid_from
-from kombu.utils import cached_property, register_after_fork, uuid
+from kombu.utils.compat import register_after_fork
+from kombu.utils.objects import cached_property
+from kombu.utils.uuid import uuid
 from vine import starpromise
 from vine.utils import wraps
 
@@ -278,13 +280,12 @@ class Celery:
         """Clean up after the application.
 
         Only necessary for dynamically created apps for which you can
-        use the :keyword:`with` statement instead:
+        use the :keyword:`with` statement instead
 
-        .. code-block:: python
-
-            with Celery(set_as_current=False) as app:
-                with app.connection_for_write() as conn:
-                    pass
+        Example:
+            >>> with Celery(set_as_current=False) as app:
+            ...     with app.connection_for_write() as conn:
+            ...         pass
         """
         self._pool = None
         _deregister_app(self)
@@ -493,8 +494,7 @@ class Celery:
         The value of the environment variable must be the name
         of a module to import.
 
-        .. code-block:: pycon
-
+        Example:
             >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
             >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')
         """
@@ -547,7 +547,7 @@ class Celery:
 
         If the name is empty, this will be delegated to fix-ups (e.g. Django).
 
-        For example if you have an (imagined) directory tree like this:
+        For example if you have an directory layout like this:
 
         .. code-block:: text
 
@@ -664,14 +664,16 @@ class Celery:
     def connection_for_read(self, url=None, **kwargs):
         """Establish connection used for consuming.
 
-        See :meth:`connection` for supported arguments.
+        See Also:
+            :meth:`connection` for supported arguments.
         """
         return self._connection(url or self.conf.broker_read_url, **kwargs)
 
     def connection_for_write(self, url=None, **kwargs):
         """Establish connection used for producing.
 
-        See :meth:`connection` for supported arguments.
+        See Also:
+            :meth:`connection` for supported arguments.
         """
         return self._connection(url or self.conf.broker_write_url, **kwargs)
 
@@ -688,7 +690,6 @@ class Celery:
 
         Arguments:
             url: Either the URL or the hostname of the broker to use.
-
             hostname (str): URL, Hostname/IP-address of the broker.
                 If a URL is used, then the other argument below will
                 be taken from the URL instead.
@@ -851,10 +852,7 @@ class Celery:
         self.on_after_fork.send(sender=self)
 
     def signature(self, *args, **kwargs):
-        """Return a new :class:`~celery.canvas.Signature` bound to this app.
-
-        See :meth:`~celery.signature`
-        """
+        """Return a new :class:`~celery.Signature` bound to this app."""
         kwargs['app'] = self
         return self.canvas.signature(*args, **kwargs)
 
@@ -975,19 +973,28 @@ class Celery:
 
     @cached_property
     def Worker(self):
-        """Worker application. See :class:`~@Worker`."""
+        """Worker application.
+
+        See Also:
+            :class:`~@Worker`.
+        """
         return self.subclass_with_self('celery.apps.worker:Worker')
 
     @cached_property
     def WorkController(self, **kwargs):
-        """Embeddable worker. See :class:`~@WorkController`."""
+        """Embeddable worker.
+
+        See Also:
+            :class:`~@WorkController`.
+        """
         return self.subclass_with_self('celery.worker:WorkController')
 
     @cached_property
     def Beat(self, **kwargs):
         """:program:`celery beat` scheduler application.
 
-        See :class:`~@Beat`.
+        See Also:
+            :class:`~@Beat`.
         """
         return self.subclass_with_self('celery.apps.beat:Beat')
 
@@ -1004,7 +1011,8 @@ class Celery:
     def AsyncResult(self):
         """Create new result instance.
 
-        See :class:`celery.result.AsyncResult`.
+        See Also:
+            :class:`celery.result.AsyncResult`.
         """
         return self.subclass_with_self('celery.result:AsyncResult')
 
@@ -1016,7 +1024,8 @@ class Celery:
     def GroupResult(self):
         """Create new group result instance.
 
-        See :class:`celery.result.GroupResult`.
+        See Also:
+            :class:`celery.result.GroupResult`.
         """
         return self.subclass_with_self('celery.result:GroupResult')
 
@@ -1024,7 +1033,8 @@ class Celery:
     def pool(self):
         """Broker connection pool: :class:`~@pool`.
 
-        This attribute is not related to the workers concurrency pool.
+        Note:
+            This attribute is not related to the workers concurrency pool.
         """
         if self._pool is None:
             self._ensure_after_fork()
@@ -1107,7 +1117,8 @@ class Celery:
     def tasks(self):
         """Task registry.
 
-        Accessing this attribute will also finalize the app.
+        Warning:
+            Accessing this attribute will also auto-finalize the app.
         """
         self.finalize(auto=True)
         return self._tasks

+ 1 - 1
celery/app/control.py

@@ -9,8 +9,8 @@ import warnings
 from billiard.common import TERM_SIGNAME
 
 from kombu.pidbox import Mailbox
-from kombu.utils import cached_property
 from kombu.utils.functional import lazy
+from kombu.utils.objects import cached_property
 
 from celery.exceptions import DuplicateNodenameWarning
 from celery.utils.text import pluralize

+ 1 - 1
celery/app/task.py

@@ -3,7 +3,7 @@
 import sys
 
 from billiard.einfo import ExceptionInfo
-from kombu.utils import uuid
+from kombu.utils.uuid import uuid
 
 from celery import current_app, group
 from celery import states

+ 1 - 1
celery/backends/async.py

@@ -7,7 +7,7 @@ from weakref import WeakKeyDictionary
 from queue import Empty
 
 from kombu.syn import detect_environment
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 
 from celery import states
 from celery.exceptions import TimeoutError

+ 1 - 1
celery/backends/cache.py

@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """Memcached and in-memory cache result backend."""
-from kombu.utils import cached_property
 from kombu.utils.encoding import bytes_to_str, ensure_bytes
+from kombu.utils.objects import cached_property
 
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.functional import LRUCache

+ 1 - 1
celery/backends/cassandra.py

@@ -133,7 +133,7 @@ class CassandraBackend(BaseBackend):
         Arguments:
             write (bool): are we a writer?
         """
-        if self._connections is not None:
+        if self._connection is not None:
             return
         try:
             self._connection = cassandra.cluster.Cluster(

+ 1 - 1
celery/backends/database/session.py

@@ -5,7 +5,7 @@ from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.pool import NullPool
 
-from kombu.utils import register_after_fork
+from kombu.utils.compat import register_after_fork
 
 ResultModelBase = declarative_base()
 

+ 2 - 1
celery/backends/mongodb.py

@@ -2,9 +2,10 @@
 """MongoDB result store backend."""
 from datetime import datetime, timedelta
 
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 from kombu.utils.url import maybe_sanitize_url
 from kombu.exceptions import EncodeError
+
 from celery import states
 from celery.exceptions import ImproperlyConfigured
 

+ 2 - 1
celery/backends/redis.py

@@ -2,7 +2,8 @@
 """Redis result store backend."""
 from functools import partial
 
-from kombu.utils import cached_property, retry_over_time
+from kombu.utils.functional import retry_over_time
+from kombu.utils.objects import cached_property
 from kombu.utils.url import _parse_url
 
 from celery import states

+ 2 - 1
celery/backends/rpc.py

@@ -5,7 +5,8 @@ RPC-style result backend, using reply-to and one queue per client.
 """
 from kombu import Consumer, Exchange, Producer, Queue
 from kombu.common import maybe_declare
-from kombu.utils import cached_property, register_after_fork
+from kombu.utils.compat import register_after_fork
+from kombu.utils.objects import cached_property
 
 from celery import current_task
 from celery import states

+ 3 - 3
celery/beat.py

@@ -16,8 +16,8 @@ from time import monotonic
 from billiard import ensure_multiprocessing
 from billiard.context import Process
 from billiard.common import reset_signals
-from kombu.utils import cached_property, reprcall
-from kombu.utils.functional import maybe_evaluate
+from kombu.utils.functional import maybe_evaluate, reprcall
+from kombu.utils.objects import cached_property
 
 from . import __version__
 from . import platforms
@@ -133,7 +133,7 @@ class ScheduleEntry:
         return iter(vars(self).items())
 
     def __repr__(self):
-        return '<{name} {0.name} {call} {0.schedule}'.format(
+        return '<{name}: {0.name} {call} {0.schedule}'.format(
             self,
             call=reprcall(self.task, self.args or (), self.kwargs or {}),
             name=type(self).__name__,

+ 1 - 1
celery/bin/multi.py

@@ -103,8 +103,8 @@ from functools import partial
 from subprocess import Popen
 from time import sleep
 
-from kombu.utils import cached_property
 from kombu.utils.encoding import from_utf8
+from kombu.utils.objects import cached_property
 
 from celery import VERSION_BANNER
 from celery.platforms import Pidfile, IS_WINDOWS

+ 1 - 1
celery/bootsteps.py

@@ -5,7 +5,7 @@ from threading import Event
 from typing import Any, Callable, Mapping, Optional, Set, Sequence, Tuple
 
 from kombu.common import ignore_errors
-from kombu.utils import symbol_by_name
+from kombu.utils.imports import symbol_by_name
 
 from .utils.abstract import AbstractApp
 from .utils.graph import DependencyGraph, GraphFormatter

+ 4 - 2
celery/canvas.py

@@ -11,7 +11,9 @@ from functools import partial as _partial, reduce
 from operator import itemgetter
 from itertools import chain as _chain
 
-from kombu.utils import cached_property, fxrange, reprcall, uuid
+from kombu.utils.functional import fxrange, reprcall
+from kombu.utils.objects import cached_property
+from kombu.utils.uuid import uuid
 from vine import barrier
 
 from celery._state import current_app
@@ -1060,7 +1062,7 @@ class chord(Signature):
 
     Example:
 
-        The chrod:
+        The chord:
 
         .. code-block:: pycon
 

+ 1 - 1
celery/concurrency/__init__.py

@@ -3,7 +3,7 @@
 # Import from kombu directly as it's used
 # early in the import stage, where celery.utils loads
 # too much (e.g. for eventlet patching)
-from kombu.utils import symbol_by_name
+from kombu.utils.imports import symbol_by_name
 
 __all__ = ['get_implementation']
 

+ 1 - 1
celery/concurrency/asynpool.py

@@ -36,8 +36,8 @@ from billiard.compat import buf_t, setblocking, isblocking
 from billiard.queues import _SimpleQueue
 from kombu.async import READ, WRITE, ERR
 from kombu.serialization import pickle as _pickle
-from kombu.utils import fxrange
 from kombu.utils.eventio import SELECT_BAD_FD
+from kombu.utils.functional import fxrange
 from vine import promise
 
 from celery.utils.functional import noop

+ 6 - 5
celery/contrib/migrate.py

@@ -170,11 +170,12 @@ def move(predicate, connection=None, exchange=None, routing_key=None,
 
         move(is_wanted_task, transform=transform)
 
-    The predicate may also return a tuple of ``(exchange, routing_key)``
-    to specify the destination to where the task should be moved,
-    or a :class:`~kombu.entitiy.Queue` instance.
-    Any other true value means that the task will be moved to the
-    default exchange/routing_key.
+    Note:
+        The predicate may also return a tuple of ``(exchange, routing_key)``
+        to specify the destination to where the task should be moved,
+        or a :class:`~kombu.entitiy.Queue` instance.
+        Any other true value means that the task will be moved to the
+        default exchange/routing_key.
     """
     app = app_or_default(app)
     queues = [_maybe_queue(app, queue) for queue in source or []] or None

+ 2 - 1
celery/events/__init__.py

@@ -17,8 +17,9 @@ from operator import itemgetter
 from kombu import Exchange, Queue, Producer
 from kombu.connection import maybe_channel
 from kombu.mixins import ConsumerMixin
-from kombu.utils import cached_property, uuid
+from kombu.utils.objects import cached_property
 
+from celery import uuid
 from celery.app import app_or_default
 from celery.utils.functional import dictfilter
 from celery.utils.nodenames import anon_nodename

+ 1 - 1
celery/events/state.py

@@ -26,7 +26,7 @@ from time import time
 from weakref import WeakSet, ref
 
 from kombu.clocks import timetuple
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 
 from celery import states
 from celery.utils.functional import LRUCache, memoize, pass1

+ 2 - 1
celery/fixups/django.py

@@ -3,7 +3,8 @@ import os
 import sys
 import warnings
 
-from kombu.utils import cached_property, symbol_by_name
+from kombu.utils.imports import symbol_by_name
+from kombu.utils.objects import cached_property
 
 from datetime import datetime
 from importlib import import_module

+ 1 - 1
celery/platforms.py

@@ -15,7 +15,7 @@ from collections import namedtuple
 
 from billiard.compat import get_fdmax, close_open_fds
 # fileno used to be in this module
-from kombu.utils import maybe_fileno
+from kombu.utils.compat import maybe_fileno
 from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 

+ 1 - 1
celery/result.py

@@ -4,7 +4,7 @@ from collections import deque
 from contextlib import contextmanager
 from time import monotonic
 
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 from vine import Thenable, barrier, promise
 
 from . import current_app

+ 1 - 1
celery/schedules.py

@@ -7,7 +7,7 @@ from bisect import bisect, bisect_left
 from collections import Iterable, namedtuple
 from datetime import datetime, timedelta
 
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 
 from . import current_app
 from .utils.collections import AttributeDict

+ 3 - 1
celery/signals.py

@@ -7,7 +7,9 @@ both workers and clients.
 Functions can be connected to these signals, and connected
 functions are called whenever a signal is called.
 
-See :ref:`signals` for more information.
+.. seealso::
+
+    :ref:`signals` for more information.
 """
 from .utils.dispatch import Signal
 

+ 1 - 1
celery/tests/app/test_beat.py

@@ -76,7 +76,7 @@ class test_ScheduleEntry(AppCase):
 
     def test_repr(self):
         entry = self.create_entry()
-        self.assertIn('<Entry:', repr(entry))
+        self.assertIn('<ScheduleEntry:', repr(entry))
 
     def test_reduce(self):
         entry = self.create_entry(schedule=timedelta(seconds=10))

+ 1 - 1
celery/tests/case.py

@@ -11,7 +11,7 @@ from datetime import datetime, timedelta
 from functools import partial
 
 from kombu import Queue
-from kombu.utils import symbol_by_name
+from kombu.utils.imports import symbol_by_name
 from vine.utils import wraps
 
 from celery import Celery

+ 1 - 1
celery/tests/worker/test_control.py

@@ -6,7 +6,7 @@ from datetime import datetime, timedelta
 from queue import Queue as FastQueue
 
 from kombu import pidbox
-from kombu.utils import uuid
+from kombu.utils.uuid import uuid
 
 from celery.utils.timer2 import Timer
 from celery.worker import WorkController as _WC

+ 1 - 1
celery/tests/worker/test_request.py

@@ -8,8 +8,8 @@ from datetime import datetime, timedelta
 from time import monotonic
 
 from billiard.einfo import ExceptionInfo
-from kombu.utils import uuid
 from kombu.utils.encoding import from_utf8, safe_str, safe_repr
+from kombu.utils.uuid import uuid
 
 from celery import states
 from celery.app.trace import (

+ 1 - 1
celery/tests/worker/test_worker.py

@@ -13,7 +13,7 @@ from kombu import Connection
 from kombu.common import QoS, ignore_errors
 from kombu.transport.base import Message
 from kombu.transport.memory import Transport
-from kombu.utils import uuid
+from kombu.utils.uuid import uuid
 
 from celery.bootsteps import RUN, CLOSE, TERMINATE, StartStopStep
 from celery.concurrency.base import BasePool

+ 4 - 4
celery/utils/__init__.py

@@ -4,12 +4,11 @@
 Do not import from here directly anymore, as these are only
 here for backwards compatibility.
 """
-from .functional import memoize  # noqa
-
+from .functional import memoize
 from .nodenames import worker_direct, nodename, nodesplit
 
 __all__ = ['worker_direct', 'gen_task_name', 'nodename', 'nodesplit',
-           'cached_property', 'uuid']
+           'cached_property', 'uuid', 'memoize']
 
 # ------------------------------------------------------------------------ #
 # > XXX Compat
@@ -18,4 +17,5 @@ from .imports import (          # noqa
     instantiate, import_from_cwd, gen_task_name,
 )
 from .functional import chunks, noop                    # noqa
-from kombu.utils import cached_property, uuid   # noqa
+from kombu.utils.objects import cached_property         # noqa
+from kombu.utils.uuid import uuid                       # noqa

+ 3 - 2
celery/utils/imports.py

@@ -10,7 +10,7 @@ from imp import reload
 from types import ModuleType
 from typing import Any, Callable, Iterator, Optional
 
-from kombu.utils import symbol_by_name
+from kombu.utils.imports import symbol_by_name
 
 #: Billiard sets this when execv is enabled.
 #: We use it to find out the name of the original ``__main__``
@@ -41,7 +41,8 @@ def qualname(obj: Any) -> str:
 def instantiate(name: Any, *args, **kwargs) -> Any:
     """Instantiate class by name.
 
-    See :func:`symbol_by_name`.
+    See Also:
+        :func:`symbol_by_name`.
     """
     return symbol_by_name(name)(*args, **kwargs)
 

+ 1 - 1
celery/utils/sysinfo.py

@@ -6,7 +6,7 @@ from collections import namedtuple
 
 from math import ceil
 
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 
 __all__ = ['load_average', 'load_average_t', 'df']
 

+ 2 - 1
celery/utils/timeutils.py

@@ -8,7 +8,8 @@ from calendar import monthrange
 from datetime import date, datetime, timedelta, tzinfo
 from typing import Any, Callable, Dict, Optional, Union
 
-from kombu.utils import cached_property, reprcall
+from kombu.utils.functional import reprcall
+from kombu.utils.objects import cached_property
 
 from pytz import timezone as _timezone, AmbiguousTimeError, FixedOffset
 

+ 1 - 1
celery/worker/request.py

@@ -8,8 +8,8 @@ from datetime import datetime
 from weakref import ref
 
 from billiard.common import TERM_SIGNAME
-from kombu.utils import cached_property
 from kombu.utils.encoding import safe_repr, safe_str
+from kombu.utils.objects import cached_property
 
 from celery import signals
 from celery.app.trace import trace_task, trace_task_ret

+ 1 - 1
celery/worker/state.py

@@ -14,7 +14,7 @@ import zlib
 from collections import Counter
 
 from kombu.serialization import pickle, pickle_protocol
-from kombu.utils import cached_property
+from kombu.utils.objects import cached_property
 
 from celery import __version__
 from celery.exceptions import WorkerShutdown, WorkerTerminate