Ask Solem 8 anni fa
parent
commit
85debc5f14
4 ha cambiato i file con 211 aggiunte e 131 eliminazioni
  1. 5 0
      celery/result.py
  2. 61 8
      celery/utils/abstract.py
  3. 138 116
      celery/utils/collections.py
  4. 7 7
      celery/utils/term.py

+ 5 - 0
celery/result.py

@@ -15,6 +15,7 @@ from . import states
 from ._state import _set_task_join_will_block, task_join_will_block
 from .app import app_or_default
 from .exceptions import ImproperlyConfigured, IncompleteStream, TimeoutError
+from .utils.abstract import AbstractResult
 from .utils.graph import DependencyGraph, GraphFormatter
 
 try:
@@ -56,6 +57,7 @@ class ResultBase:
     parent = None
 
 
+@AbstractResult.register
 @Thenable.register
 class AsyncResult(ResultBase):
     """Query task state.
@@ -416,6 +418,7 @@ class AsyncResult(ResultBase):
 
 
 @Thenable.register
+@AbstractResult.register
 class ResultSet(ResultBase):
     """Working with more than one result.
 
@@ -743,6 +746,7 @@ class ResultSet(ResultBase):
         return self.app.backend if self.app else self.results[0].backend
 
 
+@AbstractResult.register
 @Thenable.register
 class GroupResult(ResultSet):
     """Like :class:`ResultSet`, but with an associated id.
@@ -818,6 +822,7 @@ class GroupResult(ResultSet):
         ).restore_group(id)
 
 
+@AbstractResult.register
 @Thenable.register
 class EagerResult(AsyncResult):
     """Result that we know has already been executed."""

+ 61 - 8
celery/utils/abstract.py

@@ -3,6 +3,8 @@
 from abc import ABCMeta, abstractmethod, abstractproperty
 from collections import Callable
 
+from typing import Any, Sequence, Tuple
+
 __all__ = ['CallableTask', 'CallableSignature']
 
 
@@ -21,7 +23,7 @@ class _AbstractClass(metaclass=ABCMeta):
         ) or NotImplemented
 
     @classmethod
-    def register(cls, other):
+    def register(cls, other: Any):
         # we override `register` to return other for use as a decorator.
         type(cls).register(cls, other)
         return other
@@ -57,6 +59,56 @@ class AbstractApp(_AbstractClass):  # pragma: no cover
         pass
 
 
+class AbstractResult(_AbstractClass):  # pragma: no cover
+    __required_attributes__ = frozenset({
+        'as_tuple', 'forget', 'get', 'ready', 'successful', 'failed',
+    })
+
+    @abstractmethod
+    def as_tuple(self) -> Tuple:
+        pass
+
+    @abstractmethod
+    def forget(self) -> None:
+        pass
+
+    @abstractmethod
+    def get(self, *args, **kwargs) -> Any:
+        pass
+
+    @abstractmethod
+    def ready(self) -> bool:
+        pass
+
+    @abstractmethod
+    def successful(self) -> bool:
+        pass
+
+    @abstractmethod
+    def failed(self) -> bool:
+        pass
+
+    @abstractproperty
+    def backend(self) -> Any:
+        pass
+
+    @abstractproperty
+    def children(self) -> Sequence['AbstractResult']:
+        pass
+
+    @abstractproperty
+    def result(self) -> Any:
+        pass
+
+    @abstractproperty
+    def traceback(self) -> str:
+        pass
+
+    @abstractproperty
+    def state(self) -> str:
+        pass
+
+
 class CallableTask(_AbstractClass, Callable):  # pragma: no cover
     __required_attributes__ = frozenset({
         'delay', 'apply_async', 'apply',
@@ -133,29 +185,30 @@ class CallableSignature(CallableTask):  # pragma: no cover
         pass
 
     @abstractmethod
-    def freeze(self, id=None, group_id=None, chord=None, root_id=None):
+    def freeze(self, id: str=None, group_id: str=None,
+               chord: str=None, root_id: str=None) -> AbstractResult:
         pass
 
     @abstractmethod
-    def set(self, immutable=None, **options):
+    def set(self, immutable: bool=None, **options) -> 'CallableSignature':
         pass
 
     @abstractmethod
-    def link(self, callback):
+    def link(self, callback: 'CallableSignature') -> 'CallableSignature':
         pass
 
     @abstractmethod
-    def link_error(self, errback):
+    def link_error(self, errback: 'CallableSignature') -> 'CallableSignature':
         pass
 
     @abstractmethod
-    def __or__(self, other):
+    def __or__(self, other: 'CallableSignature') -> 'CallableSignature':
         pass
 
     @abstractmethod
-    def __invert__(self):
+    def __invert__(self) -> Any:
         pass
 
     @classmethod
-    def __subclasshook__(cls, C):
+    def __subclasshook__(cls, C: Any) -> Any:
         return cls._subclasshook_using(CallableSignature, C)

+ 138 - 116
celery/utils/collections.py

@@ -3,15 +3,19 @@
 import time
 
 from collections import (
-    Callable, Mapping, MutableMapping, MutableSet, Sequence,
+    Mapping, MutableMapping, MutableSet, Sequence,
     OrderedDict as _OrderedDict, deque,
 )
 from heapq import heapify, heappush, heappop
 from itertools import chain, count
 from queue import Empty
+from typing import (
+    Any, Callable, Dict, Iterable, Iterator, Tuple, Optional, Union,
+)
 
 from .functional import first, uniq
 from .text import match_case
+from .typing import DictArgument
 
 try:
     # pypy: dicts are ordered in recent versions
@@ -37,14 +41,17 @@ REPR_LIMITED_SET = """\
 <{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
 """
 
+#: Dictionary key type (key: str) -> str
+KeyCallback = Callable[[str], str]
+
 
-def force_mapping(m):
+def force_mapping(m: Any) -> Mapping:
     if isinstance(m, (LazyObject, LazySettings)):
         m = m._wrapped
     return DictAttribute(m) if not isinstance(m, Mapping) else m
 
 
-def lpmerge(L, R):
+def lpmerge(L: Mapping, R: Mapping) -> Mapping:
     """In place left precedent dictionary merge.
 
     Keeps values from `L`, if the value in `R` is :const:`None`.
@@ -56,7 +63,7 @@ def lpmerge(L, R):
 
 class OrderedDict(_OrderedDict):
 
-    def _LRUkey(self):
+    def _LRUkey(self) -> Any:
         # return value of od.keys does not support __next__,
         # but this version will also not create a copy of the list.
         return next(iter(self.keys()))
@@ -68,7 +75,7 @@ class AttributeDictMixin:
     I.e. `d.key -> d[key]`.
     """
 
-    def __getattr__(self, k):
+    def __getattr__(self, k: str) -> Any:
         """`d.key -> d[key]`"""
         try:
             return self[k]
@@ -77,7 +84,7 @@ class AttributeDictMixin:
                 '{0!r} object has no attribute {1!r}'.format(
                     type(self).__name__, k))
 
-    def __setattr__(self, key, value):
+    def __setattr__(self, key: str, value: Any) -> None:
         """`d[key] = value -> d.key = value`"""
         self[key] = value
 
@@ -93,50 +100,51 @@ class DictAttribute:
     `obj[k] -> obj.k`
     `obj[k] = val -> obj.k = val`
     """
-    obj = None
 
-    def __init__(self, obj):
+    obj = None  # type: Mapping[Any, Any]
+
+    def __init__(self, obj: Mapping) -> None:
         object.__setattr__(self, 'obj', obj)
 
-    def __getattr__(self, key):
+    def __getattr__(self, key: str) -> Any:
         return getattr(self.obj, key)
 
-    def __setattr__(self, key, value):
-        return setattr(self.obj, key, value)
+    def __setattr__(self, key: str, value: Any) -> None:
+        setattr(self.obj, key, value)
 
-    def get(self, key, default=None):
+    def get(self, key: str, default: Any=None) -> Any:
         try:
             return self[key]
         except KeyError:
             return default
 
-    def setdefault(self, key, default):
+    def setdefault(self, key: str, default: Any) -> None:
         if key not in self:
             self[key] = default
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> Any:
         try:
             return getattr(self.obj, key)
         except AttributeError:
             raise KeyError(key)
 
-    def __setitem__(self, key, value):
+    def __setitem__(self, key: str, value: Any) -> None:
         setattr(self.obj, key, value)
 
-    def __contains__(self, key):
+    def __contains__(self, key: str) -> bool:
         return hasattr(self.obj, key)
 
-    def keys(self):
+    def keys(self) -> Iterator[str]:
         return iter(dir(self.obj))
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[str]:
         return self.keys()
 
-    def items(self):
+    def items(self) -> Iterator[Tuple[str, Any]]:
         for key in self.keys():
             yield key, getattr(self.obj, key)
 
-    def values(self):
+    def values(self) -> Iterator[Any]:
         for key in self.keys():
             yield getattr(self.obj, key)
 MutableMapping.register(DictAttribute)
@@ -144,39 +152,40 @@ MutableMapping.register(DictAttribute)
 
 class ChainMap(MutableMapping):
 
-    key_t = None
-    changes = None
-    defaults = None
-    maps = None
+    key_t = None      # type: Optional[KeyCallback]
+    changes = None    # type: Mapping
+    defaults = None   # type: Sequence[Mapping]
+    maps = None       # type: Sequence[Mapping]
 
-    def __init__(self, *maps, **kwargs):
+    def __init__(self, *maps: Sequence[Mapping],
+                 key_t: Optional[KeyCallback], **kwargs) -> None:
         maps = list(maps or [{}])
         self.__dict__.update(
-            key_t=kwargs.get('key_t'),
+            key_t=key_t,
             maps=maps,
             changes=maps[0],
             defaults=maps[1:],
         )
 
-    def add_defaults(self, d):
+    def add_defaults(self, d: Mapping) -> None:
         d = force_mapping(d)
         self.defaults.insert(0, d)
         self.maps.insert(1, d)
 
-    def pop(self, key, *default):
+    def pop(self, key: str, *default: Tuple[Any]) -> Any:
         try:
             return self.maps[0].pop(key, *default)
         except KeyError:
             raise KeyError(
                 'Key not found in the first mapping: {!r}'.format(key))
 
-    def __missing__(self, key):
+    def __missing__(self, key: str) -> Any:
         raise KeyError(key)
 
-    def _key(self, key):
+    def _key(self, key: str) -> str:
         return self.key_t(key) if self.key_t is not None else key
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> Any:
         _key = self._key(key)
         for mapping in self.maps:
             try:
@@ -185,71 +194,72 @@ class ChainMap(MutableMapping):
                 pass
         return self.__missing__(key)
 
-    def __setitem__(self, key, value):
+    def __setitem__(self, key: str, value: Any) -> None:
         self.changes[self._key(key)] = value
 
-    def __delitem__(self, key):
+    def __delitem__(self, key: str) -> None:
         try:
             del self.changes[self._key(key)]
         except KeyError:
             raise KeyError('Key not found in first mapping: {0!r}'.format(key))
 
-    def clear(self):
+    def clear(self) -> None:
         self.changes.clear()
 
-    def get(self, key, default=None):
+    def get(self, key: str, default: Optional[Any]=None) -> Any:
         try:
             return self[self._key(key)]
         except KeyError:
             return default
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(set().union(*self.maps))
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[str]:
         return self.keys()
 
-    def __contains__(self, key):
+    def __contains__(self, key: str) -> bool:
         key = self._key(key)
         return any(key in m for m in self.maps)
 
-    def __bool__(self):
+    def __bool__(self) -> bool:
         return any(self.maps)
 
-    def setdefault(self, key, default):
+    def setdefault(self, key: str, default: Any) -> None:
         key = self._key(key)
         if key not in self:
             self[key] = default
 
-    def update(self, *args, **kwargs):
-        return self.changes.update(*args, **kwargs)
+    def update(self, *args: Tuple[Mapping], **kwargs: Dict[Any, Any]) -> None:
+        self.changes.update(*args, **kwargs)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return '{0.__class__.__name__}({1})'.format(
             self, ', '.join(map(repr, self.maps)))
 
     @classmethod
-    def fromkeys(cls, iterable, *args):
+    def fromkeys(cls, iterable: Union[Sequence[str], Iterator[str]],
+                 *args: Tuple[Any]) -> 'ChainMap':
         """Create a ChainMap with a single dict created from the iterable."""
         return cls(dict.fromkeys(iterable, *args))
 
-    def copy(self):
+    def copy(self) -> 'ChainMap':
         """New ChainMap or subclass with a new copy of maps[0] and
         refs to maps[1:]."""
         return self.__class__(self.maps[0].copy(), *self.maps[1:])
 
-    def _iter(self, op):
+    def _iter(self, op: Callable[[Mapping], Iterator]) -> Iterator:
         # defaults must be first in the stream, so values in
         # changes take precedence.
         return chain(*[op(d) for d in reversed(self.maps)])
 
-    def keys(self):
+    def keys(self) -> Iterator[str]:
         return uniq(self._iter(lambda d: d.keys()))
 
-    def items(self):
+    def items(self) -> Iterator[Tuple[str, Any]]:
         return ((key, self[key]) for key in self)
 
-    def values(self):
+    def values(self) -> Iterator[Any]:
         return (self[key] for key in self)
 
 
@@ -267,21 +277,24 @@ class ConfigurationView(ChainMap, AttributeDictMixin):
             the default configuration.
     """
 
-    def __init__(self, changes, defaults=None, key_t=None, prefix=None):
+    def __init__(self, changes: Optional[Mapping],
+                 defaults: Sequence[Mapping] = None,
+                 key_t: KeyCallback = None,
+                 prefix: str = None) -> None:
         defaults = [] if defaults is None else defaults
         super().__init__(changes, *defaults, **{'key_t': key_t})
         self.__dict__.update(
             prefix=prefix.rstrip('_') + '_' if prefix else prefix,
         )
 
-    def _to_keys(self, key):
+    def _to_keys(self, key: str) -> Tuple[str]:
         prefix = self.prefix
         if prefix:
             pkey = prefix + key if not key.startswith(prefix) else key
             return match_case(pkey, prefix), key
         return key,
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> Any:
         keys = self._to_keys(key)
         getitem = super().__getitem__
         for k in keys:
@@ -298,27 +311,27 @@ class ConfigurationView(ChainMap, AttributeDictMixin):
                     'Key not found: {0!r} (with prefix: {0!r})'.format(*keys))
             raise
 
-    def __setitem__(self, key, value):
+    def __setitem__(self, key: str, value: Any) -> None:
         self.changes[self._key(key)] = value
 
-    def first(self, *keys):
+    def first(self, *keys: Tuple[str]) -> Any:
         return first(None, (self.get(key) for key in keys))
 
-    def get(self, key, default=None):
+    def get(self, key: str, default: Optional[Any]=None) -> Any:
         try:
             return self[key]
         except KeyError:
             return default
 
-    def clear(self):
+    def clear(self) -> None:
         """Remove all changes, but keep defaults."""
         self.changes.clear()
 
-    def __contains__(self, key):
+    def __contains__(self, key: str) -> bool:
         keys = self._to_keys(key)
         return any(any(k in m for k in keys) for m in self.maps)
 
-    def swap_with(self, other):
+    def swap_with(self, other: 'ConfigurationView') -> None:
         changes = other.__dict__['changes']
         defaults = other.__dict__['defaults']
         self.__dict__.update(
@@ -386,16 +399,20 @@ class LimitedSet:
 
     max_heap_percent_overload = 15
 
-    def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
+    def __init__(self,
+                 maxlen: Optional[int]=0,
+                 minlen: Optional[int]=0,
+                 expires: Optional[int]=0,
+                 data: Optional[DictArgument]=None) -> None:
         self.maxlen = 0 if maxlen is None else maxlen
         self.minlen = 0 if minlen is None else minlen
         self.expires = 0 if expires is None else expires
+
+        # type: Mapping[str, Any]
         self._data = {}
-        self._heap = []
 
-        # make shortcuts
-        self.__len__ = self._data.__len__
-        self.__contains__ = self._data.__contains__
+        # type: Sequence[Tuple[float, Any]]
+        self._heap = []
 
         if data:
             # import items from data
@@ -407,21 +424,21 @@ class LimitedSet:
         if self.expires < 0:
             raise ValueError('expires cannot be negative!')
 
-    def _refresh_heap(self):
+    def _refresh_heap(self) -> None:
         """Time consuming recreating of heap. Do not run this too often."""
         self._heap[:] = [entry for entry in self._data.values()]
         heapify(self._heap)
 
-    def _maybe_refresh_heap(self):
+    def _maybe_refresh_heap(self) -> None:
         if self._heap_overload >= self.max_heap_percent_overload:
             self._refresh_heap()
 
-    def clear(self):
+    def clear(self) -> None:
         """Clear all data, start from scratch again."""
         self._data.clear()
         self._heap[:] = []
 
-    def add(self, item, now=None):
+    def add(self, item: Any, now: Optional[float]=None) -> None:
         """Add a new item, or reset the expiry time of an existing item."""
         now = now or time.time()
         if item in self._data:
@@ -432,7 +449,7 @@ class LimitedSet:
         if self.maxlen and len(self._data) >= self.maxlen:
             self.purge()
 
-    def update(self, other):
+    def update(self, other: Optional[DictArgument]) -> None:
         """Update this set from other LimitedSet, dict or iterable."""
         if not other:
             return
@@ -458,13 +475,13 @@ class LimitedSet:
             for obj in other:
                 self.add(obj)
 
-    def discard(self, item):
+    def discard(self, item: Any) -> None:
         # mark an existing item as removed. If KeyError is not found, pass.
         self._data.pop(item, None)
         self._maybe_refresh_heap()
     pop_value = discard
 
-    def purge(self, now=None):
+    def purge(self, now: Optional[float]=None) -> None:
         """Check oldest items and remove them if needed.
 
         Arguments:
@@ -484,7 +501,7 @@ class LimitedSet:
                     break  # oldest item has not expired yet
                 self.pop()
 
-    def pop(self, default=None):
+    def pop(self, default: Optional[Any]=None) -> None:
         """Remove and return the oldest item, or :const:`None` when empty."""
         while self._heap:
             _, item = heappop(self._heap)
@@ -496,7 +513,7 @@ class LimitedSet:
                 return item
         return default
 
-    def as_dict(self):
+    def as_dict(self) -> Dict:
         """Whole set as serializable dictionary.
 
         Example:
@@ -511,35 +528,35 @@ class LimitedSet:
         """
         return {key: inserted for inserted, key in self._data.values()}
 
-    def __eq__(self, other):
+    def __eq__(self, other: 'LimitedSet') -> bool:
         return self._data == other._data
 
-    def __ne__(self, other):
+    def __ne__(self, other: 'LimitedSet') -> bool:
         return not self.__eq__(other)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return REPR_LIMITED_SET.format(
             self, name=type(self).__name__, size=len(self),
         )
 
-    def __iter__(self):
+    def __iter__(self) -> Iterable[Any]:
         return (i for _, i in sorted(self._data.values()))
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self._data)
 
-    def __contains__(self, key):
+    def __contains__(self, key: str) -> bool:
         return key in self._data
 
-    def __reduce__(self):
+    def __reduce__(self) -> Tuple:
         return self.__class__, (
             self.maxlen, self.expires, self.as_dict(), self.minlen)
 
-    def __bool__(self):
+    def __bool__(self) -> bool:
         return bool(self._data)
 
     @property
-    def _heap_overload(self):
+    def _heap_overload(self) -> float:
         """Compute how much is heap bigger than data [percents]."""
         return len(self._heap) * 100 / max(len(self._data), 1) - 100
 MutableSet.register(LimitedSet)
@@ -547,19 +564,19 @@ MutableSet.register(LimitedSet)
 
 class Evictable:
 
-    Empty = Empty
+    Empty = Empty  # type: Exception
 
-    def evict(self):
+    def evict(self) -> None:
         """Force evict until maxsize is enforced."""
         self._evict(range=count)
 
-    def _evict(self, limit=100, range=range):
+    def _evict(self, limit: int=100, range: Callable=range) -> None:
         try:
             [self._evict1() for _ in range(limit)]
         except IndexError:
             pass
 
-    def _evict1(self):
+    def _evict1(self) -> None:
         if self._evictcount <= self.maxsize:
             raise IndexError()
         try:
@@ -570,25 +587,26 @@ class Evictable:
 
 class Messagebuffer(Evictable):
 
-    Empty = Empty
-
-    def __init__(self, maxsize, iterable=None, deque=deque):
+    def __init__(self,
+                 maxsize: Optional[int],
+                 iterable: Optional[Iterable]=None, deque: Any=deque) -> None:
         self.maxsize = maxsize
-        self.data = deque(iterable or [])
-        self._append = self.data.append
-        self._pop = self.data.popleft
-        self._len = self.data.__len__
-        self._extend = self.data.extend
+        self.data = deque(iterable or [])  # type: deque
 
-    def put(self, item):
+        self._append = self.data.append    # type: Callable[[Any], None]
+        self._pop = self.data.popleft      # type: Callable[[], Any]
+        self._len = self.data.__len__      # type: Callable[[], int]
+        self._extend = self.data.extend    # type: Callable[[Iterable], None]
+
+    def put(self, item: Any) -> None:
         self._append(item)
         self.maxsize and self._evict()
 
-    def extend(self, it):
+    def extend(self, it: Iterable) -> None:
         self._extend(it)
         self.maxsize and self._evict()
 
-    def take(self, *default):
+    def take(self, *default: Tuple[Any]) -> Any:
         try:
             return self._pop()
         except IndexError:
@@ -596,35 +614,35 @@ class Messagebuffer(Evictable):
                 return default[0]
             raise self.Empty()
 
-    def _pop_to_evict(self):
+    def _pop_to_evict(self) -> Any:
         return self.take()
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return '<{0}: {1}/{2}>'.format(
             type(self).__name__, len(self), self.maxsize,
         )
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator:
         while 1:
             try:
                 yield self._pop()
             except IndexError:
                 break
 
-    def __len__(self):
+    def __len__(self) -> int:
         return self._len()
 
-    def __contains__(self, item):
+    def __contains__(self, item: Any) -> bool:
         return item in self.data
 
-    def __reversed__(self):
+    def __reversed__(self) -> Iterable:
         return reversed(self.data)
 
-    def __getitem__(self, index):
+    def __getitem__(self, index: int) -> Any:
         return self.data[index]
 
     @property
-    def _evictcount(self):
+    def _evictcount(self) -> int:
         return len(self)
 Sequence.register(Messagebuffer)
 
@@ -632,32 +650,36 @@ Sequence.register(Messagebuffer)
 class BufferMap(OrderedDict, Evictable):
 
     Buffer = Messagebuffer
-    Empty = Empty
 
     maxsize = None
     total = 0
     bufmaxsize = None
 
-    def __init__(self, maxsize, iterable=None, bufmaxsize=1000):
+    def __init__(self,
+                 maxsize: Optional[int],
+                 iterable: Optional[Iterable]=None,
+                 bufmaxsize: Optional[int]=1000) -> None:
         super().__init__()
         self.maxsize = maxsize
-        self.bufmaxsize = 1000
+        self.bufmaxsize = bufmaxsize
         if iterable:
             self.update(iterable)
+
+        # type: int
         self.total = sum(len(buf) for buf in self.items())
 
-    def put(self, key, item):
+    def put(self, key: Any, item: Any) -> None:
         self._get_or_create_buffer(key).put(item)
         self.total += 1
         self.move_to_end(key)   # least recently used.
         self.maxsize and self._evict()
 
-    def extend(self, key, it):
+    def extend(self, key: Any, it: Iterable) -> None:
         self._get_or_create_buffer(key).extend(it)
         self.total += len(it)
         self.maxsize and self._evict()
 
-    def take(self, key, *default):
+    def take(self, key: Any, *default: Tuple[Any]) -> Any:
         item, throw = None, False
         try:
             buf = self[key]
@@ -678,20 +700,20 @@ class BufferMap(OrderedDict, Evictable):
             raise self.Empty()
         return item
 
-    def _get_or_create_buffer(self, key):
+    def _get_or_create_buffer(self, key: Any) -> Messagebuffer:
         try:
             return self[key]
         except KeyError:
             buf = self[key] = self._new_buffer()
             return buf
 
-    def _new_buffer(self):
+    def _new_buffer(self) -> Messagebuffer:
         return self.Buffer(maxsize=self.bufmaxsize)
 
-    def _LRUpop(self, *default):
+    def _LRUpop(self, *default: Tuple[Any]) -> Any:
         return self[self._LRUkey()].take(*default)
 
-    def _pop_to_evict(self):
+    def _pop_to_evict(self) -> None:
         for i in range(100):
             key = self._LRUkey()
             buf = self[key]
@@ -711,11 +733,11 @@ class BufferMap(OrderedDict, Evictable):
                     self.move_to_end(key)
                 break
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return '<{0}: {1}/{2}>'.format(
             type(self).__name__, self.total, self.maxsize,
         )
 
     @property
-    def _evictcount(self):
+    def _evictcount(self) -> int:
         return self.total

+ 7 - 7
celery/utils/term.py

@@ -4,7 +4,7 @@ import platform
 
 from functools import reduce
 from typing import Any, Tuple
-from typing import Mapping  # noqa
+from typing import Callable, Mapping  # noqa
 
 __all__ = ['colored']
 
@@ -16,7 +16,7 @@ COLOR_SEQ = '\033[1;%dm'
 IS_WINDOWS = platform.system() == 'Windows'
 
 
-def fg(s: str) -> str:
+def fg(s: int) -> str:
     return COLOR_SEQ % s
 
 
@@ -31,13 +31,13 @@ class colored:
         ...       c.green('dog ')))
     """
 
-    def __init__(self, *s: Tuple[str],
+    def __init__(self, *s: Tuple[Any],
                  enabled: bool=True, op: str='', **kwargs) -> None:
         self.s = s
         self.enabled = not IS_WINDOWS and enabled
         self.op = op
 
-        # type: Mapping[str, str]
+        # type: Mapping[str, Callable]
         self.names = {
             'black': self.black,
             'red': self.red,
@@ -132,13 +132,13 @@ class colored:
     def imagenta(self, *s: Tuple[Any]) -> Any:
         return self.node(s, fg(40 + MAGENTA))
 
-    def icyan(self, *s: Tuple[Any]) -> any:
+    def icyan(self, *s: Tuple[Any]) -> Any:
         return self.node(s, fg(40 + CYAN))
 
-    def iwhite(self, *s: Tuple[Any]) -> any:
+    def iwhite(self, *s: Tuple[Any]) -> Any:
         return self.node(s, fg(40 + WHITE))
 
-    def reset(self, *s: Tuple[Any]) -> any:
+    def reset(self, *s: Tuple[Any]) -> Any:
         return self.node(s or [''], RESET_SEQ)
 
     def __add__(self, other: Any) -> str: