|
@@ -6,7 +6,7 @@
|
|
|
Sending and receiving messages using Kombu.
|
|
|
|
|
|
"""
|
|
|
-from __future__ import absolute_import
|
|
|
+from __future__ import absolute_import, unicode_literals
|
|
|
|
|
|
import numbers
|
|
|
|
|
@@ -22,6 +22,7 @@ from kombu.utils.functional import maybe_list
|
|
|
|
|
|
from celery import signals
|
|
|
from celery.five import items, string_t
|
|
|
+from celery.local import try_import
|
|
|
from celery.utils.saferepr import saferepr
|
|
|
from celery.utils.text import indent as textindent
|
|
|
from celery.utils.timeutils import to_utc
|
|
@@ -30,6 +31,9 @@ from . import routes as _routes
|
|
|
|
|
|
__all__ = ['AMQP', 'Queues', 'task_message']
|
|
|
|
|
|
+# json in Python2.7 borks if dict contains byte keys.
|
|
|
+JSON_NEEDS_UNICODE_KEYS = not try_import('simplejson')
|
|
|
+
|
|
|
#: Human readable queue declaration.
|
|
|
QUEUE_FORMAT = """
|
|
|
.> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
|
|
@@ -40,6 +44,10 @@ task_message = namedtuple('task_message',
|
|
|
('headers', 'properties', 'body', 'sent_event'))
|
|
|
|
|
|
|
|
|
+def utf8dict(d, encoding='utf-8'):
|
|
|
+ return {k.encode(encoding): v for k, v in items(d)}
|
|
|
+
|
|
|
+
|
|
|
class Queues(dict):
|
|
|
"""Queue name⇒ declaration mapping.
|
|
|
|
|
@@ -311,6 +319,14 @@ class AMQP(object):
|
|
|
argsrepr = saferepr(args)
|
|
|
kwargsrepr = saferepr(kwargs)
|
|
|
|
|
|
+ if JSON_NEEDS_UNICODE_KEYS:
|
|
|
+ if callbacks:
|
|
|
+ callbacks = [utf8dict(callback) for callback in callbacks]
|
|
|
+ if errbacks:
|
|
|
+ errbacks = [utf8dict(errback) for errback in errbacks]
|
|
|
+ if chord:
|
|
|
+ chord = utf8dict(chord)
|
|
|
+
|
|
|
return task_message(
|
|
|
headers={
|
|
|
'lang': 'py',
|
|
@@ -380,6 +396,14 @@ class AMQP(object):
|
|
|
eta = eta and eta.isoformat()
|
|
|
expires = expires and expires.isoformat()
|
|
|
|
|
|
+ if JSON_NEEDS_UNICODE_KEYS:
|
|
|
+ if callbacks:
|
|
|
+ callbacks = [utf8dict(callback) for callback in callbacks]
|
|
|
+ if errbacks:
|
|
|
+ errbacks = [utf8dict(errback) for errback in errbacks]
|
|
|
+ if chord:
|
|
|
+ chord = utf8dict(chord)
|
|
|
+
|
|
|
return task_message(
|
|
|
headers={},
|
|
|
properties={
|