Browse Source

Cosmetics

Ask Solem 8 years ago
parent
commit
953a866642
6 changed files with 71 additions and 58 deletions
  1. 30 21
      celery/bootsteps.py
  2. 6 4
      celery/utils/term.py
  3. 18 16
      celery/utils/text.py
  4. 3 3
      celery/utils/threads.py
  5. 7 7
      celery/utils/time.py
  6. 7 7
      celery/utils/timer2.py

+ 30 - 21
celery/bootsteps.py

@@ -60,20 +60,24 @@ class Step(metaclass=StepType):
     #: Optional step name, will use ``qualname`` if not specified.
     name = None
 
-    #: Optional short name used for graph outputs and in logs.
+    #: Optional short name used for representation in logs and graphs.
     label = None
 
     #: Set this to true if the step is enabled based on some condition.
+    #: Note: This is only used for illustration purposes in graphs
+    #:       to differentiate optional steps - setting this won't affect
+    #:       your program.
     conditional = False
 
     #: List of other steps that that must be started before this step.
-    #: Note that all dependencies must be in the same blueprint.
+    #: Note: All dependencies that you list here must originate in the
+    #        same blueprint.
     requires = ()
 
-    #: This flag is reserved for the workers Consumer,
-    #: since it is required to always be started last.
-    #: There can only be one object marked last
-    #: in every blueprint.
+    #: Set to True if this step should always be started last.
+    #: Note: You cannot mark multiple steps in the same blueprint as .last,
+    #:       and it's not enforced so should you mark multiple
+    #:       steps as last then who wins is undefined behavior.
     last = False
 
     #: This provides the default for :meth:`include_if`.
@@ -231,8 +235,8 @@ class Blueprint:
 
     GraphFormatter = StepFormatter
 
-    name = None                        # type: Optional[str]
-    state = None                       # type: Optional[int]
+    name = None                        # type: str
+    state = None                       # type: int
     started = 0                        # type: int
     default_steps = set()              # type: Set[Union[str, Step]]
     state_to_name = {                  # type: Mapping[int, str]
@@ -242,18 +246,18 @@ class Blueprint:
         TERMINATE: 'terminating',
     }
 
-    def __init__(self, steps: Optional[Sequence]=None,
-                 name: Optional[str]=None,
-                 on_start: Optional[Callable[[], None]]=None,
-                 on_close: Optional[Callable[[], None]]=None,
-                 on_stopped: Optional[Callable[[], None]]=None) -> None:
+    def __init__(self, steps: Sequence = None,
+                 name: str = None,
+                 on_start: Callable[[], None] = None,
+                 on_close: Callable[[], None] = None,
+                 on_stopped: Callable[[], None] = None) -> None:
         self.name = name or self.name or qualname(type(self))
         self.types = set(steps or []) | set(self.default_steps)
         self.on_start = on_start
         self.on_close = on_close
         self.on_stopped = on_stopped
         self.shutdown_complete = Event()
-        self.steps = {}  # type: Mapping[str, Step]
+        self.steps = {} : Mapping[str, Step]
 
     def start(self, parent: Any) -> None:
         self.state = RUN
@@ -279,14 +283,18 @@ class Blueprint:
             self.on_close()
         self.send_all(parent, 'close', 'closing', reverse=False)
 
-    def restart(self, parent: Any, method: str='stop',
-                description: str='restarting', propagate: bool=False) -> None:
+    def restart(self,
+                parent: Any,
+                method: str = 'stop',
+                description: str = 'restarting',
+                propagate: bool = False) -> None:
         self.send_all(parent, method, description, propagate=propagate)
 
     def send_all(self, parent: Any, method: str,
-                 description: Optional[str]=None,
-                 reverse: bool=True, propagate: bool=True,
-                 args: Sequence=()) -> None:
+                 description: str = None,
+                 reverse: bool = True,
+                 propagate: bool = True,
+                 args: Sequence = ()) -> None:
         description = description or method.replace('_', ' ')
         steps = reversed(parent.steps) if reverse else parent.steps
         for step in steps:
@@ -304,7 +312,8 @@ class Blueprint:
                             'Error on %s %s: %r', description, step.alias, exc)
 
     def stop(self, parent: Any,
-             close: bool=True, terminate: bool=False) -> None:
+             close: bool = True,
+             terminate: bool = False) -> None:
         what = 'terminating' if terminate else 'stopping'
         if self.state in (CLOSE, TERMINATE):
             return
@@ -327,7 +336,7 @@ class Blueprint:
         self.state = TERMINATE
         self.shutdown_complete.set()
 
-    def join(self, timeout: Timeout=None) -> None:
+    def join(self, timeout: Timeout = None) -> None:
         try:
             # Will only get here if running green,
             # makes sure all greenthreads have exited.

+ 6 - 4
celery/utils/term.py

@@ -46,7 +46,7 @@ class colored:
     """
 
     def __init__(self, *s: Tuple[Any],
-                 enabled: bool=True, op: str='', **kwargs) -> None:
+                 enabled: bool=True, op: str = '', **kwargs) -> None:
         self.s = s
         self.enabled = not IS_WINDOWS and enabled
         self.op = op
@@ -166,16 +166,18 @@ class colored:
         return str(''.join((self.embed(), str(suffix))))
 
 
-def supports_images():
+def supports_images() -> bool:
     return isatty(sys.stdin) and ITERM_PROFILE
 
 
-def _read_as_base64(path):
+def _read_as_base64(path: str) -> bytes:
     with codecs.open(path, mode='rb') as fh:
         return base64.b64encode(fh.read())
 
 
-def imgcat(path, inline=1, preserve_aspect_ratio=0, **kwargs):
+def imgcat(path: str,
+           inline: int = 1,
+           preserve_aspect_ratio: int = 0, **kwargs):
     return '\n%s1337;File=inline=%d;preserveAspectRatio=%d:%s%s' % (
         _IMG_PRE, inline, preserve_aspect_ratio,
         _read_as_base64(path), _IMG_POST)

+ 18 - 16
celery/utils/text.py

@@ -30,33 +30,33 @@ def str_to_list(s: Union[str, Sequence[str]]) -> Sequence[str]:
     return s
 
 
-def dedent_initial(s: str, n: int=4) -> str:
+def dedent_initial(s: str, n: int = 4) -> str:
     """Remove identation from first line of text."""
     return s[n:] if s[:n] == ' ' * n else s
 
 
-def dedent(s: str, n: int=4, sep: str='\n') -> str:
+def dedent(s: str, n: int=4, sep: str = '\n') -> str:
     """Remove identation."""
     return sep.join(dedent_initial(l) for l in s.splitlines())
 
 
-def fill_paragraphs(s: str, width: int, sep: str='\n') -> str:
+def fill_paragraphs(s: str, width: int, sep: str = '\n') -> str:
     """Fill paragraphs with newlines (or custom separator)."""
     return sep.join(fill(p, width) for p in s.split(sep))
 
 
-def join(l: Sequence[str], sep: str='\n') -> str:
+def join(l: Sequence[str], sep: str = '\n') -> str:
     """Concatenate list of strings."""
     return sep.join(v for v in l if v)
 
 
-def ensure_sep(sep: str, s: str, n: int=2) -> str:
+def ensure_sep(sep: str, s: str, n: int = 2) -> str:
     """Ensure text s ends in separator sep'."""
     return s + sep * (n - s.count(sep))
 ensure_newlines = partial(ensure_sep, '\n')
 
 
-def abbr(S: str, max: int, ellipsis: str='...') -> str:
+def abbr(S: str, max: int, ellipsis: str = '...') -> str:
     """Abbreviate word."""
     if S is None:
         return '???'
@@ -76,12 +76,12 @@ def abbrtask(S: str, max: int) -> str:
     return S
 
 
-def indent(t: str, indent: int=0, sep: str='\n') -> str:
+def indent(t: str, indent: int = 0, sep: str = '\n') -> str:
     """Indent text."""
     return sep.join(' ' * indent + p for p in t.split(sep))
 
 
-def truncate(s: str, maxlen: int=128, suffix: str='...') -> str:
+def truncate(s: str, maxlen: int = 128, suffix: str = '...') -> str:
     """Truncate text to a maximum number of characters."""
     if maxlen and len(s) >= maxlen:
         return s[:maxlen].rsplit(' ', 1)[0] + suffix
@@ -89,13 +89,14 @@ def truncate(s: str, maxlen: int=128, suffix: str='...') -> str:
 
 
 def truncate_bytes(s: ByteString,
-                   maxlen: int=128, suffix: ByteString=b'...') -> ByteString:
+                   maxlen: int = 128,
+                   suffix: ByteString = b'...') -> ByteString:
     if maxlen and len(s) >= maxlen:
         return s[:maxlen].rsplit(b' ', 1)[0] + suffix
     return s
 
 
-def pluralize(n: int, text: str, suffix: str='s') -> str:
+def pluralize(n: int, text: str, suffix: str = 's') -> str:
     """Pluralize term when n is greater than one."""
     if n != 1:
         return text + suffix
@@ -103,7 +104,9 @@ def pluralize(n: int, text: str, suffix: str='s') -> str:
 
 
 def pretty(value: Any,
-           width: int=80, nl_width: int=80, sep: str='\n', **kw) -> str:
+           width: int = 80,
+           nl_width: int = 80,
+           sep: str = '\n', **kw) -> str:
     """Format value for printing to console."""
     if isinstance(value, dict):
         return '{{{0} {1}'.format(sep, pformat(value, 4, nl_width)[1:])
@@ -120,7 +123,8 @@ def match_case(s: str, other: str) -> str:
 
 
 def simple_format(s: str, keys: Mapping[str, Any],
-                  pattern: Pattern=RE_FORMAT, expand: Pattern=r'\1') -> str:
+                  pattern: Pattern = RE_FORMAT,
+                  expand: Pattern = r'\1') -> str:
     """Format string, expanding abbreviations in keys'."""
     if s:
         keys.setdefault('%', '%')
@@ -139,8 +143,7 @@ def simple_format(s: str, keys: Mapping[str, Any],
     return s
 
 
-def remove_repeating_from_task(task_name, s):
-    # type: (str, str) -> str
+def remove_repeating_from_task(task_name: str, s: str) -> str:
     """Given task name, remove repeating module names.
 
     Example:
@@ -155,8 +158,7 @@ def remove_repeating_from_task(task_name, s):
     return remove_repeating(module, s)
 
 
-def remove_repeating(substr, s):
-    # type: (str, str) -> str
+def remove_repeating(substr: str, s: str) -> str:
     """Remove repeating module names from string.
 
     Arguments:

+ 3 - 3
celery/utils/threads.py

@@ -47,7 +47,7 @@ def default_socket_timeout(timeout: Timeout) -> Iterator:
 class bgThread(threading.Thread):
     """Background service thread."""
 
-    def __init__(self, name: Optional[str]=None, **kwargs) -> None:
+    def __init__(self, name: str = None, **kwargs) -> None:
         super().__init__()
         self._is_shutdown = threading.Event()
         self._is_stopped = threading.Event()
@@ -267,8 +267,8 @@ class LocalManager:
     function for the wrapped locals.
     """
 
-    def __init__(self, locals: Optional[List]=None,
-                 ident_func: Optional[Callable]=None) -> None:
+    def __init__(self, locals: List = None,
+                 ident_func: Callable = None) -> None:
         if locals is None:
             self.locals = []
         elif isinstance(locals, Local):

+ 7 - 7
celery/utils/time.py

@@ -6,7 +6,7 @@ import time as _time
 
 from calendar import monthrange
 from datetime import date, datetime, timedelta, tzinfo
-from typing import Any, Callable, Dict, Optional, Union
+from typing import Any, Callable, Dict, Union
 
 from kombu.utils.functional import reprcall
 from kombu.utils.objects import cached_property
@@ -113,8 +113,8 @@ class _Zone:
         return self.get_timezone(tzinfo)
 
     def to_local(self, dt: datetime,
-                 local: Optional[tzinfo]=None,
-                 orig: Optional[tzinfo]=None) -> datetime:
+                 local: tzinfo = None,
+                 orig: tzinfo = None) -> datetime:
         if is_naive(dt):
             dt = make_aware(dt, orig or self.utc)
         return localize(dt, self.tz_or_local(local))
@@ -145,7 +145,7 @@ timezone = _Zone()
 
 
 def maybe_timedelta(
-        delta: Optional[Union[numbers.Real, timedelta]]) -> timedelta:
+        delta: Union[numbers.Real, timedelta]) -> timedelta:
     """Convert integer to timedelta, if argument is an integer."""
     if isinstance(delta, numbers.Real):
         return timedelta(seconds=delta)
@@ -176,7 +176,7 @@ def delta_resolution(dt: datetime, delta: timedelta) -> datetime:
 
 
 def remaining(start: datetime, ends_in: timedelta,
-              now: Optional[Callable[[], datetime]]=None,
+              now: Callable[[], datetime]=None,
               relative: bool=False) -> timedelta:
     """Calculate the remaining time for a start date and a timedelta.
 
@@ -255,7 +255,7 @@ def humanize_seconds(secs: numbers.Number,
     return now
 
 
-def maybe_iso8601(dt: Optional[Union[str, datetime]]) -> Optional[datetime]:
+def maybe_iso8601(dt: Union[str, datetime]) -> datetime:
     """Either ``datetime | str -> datetime`` or ``None -> None``."""
     if not dt:
         return
@@ -306,7 +306,7 @@ def to_utc(dt: datetime) -> datetime:
     return make_aware(dt, timezone.utc)
 
 
-def maybe_make_aware(dt: datetime, tz: Optional[tzinfo]=None) -> datetime:
+def maybe_make_aware(dt: datetime, tz: tzinfo=None) -> datetime:
     """Convert dt to aware datetime, do nothing if dt is already aware."""
     if is_naive(dt):
         dt = to_utc(dt)

+ 7 - 7
celery/utils/timer2.py

@@ -43,11 +43,11 @@ class Timer(threading.Thread):
             traceback.print_stack()
             super().start(*args, **kwargs)
 
-    def __init__(self, schedule: Optional[Schedule]=None,
-                 on_error: Optional[Callable]=None,
-                 on_tick: Optional[Callable]=None,
-                 on_start: Optional[Callable]=None,
-                 max_interval: Optional[Number]=None, **kwargs) -> None:
+    def __init__(self, schedule: Schedule = None,
+                 on_error: Callable = None,
+                 on_tick: Callable = None,
+                 on_start: Callable = None,
+                 max_interval: Number = None, **kwargs) -> None:
         self.schedule = schedule or self.Schedule(on_error=on_error,
                                                   max_interval=max_interval)
         self.on_start = on_start
@@ -114,7 +114,7 @@ class Timer(threading.Thread):
             return entry
 
     def enter(self, entry: Entry, eta: float,
-              priority: Optional[int]=None) -> Entry:
+              priority: int = None) -> Entry:
         return self._do_enter('enter_at', entry, eta, priority=priority)
 
     def call_at(self, *args, **kwargs) -> Entry:
@@ -130,7 +130,7 @@ class Timer(threading.Thread):
         return self._do_enter('call_repeatedly', *args, **kwargs)
 
     def exit_after(self, secs: Union[int, float],
-                   priority: Optional[int]=10) -> None:
+                   priority: int = 10) -> None:
         self.call_after(secs, sys.exit, priority)
 
     def cancel(self, tref: Entry) -> None: