瀏覽代碼

Tests passing

Ask Solem 11 年之前
父節點
當前提交
131912f693

+ 1 - 1
celery/bin/amqp.py

@@ -71,7 +71,7 @@ class Spec(object):
 
         E.g. if :attr:`args` is `[('is_active', bool)]`:
 
-            >>> coerce(0, 'False')
+            >>> obj.coerce(0, 'False')
             False
 
         """

+ 17 - 11
celery/five.py

@@ -18,6 +18,8 @@ __all__ = ['Counter', 'reload', 'UserList', 'UserDict', 'Queue', 'Empty',
            'class_property', 'reclassmethod', 'create_module',
            'recreate_module', 'monotonic']
 
+import io
+
 try:
     from collections import Counter
 except ImportError:  # pragma: no cover
@@ -61,6 +63,7 @@ if PY3:  # pragma: no cover
     text_t = str
     range = range
     int_types = (int, )
+    _byte_t = bytes
 
     open_fqdn = 'builtins.open'
 
@@ -83,15 +86,6 @@ if PY3:  # pragma: no cover
             raise value.with_traceback(tb)
         raise value
 
-    from io import StringIO
-
-    class WhateverIO(StringIO):
-
-        def write(self, data):
-            if isinstance(data, bytes):
-                data = data.encode()
-            StringIO.write(self, data)
-
 else:
     import __builtin__ as builtins  # noqa
     from Queue import Queue, Empty  # noqa
@@ -102,6 +96,7 @@ else:
     long_t = long                   # noqa
     range = xrange
     int_types = (int, long)
+    _byte_t = (str, bytes)
 
     open_fqdn = '__builtin__.open'
 
@@ -131,8 +126,6 @@ else:
 
     exec_("""def reraise(tp, value, tb=None): raise tp, value, tb""")
 
-    from io import StringIO as WhateverIO  # noqa
-
 
 def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
     """Class decorator to set metaclass.
@@ -385,3 +378,16 @@ def get_origins(defs):
     for module, attrs in items(defs):
         origins.update(dict((attr, module) for attr in attrs))
     return origins
+
+
+_SIO_write = io.StringIO.write
+_SIO_init = io.StringIO.__init__
+
+
+class WhateverIO(io.StringIO):
+
+    def __init__(self, v=None, *a, **kw):
+        _SIO_init(self, v.decode() if isinstance(v, _byte_t) else v, *a, **kw)
+
+    def write(self, data):
+        _SIO_write(self, data.decode() if isinstance(data, _byte_t) else data)

+ 7 - 2
celery/local.py

@@ -13,6 +13,7 @@
 from __future__ import absolute_import
 
 import importlib
+import sys
 
 from .five import long_t, string
 
@@ -20,6 +21,8 @@ __all__ = ['Proxy', 'PromiseProxy', 'try_import', 'maybe_evaluate']
 
 __module__ = __name__  # used by Proxy class body
 
+PY3 = sys.version_info[0] == 3
+
 
 def _default_cls_attr(name, type_, cls_value):
     # Proxy uses properties to forward the standard
@@ -160,7 +163,6 @@ class Proxy(object):
     __ne__ = lambda x, o: x._get_current_object() != o
     __gt__ = lambda x, o: x._get_current_object() > o
     __ge__ = lambda x, o: x._get_current_object() >= o
-    __cmp__ = lambda x, o: cmp(x._get_current_object(), o)
     __hash__ = lambda x: hash(x._get_current_object())
     __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
     __len__ = lambda x: len(x._get_current_object())
@@ -188,7 +190,6 @@ class Proxy(object):
     __invert__ = lambda x: ~(x._get_current_object())
     __complex__ = lambda x: complex(x._get_current_object())
     __int__ = lambda x: int(x._get_current_object())
-    __long__ = lambda x: long_t(x._get_current_object())
     __float__ = lambda x: float(x._get_current_object())
     __oct__ = lambda x: oct(x._get_current_object())
     __hex__ = lambda x: hex(x._get_current_object())
@@ -198,6 +199,10 @@ class Proxy(object):
     __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
     __reduce__ = lambda x: x._get_current_object().__reduce__()
 
+    if not PY3:
+        __cmp__ = lambda x, o: cmp(x._get_current_object(), o)
+        __long__ = lambda x: long_t(x._get_current_object())
+
 
 class PromiseProxy(Proxy):
     """This is a proxy to an object that has not yet been evaulated.

+ 8 - 2
celery/tests/app/test_amqp.py

@@ -58,7 +58,10 @@ class test_TaskProducer(AppCase):
         now = datetime.datetime(2013, 11, 26, 16, 48, 46)
         prod.publish_task('tasks.add', (1, 1), {}, retry=False,
                           countdown=10, now=now)
-        self.assertEqual(prod.publish.call_args[0][0]['eta'], '2013-11-26T16:48:56+00:00')
+        self.assertEqual(
+            prod.publish.call_args[0][0]['eta'],
+            '2013-11-26T16:48:56+00:00',
+        )
 
     def test_publish_with_countdown_and_timezone(self):
         # use timezone with fixed offset to be sure it won't be changed
@@ -69,7 +72,10 @@ class test_TaskProducer(AppCase):
         now = datetime.datetime(2013, 11, 26, 16, 48, 46)
         prod.publish_task('tasks.add', (2, 2), {}, retry=False,
                           countdown=20, now=now)
-        self.assertEqual(prod.publish.call_args[0][0]['eta'], '2013-11-26T18:49:06+02:00')
+        self.assertEqual(
+            prod.publish.call_args[0][0]['eta'],
+            '2013-11-26T18:49:06+02:00',
+        )
 
     def test_event_dispatcher(self):
         prod = self.app.amqp.TaskProducer(Mock())

+ 2 - 3
celery/tests/compat_modules/test_http.py

@@ -3,7 +3,6 @@ from __future__ import absolute_import, unicode_literals
 
 from contextlib import contextmanager
 from functools import wraps
-from io import StringIO
 try:
     from urllib import addinfourl
 except ImportError:  # py3k
@@ -12,7 +11,7 @@ except ImportError:  # py3k
 from anyjson import dumps
 from kombu.utils.encoding import from_utf8
 
-from celery.five import items
+from celery.five import WhateverIO, items
 from celery.task import http
 from celery.tests.case import AppCase, Case
 
@@ -25,7 +24,7 @@ def mock_urlopen(response_method):
     @wraps(urlopen)
     def _mocked(url, *args, **kwargs):
         response_data, headers = response_method(url)
-        return addinfourl(StringIO(response_data), headers, url)
+        return addinfourl(WhateverIO(response_data), headers, url)
 
     http.urlopen = _mocked
 

+ 2 - 2
celery/tests/fixups/test_django.py

@@ -23,9 +23,9 @@ class FixupCase(AppCase):
     def fixup_context(self, app):
         with patch('celery.fixups.django.DjangoWorkerFixup.validate_models'):
             with patch('celery.fixups.django.symbol_by_name') as symbyname:
-                with patch('celery.fixups.django.import_module') as import_module:
+                with patch('celery.fixups.django.import_module') as impmod:
                     f = self.Fixup(app)
-                    yield f, import_module, symbyname
+                    yield f, impmod, symbyname
 
 
 class test_DjangoFixup(FixupCase):

+ 2 - 3
celery/utils/__init__.py

@@ -8,7 +8,6 @@
 """
 from __future__ import absolute_import, print_function
 
-import io
 import os
 import socket
 import sys
@@ -23,7 +22,7 @@ from pprint import pprint
 from kombu.entity import Exchange, Queue
 
 from celery.exceptions import CPendingDeprecationWarning, CDeprecationWarning
-from celery.five import items, reraise, string_t
+from celery.five import WhateverIO, items, reraise, string_t
 
 __all__ = ['worker_direct', 'warn_deprecated', 'deprecated', 'lpmerge',
            'is_iterable', 'isatty', 'cry', 'maybe_reraise', 'strtobool',
@@ -151,7 +150,7 @@ def cry(out=None, sepchr='=', seplen=49):  # pragma: no cover
     taken from https://gist.github.com/737056."""
     import threading
 
-    out = io.StringIO() if out is None else out
+    out = WhateverIO() if out is None else out
     P = partial(print, file=out)
 
     # get a map of threads by their ID so we can print their names

+ 1 - 2
celery/worker/consumer.py

@@ -120,7 +120,7 @@ MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')
 
 def dump_body(m, body):
     if isinstance(body, buffer_t):
-        body = bytes_t(buffer)
+        body = bytes_t(body)
     return '{0} ({1}b)'.format(truncate(safe_repr(body), 1024),
                                len(m.body))
 
@@ -781,7 +781,6 @@ class Gossip(bootsteps.ConsumerStep):
             self.clock.forward()
 
 
-
 class Evloop(bootsteps.StartStopStep):
     label = 'event loop'
     last = True