|
@@ -48,16 +48,24 @@ WORKER_DIRECT_EXCHANGE = Exchange('C.dq')
|
|
|
#: Format for worker direct queue names.
|
|
|
WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq'
|
|
|
|
|
|
+#: Separator for worker node name and hostname.
|
|
|
NODENAME_SEP = '@'
|
|
|
|
|
|
|
|
|
def worker_direct(hostname):
|
|
|
+ """Returns :class:`kombu.Queue` that is a direct route to
|
|
|
+ a worker by hostname.
|
|
|
+
|
|
|
+ :param hostname: The fully qualified node name of a worker
|
|
|
+ (e.g. ``w1@example.com``). If passed a
|
|
|
+ :class:`kombu.Queue` instance it will simply return
|
|
|
+ that instead.
|
|
|
+ """
|
|
|
if isinstance(hostname, Queue):
|
|
|
return hostname
|
|
|
return Queue(WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
|
|
|
WORKER_DIRECT_EXCHANGE,
|
|
|
- hostname,
|
|
|
- auto_delete=True)
|
|
|
+ hostname, auto_delete=True)
|
|
|
|
|
|
|
|
|
def warn_deprecated(description=None, deprecation=None,
|
|
@@ -74,7 +82,18 @@ def warn_deprecated(description=None, deprecation=None,
|
|
|
|
|
|
def deprecated(description=None, deprecation=None,
|
|
|
removal=None, alternative=None):
|
|
|
+ """Decorator for deprecated functions.
|
|
|
+
|
|
|
+ A deprecation warning will be emitted when the function is called.
|
|
|
|
|
|
+ :keyword description: Description of what is being deprecated.
|
|
|
+ :keyword deprecation: Version that marks first deprecation, if this
|
|
|
+ argument is not set a ``PendingDeprecationWarning`` will be emitted
|
|
|
+ instead.
|
|
|
+ :keyword removed: Future version when this feature will be removed.
|
|
|
+ :keyword alternative: Instructions for an alternative solution (if any).
|
|
|
+
|
|
|
+ """
|
|
|
def _inner(fun):
|
|
|
|
|
|
@wraps(fun)
|
|
@@ -135,12 +154,13 @@ def fun_takes_kwargs(fun, kwlist=[]):
|
|
|
|
|
|
|
|
|
def isatty(fh):
|
|
|
- # Fixes bug with mod_wsgi:
|
|
|
- # mod_wsgi.Log object has no attribute isatty.
|
|
|
- return getattr(fh, 'isatty', None) and fh.isatty()
|
|
|
+ try:
|
|
|
+ return fh.isatty()
|
|
|
+ except AttributeError:
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
-def cry(): # pragma: no cover
|
|
|
+def cry(out=None, sepchr='=', seplen=49): # pragma: no cover
|
|
|
"""Return stacktrace of all active threads.
|
|
|
|
|
|
From https://gist.github.com/737056
|
|
@@ -148,21 +168,16 @@ def cry(): # pragma: no cover
|
|
|
"""
|
|
|
import threading
|
|
|
|
|
|
- tmap = {}
|
|
|
- main_thread = None
|
|
|
+ out = StringIO() if out is None else out
|
|
|
+ P = partial(print, file=out)
|
|
|
+
|
|
|
# get a map of threads by their ID so we can print their names
|
|
|
# during the traceback dump
|
|
|
- for t in threading.enumerate():
|
|
|
- if getattr(t, 'ident', None):
|
|
|
- tmap[t.ident] = t
|
|
|
- else:
|
|
|
- main_thread = t
|
|
|
+ tmap = dict((t.ident, t) for t in threading.enumerate())
|
|
|
|
|
|
- out = StringIO()
|
|
|
- P = partial(print, file=out)
|
|
|
- sep = '=' * 49
|
|
|
+ sep = sepchr * seplen
|
|
|
for tid, frame in items(sys._current_frames()):
|
|
|
- thread = tmap.get(tid, main_thread)
|
|
|
+ thread = tmap.get(tid)
|
|
|
if not thread:
|
|
|
# skip old junk (left-overs from a fork)
|
|
|
continue
|
|
@@ -178,7 +193,7 @@ def cry(): # pragma: no cover
|
|
|
|
|
|
|
|
|
def maybe_reraise():
|
|
|
- """Reraise if an exception is currently being handled, or return
|
|
|
+ """Re-raise if an exception is currently being handled, or return
|
|
|
otherwise."""
|
|
|
exc_info = sys.exc_info()
|
|
|
try:
|
|
@@ -192,6 +207,8 @@ def maybe_reraise():
|
|
|
def strtobool(term, table={'false': False, 'no': False, '0': False,
|
|
|
'true': True, 'yes': True, '1': True,
|
|
|
'on': True, 'off': False}):
|
|
|
+ """Convert common terms for true/false to bool
|
|
|
+ (true/false/yes/no/on/off/1/0)."""
|
|
|
if isinstance(term, string_t):
|
|
|
try:
|
|
|
return table[term.lower()]
|
|
@@ -247,6 +264,7 @@ def jsonify(obj,
|
|
|
|
|
|
|
|
|
def gen_task_name(app, name, module_name):
|
|
|
+ """Generate task name from name/module pair."""
|
|
|
try:
|
|
|
module = sys.modules[module_name]
|
|
|
except KeyError:
|
|
@@ -267,10 +285,12 @@ def gen_task_name(app, name, module_name):
|
|
|
|
|
|
|
|
|
def nodename(name, hostname):
|
|
|
+ """Create node name from name/hostname pair."""
|
|
|
return NODENAME_SEP.join((name, hostname))
|
|
|
|
|
|
|
|
|
def nodesplit(nodename):
|
|
|
+ """Split node name into tuple of name/hostname."""
|
|
|
parts = nodename.split(NODENAME_SEP, 1)
|
|
|
if len(parts) == 1:
|
|
|
return None, parts[0]
|