Kaynağa Gözat

Backports py3k support from 3.0-devel branch into 2.4

Ask Solem 13 yıl önce
ebeveyn
işleme
2846d8868c
39 değiştirilmiş dosya ile 492 ekleme ve 260 silme
  1. 5 2
      celery/backends/base.py
  2. 9 9
      celery/beat.py
  3. 2 1
      celery/bin/celeryd_multi.py
  4. 2 2
      celery/concurrency/base.py
  5. 38 14
      celery/concurrency/processes/pool.py
  6. 70 11
      celery/datastructures.py
  7. 3 2
      celery/execute/trace.py
  8. 17 15
      celery/log.py
  9. 14 5
      celery/task/http.py
  10. 1 0
      celery/tests/__init__.py
  11. 9 9
      celery/tests/test_app/test_beat.py
  12. 4 2
      celery/tests/test_app/test_loaders.py
  13. 4 4
      celery/tests/test_backends/test_amqp.py
  14. 7 2
      celery/tests/test_backends/test_base.py
  15. 3 3
      celery/tests/test_backends/test_cache.py
  16. 5 0
      celery/tests/test_bin/test_celerybeat.py
  17. 2 2
      celery/tests/test_bin/test_celeryd.py
  18. 7 4
      celery/tests/test_concurrency/test_concurrency_processes.py
  19. 47 34
      celery/tests/test_events/test_events_state.py
  20. 2 2
      celery/tests/test_task/__init__.py
  21. 6 1
      celery/tests/test_task/test_result.py
  22. 5 2
      celery/tests/test_task/test_task_http.py
  23. 6 6
      celery/tests/test_utils/test_timer2.py
  24. 9 2
      celery/tests/test_utils/test_utils_encoding.py
  25. 5 5
      celery/tests/test_worker/test_worker_autoscale.py
  26. 32 26
      celery/tests/test_worker/test_worker_job.py
  27. 8 8
      celery/tests/test_worker/test_worker_mediator.py
  28. 3 3
      celery/tests/utils.py
  29. 12 7
      celery/utils/__init__.py
  30. 15 5
      celery/utils/compat.py
  31. 42 5
      celery/utils/encoding.py
  32. 24 9
      celery/utils/timer2.py
  33. 6 6
      celery/worker/autoscale.py
  34. 14 8
      celery/worker/job.py
  35. 6 6
      celery/worker/mediator.py
  36. 39 0
      contrib/release/py3k-run-tests
  37. 5 0
      requirements/default-py3k.txt
  38. 2 2
      setup.py
  39. 2 36
      tox.ini

+ 5 - 2
celery/backends/base.py

@@ -12,11 +12,13 @@ from .. import states
 from ..datastructures import LRUCache
 from ..exceptions import TimeoutError, TaskRevokedError
 from ..utils import timeutils
+from ..utils.encoding import from_utf8
 from ..utils.serialization import (get_pickled_exception,
                                    get_pickleable_exception,
                                    create_exception_cls)
 
 EXCEPTION_ABLE_CODECS = frozenset(["pickle", "yaml"])
+is_py3k = sys.version_info >= (3, 0)
 
 
 def unpickle_backend(cls, args, kwargs):
@@ -51,7 +53,8 @@ class BaseBackend(object):
         return payload
 
     def decode(self, payload):
-        return serialization.decode(str(payload),
+        payload = is_py3k and payload or str(payload)
+        return serialization.decode(payload,
                                     content_type=self.content_type,
                                     content_encoding=self.content_encoding)
 
@@ -108,7 +111,7 @@ class BaseBackend(object):
         """Convert serialized exception to Python exception."""
         if self.serializer in EXCEPTION_ABLE_CODECS:
             return get_pickled_exception(exc)
-        return create_exception_cls(exc["exc_type"].encode("utf-8"),
+        return create_exception_cls(from_utf8(exc["exc_type"]),
                                     sys.modules[__name__])
 
     def prepare_value(self, result):

+ 9 - 9
celery/beat.py

@@ -79,13 +79,13 @@ class ScheduleEntry(object):
     def _default_now(self):
         return datetime.now()
 
-    def next(self, last_run_at=None):
+    def _next_instance(self, last_run_at=None):
         """Returns a new instance of the same class, but with
         its date and count fields updated."""
         return self.__class__(**dict(self,
                                      last_run_at=last_run_at or datetime.now(),
                                      total_run_count=self.total_run_count + 1))
-    __next__ = next  # for 2to3
+    __next__ = next = _next_instance  # for 2to3
 
     def update(self, other):
         """Update values from another entry.
@@ -372,8 +372,8 @@ class Service(object):
         self.schedule_filename = schedule_filename or \
                                     app.conf.CELERYBEAT_SCHEDULE_FILENAME
 
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
+        self._is_shutdown = threading.Event()
+        self._is_stopped = threading.Event()
         self.debug = SilenceRepeated(self.logger.debug,
                         10 if self.max_interval < 60 else 1)
 
@@ -388,24 +388,24 @@ class Service(object):
             platforms.set_process_title("celerybeat")
 
         try:
-            while not self._shutdown.isSet():
+            while not self._is_shutdown.isSet():
                 interval = self.scheduler.tick()
                 self.debug("Celerybeat: Waking up %s." % (
                         humanize_seconds(interval, prefix="in ")))
                 time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
-            self._shutdown.set()
+            self._is_shutdown.set()
         finally:
             self.sync()
 
     def sync(self):
         self.scheduler.close()
-        self._stopped.set()
+        self._is_stopped.set()
 
     def stop(self, wait=False):
         self.logger.info("Celerybeat: Shutting down...")
-        self._shutdown.set()
-        wait and self._stopped.wait()  # block until shutdown done.
+        self._is_shutdown.set()
+        wait and self._is_stopped.wait()  # block until shutdown done.
 
     def get_scheduler(self, lazy=False):
         filename = self.schedule_filename

+ 2 - 1
celery/bin/celeryd_multi.py

@@ -100,6 +100,7 @@ from time import sleep
 
 from .. import __version__
 from ..utils import term
+from ..utils.encoding import from_utf8
 
 SIGNAMES = set(sig for sig in dir(signal)
                         if sig.startswith("SIG") and "_" not in sig)
@@ -368,7 +369,7 @@ class MultiTool(object):
 
     def waitexec(self, argv, path=sys.executable):
         args = " ".join([path] + list(argv))
-        argstr = shlex.split(args.encode("utf-8"))
+        argstr = shlex.split(from_utf8(args))
         pipe = Popen(argstr, env=self.env)
         self.info("  %s" % " ".join(argstr))
         retcode = pipe.wait()

+ 2 - 2
celery/concurrency/base.py

@@ -106,8 +106,8 @@ class BasePool(object):
         else:
             self.safe_apply_callback(callback, ret_value)
 
-    def on_worker_error(self, errback, exc):
-        errback(ExceptionInfo((exc.__class__, exc, None)))
+    def on_worker_error(self, errback, exc_info):
+        errback(exc_info)
 
     def safe_apply_callback(self, fun, *args):
         if fun:

+ 38 - 14
celery/concurrency/processes/pool.py

@@ -27,6 +27,7 @@ from multiprocessing import Process, cpu_count, TimeoutError
 from multiprocessing import util
 from multiprocessing.util import Finalize, debug
 
+from celery.datastructures import ExceptionInfo
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import WorkerLostError
 
@@ -74,16 +75,30 @@ class LaxBoundedSemaphore(threading._Semaphore):
         _Semaphore.__init__(self, value, verbose)
         self._initial_value = value
 
-    def release(self):
-        if self._Semaphore__value < self._initial_value:
-            _Semaphore.release(self)
-        if __debug__:
-            self._note("%s.release: success, value=%s (unchanged)" % (
-                self, self._Semaphore__value))
+    if sys.version_info >= (3, 0):
 
-    def clear(self):
-        while self._Semaphore__value < self._initial_value:
-            _Semaphore.release(self)
+        def release(self):
+            if self._value < self._initial_value:
+                _Semaphore.release(self)
+            if __debug__:
+                self._note("%s.release: success, value=%s (unchanged)" % (
+                    self, self._value))
+
+        def clear(self):
+            while self._value < self._initial_value:
+                _Semaphore.release(self)
+    else:
+
+        def release(self):  # noqa
+            if self._Semaphore__value < self._initial_value:
+                _Semaphore.release(self)
+            if __debug__:
+                self._note("%s.release: success, value=%s (unchanged)" % (
+                    self, self._Semaphore__value))
+
+        def clear(self):  # noqa
+            while self._Semaphore__value < self._initial_value:
+                _Semaphore.release(self)
 
 #
 # Exceptions
@@ -166,8 +181,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
         put((ACK, (job, i, time.time(), pid)))
         try:
             result = (True, func(*args, **kwds))
-        except Exception, e:
-            result = (False, e)
+        except Exception:
+            result = (False, ExceptionInfo(sys.exc_info()))
         try:
             put((READY, (job, i, result)))
         except Exception, exc:
@@ -320,7 +335,12 @@ class TimeoutHandler(PoolThread):
                 return
             debug('hard time limit exceeded for %i', i)
             # Remove from cache and set return value to an exception
-            job._set(i, (False, TimeLimitExceeded(hard_timeout)))
+            exc_info = None
+            try:
+                raise TimeLimitExceeded(hard_timeout)
+            except TimeLimitExceeded:
+                exc_info = sys.exc_info()
+            job._set(i, (False, ExceptionInfo(exc_info)))
 
             # Remove from _pool
             process, _index = _process_by_pid(job._worker_pid)
@@ -571,8 +591,12 @@ class Pool(object):
                 if not job.ready() and job._worker_lost]:
             now = now or time.time()
             if now - job._worker_lost > lost_worker_timeout:
-                err = WorkerLostError("Worker exited prematurely.")
-                job._set(None, (False, err))
+                exc_info = None
+                try:
+                    raise WorkerLostError("Worker exited prematurely.")
+                except WorkerLostError:
+                    exc_info = sys.exc_info()
+                job._set(None, (False, exc_info))
 
         if shutdown and not len(self._pool):
             raise WorkersJoined()

+ 70 - 11
celery/datastructures.py

@@ -11,6 +11,7 @@ Custom data structures.
 from __future__ import absolute_import
 from __future__ import with_statement
 
+import sys
 import time
 import traceback
 
@@ -81,8 +82,16 @@ class DictAttribute(object):
     def __contains__(self, key):
         return hasattr(self.obj, key)
 
-    def iteritems(self):
+    def _iterate_items(self):
         return vars(self.obj).iteritems()
+    iteritems = _iterate_items
+
+    if sys.version_info >= (3, 0):
+        items = _iterate_items
+    else:
+
+        def items(self):
+            return list(self._iterate_items())
 
 
 class ConfigurationView(AttributeDictMixin):
@@ -147,23 +156,53 @@ class ConfigurationView(AttributeDictMixin):
         # changes takes precedence.
         return chain(*[op(d) for d in reversed(self._order)])
 
-    def iterkeys(self):
+    def _iterate_keys(self):
         return self._iter(lambda d: d.iterkeys())
+    iterkeys = _iterate_keys
 
-    def iteritems(self):
+    def _iterate_items(self):
         return self._iter(lambda d: d.iteritems())
+    iteritems = _iterate_items
 
-    def itervalues(self):
+    def _iterate_values(self):
         return self._iter(lambda d: d.itervalues())
+    itervalues = _iterate_values
 
     def keys(self):
-        return list(self.iterkeys())
+        return list(self._iterate_keys())
 
     def items(self):
-        return list(self.iteritems())
+        return list(self._iterate_items())
 
     def values(self):
-        return list(self.itervalues())
+        return list(self._iterate_values())
+
+
+class _Code(object):
+
+    def __init__(self, code):
+        self.co_filename = code.co_filename
+        self.co_name = code.co_name
+
+
+class _Frame(object):
+    Code = _Code
+
+    def __init__(self, frame):
+        self.f_globals = {"__file__": frame.f_globals["__file__"]}
+        self.f_code = self.Code(frame.f_code)
+
+
+class _Traceback(object):
+    Frame = _Frame
+
+    def __init__(self, tb):
+        self.tb_frame = self.Frame(tb.tb_frame)
+        self.tb_lineno = tb.tb_lineno
+        if tb.tb_next is None:
+            self.tb_next = None
+        else:
+            self.tb_next = _Traceback(tb.tb_next)
 
 
 class ExceptionInfo(object):
@@ -174,15 +213,21 @@ class ExceptionInfo(object):
 
     """
 
-    #: The original exception.
+    #: Exception type.
+    type = None
+
+    #: Exception instance.
     exception = None
 
-    #: A traceback form the point when :attr:`exception` was raised.
+    #: Pickleable traceback instance for use with :mod:`traceback`
+    tb = None
+
+    #: String representation of the traceback.
     traceback = None
 
     def __init__(self, exc_info):
-        _, exception, _ = exc_info
-        self.exception = exception
+        self.type, self.exception, tb = exc_info
+        self.tb = _Traceback(tb)
         self.traceback = ''.join(traceback.format_exception(*exc_info))
 
     def __str__(self):
@@ -191,6 +236,10 @@ class ExceptionInfo(object):
     def __repr__(self):
         return "<ExceptionInfo: %r>" % (self.exception, )
 
+    @property
+    def exc_info(self):
+        return self.type, self.exception, self.tb
+
 
 def consume_queue(queue):
     """Iterator yielding all immediately available items in a
@@ -311,6 +360,16 @@ class LRUCache(UserDict):
             value = self[key] = self.data.pop(key)
             return value
 
+    def keys(self):
+        # userdict.keys in py3k calls __getitem__
+        return self.data.keys()
+
+    def values(self):
+        return self.data.values()
+
+    def items(self):
+        return self.data.items()
+
     def __setitem__(self, key, value):
         # remove least recently used key.
         with self.mutex:

+ 3 - 2
celery/execute/trace.py

@@ -91,13 +91,14 @@ class TaskTrace(object):
         handler = self._trace_handlers[trace.status]
         r = handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
         self.handle_after_return(trace.status, trace.retval,
-                                 trace.exc_type, trace.tb, trace.strtb)
+                                 trace.exc_type, trace.tb, trace.strtb,
+                                 einfo=trace.exc_info)
         return r
 
     def handle_after_return(self, status, retval, type_, tb, strtb,
             einfo=None):
         if status in states.EXCEPTION_STATES:
-            einfo = ExceptionInfo((retval, type_, tb))
+            einfo = ExceptionInfo(einfo)
         self.task.after_return(status, retval, self.task_id,
                                self.args, self.kwargs, einfo)
 

+ 17 - 15
celery/log.py

@@ -21,6 +21,8 @@ from .utils.encoding import safe_str
 from .utils.patch import ensure_process_aware_logger
 from .utils.term import colored
 
+is_py3k = sys.version_info >= (3, 0)
+
 
 class ColorFormatter(logging.Formatter):
     #: Loglevel -> Color mapping.
@@ -34,8 +36,8 @@ class ColorFormatter(logging.Formatter):
 
     def formatException(self, ei):
         r = logging.Formatter.formatException(self, ei)
-        if isinstance(r, str):
-            return r.decode("utf-8", "replace")    # Convert to unicode
+        if isinstance(r, str) and not is_py3k:
+            return safe_str(r)
         return r
 
     def format(self, record):
@@ -50,16 +52,15 @@ class ColorFormatter(logging.Formatter):
                         type(record.msg), exc)
                 record.exc_info = sys.exc_info()
 
-        # Very ugly, but have to make sure processName is supported
-        # by foreign logger instances.
-        # (processName is always supported by Python 2.7)
-        if "processName" not in record.__dict__:
-            process_name = current_process and current_process()._name or ""
-            record.__dict__["processName"] = process_name
-        t = logging.Formatter.format(self, record)
-        if isinstance(t, unicode):
-            return t.encode("utf-8", "replace")
-        return t
+        if not is_py3k:
+            # Very ugly, but have to make sure processName is supported
+            # by foreign logger instances.
+            # (processName is always supported by Python 2.7)
+            if "processName" not in record.__dict__:
+                process_name = (current_process and
+                                current_process()._name or "")
+                record.__dict__["processName"] = process_name
+        return safe_str(logging.Formatter.format(self, record))
 
 
 class Logging(object):
@@ -105,7 +106,8 @@ class Logging(object):
 
         if mputil and hasattr(mputil, "_logger"):
             mputil._logger = None
-        ensure_process_aware_logger()
+        if not is_py3k:
+            ensure_process_aware_logger()
         receivers = signals.setup_logging.send(sender=None,
                         loglevel=loglevel, logfile=logfile,
                         format=format, colorize=colorize)
@@ -203,9 +205,9 @@ class Logging(object):
         """
         proxy = LoggingProxy(logger, loglevel)
         if stdout:
-            sys.stdout = proxy
+            sys.stdout = sys.__stdout__ = proxy
         if stderr:
-            sys.stderr = proxy
+            sys.stderr = sys.__stderr__ = proxy
         return proxy
 
     def _setup_logger(self, logger, logfile, format, colorize,

+ 14 - 5
celery/task/http.py

@@ -1,5 +1,6 @@
 from __future__ import absolute_import
 
+import sys
 import urllib2
 
 from urllib import urlencode
@@ -36,11 +37,19 @@ def maybe_utf8(value):
     return value
 
 
-def utf8dict(tup):
-    """With a dict's items() tuple return a new dict with any utf-8
-    keys/values encoded."""
-    return dict((key.encode("utf-8"), maybe_utf8(value))
-                    for key, value in tup)
+if sys.version_info >= (3, 0):
+
+    def utf8dict(tup):
+        if not isinstance(tup, dict):
+            return dict(tup)
+        return tup
+else:
+
+    def utf8dict(tup):  # noqa
+        """With a dict's items() tuple return a new dict with any utf-8
+        keys/values encoded."""
+        return dict((key.encode("utf-8"), maybe_utf8(value))
+                        for key, value in tup)
 
 
 def extract_response(raw_response):

+ 1 - 0
celery/tests/__init__.py

@@ -11,6 +11,7 @@ os.environ.setdefault("CELERY_CONFIG_MODULE", config_module)
 os.environ["CELERY_LOADER"] = "default"
 os.environ["EVENTLET_NOPATCH"] = "yes"
 os.environ["GEVENT_NOPATCH"] = "yes"
+os.environ["KOMBU_DISABLE_LIMIT_PROTECTION"] = "yes"
 
 try:
     WindowsError = WindowsError  # noqa

+ 9 - 9
celery/tests/test_app/test_beat.py

@@ -191,7 +191,7 @@ class test_Scheduler(unittest.TestCase):
         self.assertTrue(scheduler.logger.logged[0])
         level, msg, args, kwargs = scheduler.logger.logged[0]
         self.assertEqual(level, logging.ERROR)
-        self.assertIn("Couldn't apply scheduled task", args[0].message)
+        self.assertIn("Couldn't apply scheduled task", args[0].args[0])
 
     def test_due_tick_RuntimeError(self):
         scheduler = mSchedulerRuntimeError()
@@ -262,7 +262,7 @@ class test_Service(unittest.TestCase):
                 if self.tick_raises_exit:
                     raise SystemExit()
                 if self.shutdown_service:
-                    self.shutdown_service._shutdown.set()
+                    self.shutdown_service._is_shutdown.set()
                 return 0.0
 
         return beat.Service(scheduler_cls=PersistentScheduler), sh
@@ -279,12 +279,12 @@ class test_Service(unittest.TestCase):
         s.sync()
         self.assertTrue(sh.closed)
         self.assertTrue(sh.synced)
-        self.assertTrue(s._stopped.isSet())
+        self.assertTrue(s._is_stopped.isSet())
         s.sync()
         s.stop(wait=False)
-        self.assertTrue(s._shutdown.isSet())
+        self.assertTrue(s._is_shutdown.isSet())
         s.stop(wait=True)
-        self.assertTrue(s._shutdown.isSet())
+        self.assertTrue(s._is_shutdown.isSet())
 
         p = s.scheduler._store
         s.scheduler._store = None
@@ -295,25 +295,25 @@ class test_Service(unittest.TestCase):
 
     def test_start_embedded_process(self):
         s, sh = self.get_service()
-        s._shutdown.set()
+        s._is_shutdown.set()
         s.start(embedded_process=True)
 
     def test_start_thread(self):
         s, sh = self.get_service()
-        s._shutdown.set()
+        s._is_shutdown.set()
         s.start(embedded_process=False)
 
     def test_start_tick_raises_exit_error(self):
         s, sh = self.get_service()
         s.scheduler.tick_raises_exit = True
         s.start()
-        self.assertTrue(s._shutdown.isSet())
+        self.assertTrue(s._is_shutdown.isSet())
 
     def test_start_manages_one_tick_before_shutdown(self):
         s, sh = self.get_service()
         s.scheduler.shutdown_service = s
         s.start()
-        self.assertTrue(s._shutdown.isSet())
+        self.assertTrue(s._is_shutdown.isSet())
 
 
 class test_EmbeddedService(unittest.TestCase):

+ 4 - 2
celery/tests/test_app/test_loaders.py

@@ -112,8 +112,10 @@ class TestLoaderBase(unittest.TestCase):
         self.assertEqual(self.loader.conf["foo"], "bar")
 
     def test_import_default_modules(self):
-        self.assertEqual(sorted(self.loader.import_default_modules()),
-                         sorted([os, sys, task]))
+        modnames = lambda l: [m.__name__ for m in l]
+        self.assertEqual(sorted(modnames(
+                            self.loader.import_default_modules())),
+                         sorted(modnames([os, sys, task])))
 
     def test_import_from_cwd_custom_imp(self):
 

+ 4 - 4
celery/tests/test_backends/test_amqp.py

@@ -63,10 +63,10 @@ class test_AMQPBackend(unittest.TestCase):
             raise KeyError("foo")
         except KeyError, exception:
             einfo = ExceptionInfo(sys.exc_info())
-        tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
-        self.assertEqual(tb2.get_status(tid3), states.FAILURE)
-        self.assertIsInstance(tb2.get_result(tid3), KeyError)
-        self.assertEqual(tb2.get_traceback(tid3), einfo.traceback)
+            tb1.mark_as_failure(tid3, exception, traceback=einfo.traceback)
+            self.assertEqual(tb2.get_status(tid3), states.FAILURE)
+            self.assertIsInstance(tb2.get_result(tid3), KeyError)
+            self.assertEqual(tb2.get_traceback(tid3), einfo.traceback)
 
     def test_repair_uuid(self):
         from celery.backends.amqp import repair_uuid

+ 7 - 2
celery/tests/test_backends/test_base.py

@@ -4,6 +4,7 @@ import sys
 import types
 
 from mock import Mock
+from nose import SkipTest
 
 from celery.utils import serialization
 from celery.utils.serialization import subclass_exception
@@ -25,8 +26,10 @@ class wrapobject(object):
     def __init__(self, *args, **kwargs):
         self.args = args
 
-
-Oldstyle = types.ClassType("Oldstyle", (), {})
+if sys.version_info >= (3, 0):
+    Oldstyle = None
+else:
+    Oldstyle = types.ClassType("Oldstyle", (), {})
 Unpickleable = subclass_exception("Unpickleable", KeyError, "foo.module")
 Impossible = subclass_exception("Impossible", object, "foo.module")
 Lookalike = subclass_exception("Lookalike", wrapobject, "foo.module")
@@ -105,6 +108,8 @@ class test_BaseBackend_interface(unittest.TestCase):
 class test_exception_pickle(unittest.TestCase):
 
     def test_oldstyle(self):
+        if Oldstyle is None:
+            raise SkipTest("py3k does not support old style classes")
         self.assertIsNone(fnpe(Oldstyle()))
 
     def test_BaseException(self):

+ 3 - 3
celery/tests/test_backends/test_cache.py

@@ -53,9 +53,9 @@ class test_CacheBackend(unittest.TestCase):
             raise KeyError("foo")
         except KeyError, exception:
             pass
-        tb.mark_as_failure(tid3, exception)
-        self.assertEqual(tb.get_status(tid3), states.FAILURE)
-        self.assertIsInstance(tb.get_result(tid3), KeyError)
+            tb.mark_as_failure(tid3, exception)
+            self.assertEqual(tb.get_status(tid3), states.FAILURE)
+            self.assertIsInstance(tb.get_result(tid3), KeyError)
 
     def test_mget(self):
         tb = CacheBackend(backend="memory://")

+ 5 - 0
celery/tests/test_bin/test_celerybeat.py

@@ -104,6 +104,11 @@ class test_Beat(AppCase):
         MockService.in_sync = False
 
     def test_setup_logging(self):
+        try:
+            # py3k
+            delattr(sys.stdout, "logger")
+        except AttributeError:
+            pass
         b = beatapp.Beat()
         b.redirect_stdouts = False
         b.setup_logging()

+ 2 - 2
celery/tests/test_bin/test_celeryd.py

@@ -25,7 +25,7 @@ from celery.bin.celeryd import WorkerCommand, windows_main, \
 from celery.exceptions import ImproperlyConfigured
 
 from celery.tests.compat import catch_warnings
-from celery.tests.utils import (AppCase, StringIO, mask_modules,
+from celery.tests.utils import (AppCase, WhateverIO, mask_modules,
                                 reset_modules, skip_unless_module)
 
 
@@ -37,7 +37,7 @@ def disable_stdouts(fun):
 
     @wraps(fun)
     def disable(*args, **kwargs):
-        sys.stdout, sys.stderr = StringIO(), StringIO()
+        sys.stdout, sys.stderr = WhateverIO(), WhateverIO()
         try:
             return fun(*args, **kwargs)
         finally:

+ 7 - 4
celery/tests/test_concurrency/test_concurrency_processes.py

@@ -131,12 +131,15 @@ class test_TaskPool(unittest.TestCase):
             scratch[0] = einfo
 
         pool = TaskPool(10)
-        exc = KeyError("foo")
-        pool.on_worker_error(errback, exc)
+        exc_info = None
+        try:
+            raise KeyError("foo")
+        except KeyError:
+            exc_info = ExceptionInfo(sys.exc_info())
+        pool.on_worker_error(errback, exc_info)
 
         self.assertTrue(scratch[0])
-        self.assertIs(scratch[0].exception, exc)
-        self.assertTrue(scratch[0].traceback)
+        self.assertIs(scratch[0], exc_info)
 
     def test_on_ready_exception(self):
         scratch = [None]

+ 47 - 34
celery/tests/test_events/test_events_state.py

@@ -14,15 +14,20 @@ class replay(object):
     def __init__(self, state):
         self.state = state
         self.rewind()
+        self.setup()
+
+    def setup(self):
+        pass
 
     def __iter__(self):
         return self
 
-    def next(self):
+    def __next__(self):
         try:
             self.state.event(self.events[self.position()])
         except IndexError:
             raise StopIteration()
+    next = __next__
 
     def rewind(self):
         self.position = count(0).next
@@ -34,48 +39,56 @@ class replay(object):
 
 
 class ev_worker_online_offline(replay):
-    events = [
-        Event("worker-online", hostname="utest1"),
-        Event("worker-offline", hostname="utest1"),
-    ]
+
+    def setup(self):
+        self.events = [
+            Event("worker-online", hostname="utest1"),
+            Event("worker-offline", hostname="utest1"),
+        ]
 
 
 class ev_worker_heartbeats(replay):
-    events = [
-        Event("worker-heartbeat", hostname="utest1",
-              timestamp=time() - HEARTBEAT_EXPIRE * 2),
-        Event("worker-heartbeat", hostname="utest1"),
-    ]
+
+    def setup(self):
+        self.events = [
+            Event("worker-heartbeat", hostname="utest1",
+                timestamp=time() - HEARTBEAT_EXPIRE * 2),
+            Event("worker-heartbeat", hostname="utest1"),
+        ]
 
 
 class ev_task_states(replay):
-    tid = uuid()
-    events = [
-        Event("task-received", uuid=tid, name="task1",
-              args="(2, 2)", kwargs="{'foo': 'bar'}",
-              retries=0, eta=None, hostname="utest1"),
-        Event("task-started", uuid=tid, hostname="utest1"),
-        Event("task-revoked", uuid=tid, hostname="utest1"),
-        Event("task-retried", uuid=tid, exception="KeyError('bar')",
-              traceback="line 2 at main", hostname="utest1"),
-        Event("task-failed", uuid=tid, exception="KeyError('foo')",
-              traceback="line 1 at main", hostname="utest1"),
-        Event("task-succeeded", uuid=tid, result="4",
-              runtime=0.1234, hostname="utest1"),
-    ]
+
+    def setup(self):
+        tid = self.tid = uuid()
+        self.events = [
+            Event("task-received", uuid=tid, name="task1",
+                args="(2, 2)", kwargs="{'foo': 'bar'}",
+                retries=0, eta=None, hostname="utest1"),
+            Event("task-started", uuid=tid, hostname="utest1"),
+            Event("task-revoked", uuid=tid, hostname="utest1"),
+            Event("task-retried", uuid=tid, exception="KeyError('bar')",
+                traceback="line 2 at main", hostname="utest1"),
+            Event("task-failed", uuid=tid, exception="KeyError('foo')",
+                traceback="line 1 at main", hostname="utest1"),
+            Event("task-succeeded", uuid=tid, result="4",
+                runtime=0.1234, hostname="utest1"),
+        ]
 
 
 class ev_snapshot(replay):
-    events = [
-        Event("worker-online", hostname="utest1"),
-        Event("worker-online", hostname="utest2"),
-        Event("worker-online", hostname="utest3"),
-    ]
-    for i in range(20):
-        worker = not i % 2 and "utest2" or "utest1"
-        type = not i % 2 and "task2" or "task1"
-        events.append(Event("task-received", name=type,
-                      uuid=uuid(), hostname=worker))
+
+    def setup(self):
+        self.events = [
+            Event("worker-online", hostname="utest1"),
+            Event("worker-online", hostname="utest2"),
+            Event("worker-online", hostname="utest3"),
+        ]
+        for i in range(20):
+            worker = not i % 2 and "utest2" or "utest1"
+            type = not i % 2 and "task2" or "task1"
+            self.events.append(Event("task-received", name=type,
+                          uuid=uuid(), hostname=worker))
 
 
 class test_Worker(unittest.TestCase):

+ 2 - 2
celery/tests/test_task/__init__.py

@@ -13,7 +13,7 @@ from celery.schedules import crontab, crontab_parser, ParseException
 from celery.utils import uuid
 from celery.utils.timeutils import parse_iso8601
 
-from celery.tests.utils import with_eager_tasks, unittest, StringIO
+from celery.tests.utils import with_eager_tasks, unittest, WhateverIO
 
 
 def return_True(*args, **kwargs):
@@ -407,7 +407,7 @@ class TestCeleryTasks(unittest.TestCase):
     def test_get_logger(self):
         T1 = self.createTaskCls("T1", "c.unittest.t.t1")
         t1 = T1()
-        logfh = StringIO()
+        logfh = WhateverIO()
         logger = t1.get_logger(logfile=logfh, loglevel=0)
         self.assertTrue(logger)
 

+ 6 - 1
celery/tests/test_task/test_result.py

@@ -1,3 +1,6 @@
+from __future__ import absolute_import
+from __future__ import with_statement
+
 from celery import states
 from celery.app import app_or_default
 from celery.utils import uuid
@@ -351,7 +354,9 @@ class TestFailedTaskSetResult(TestTaskSetResult):
         for i in xrange(self.size - 1):
             t = it.next()
             self.assertEqual(t.get(), i)
-        self.assertRaises(KeyError, it.next().get)
+        with self.assertRaises(KeyError):
+            t = it.next()   # need to do this in two lines or 2to3 borks.
+            t.get()
 
     def test_completed_count(self):
         self.assertEqual(self.ts.completed_count(), self.ts.total - 1)

+ 5 - 2
celery/tests/test_task/test_task_http.py

@@ -13,7 +13,9 @@ except ImportError:  # py3k
 from anyjson import serialize
 
 from celery.task import http
-from celery.tests.utils import unittest, StringIO
+from celery.tests.utils import unittest
+from celery.utils.compat import StringIO
+from celery.utils.encoding import from_utf8
 
 
 @contextmanager
@@ -53,8 +55,9 @@ def unknown_response():
 class TestEncodings(unittest.TestCase):
 
     def test_utf8dict(self):
+        uk = "foobar"
         d = {u"følelser ær langé": u"ærbadægzaååÆØÅ",
-              "foobar".encode("utf-8"): "xuzzybaz".encode("utf-8")}
+             from_utf8(uk): from_utf8("xuzzybaz")}
 
         for key, value in http.utf8dict(d.items()).items():
             self.assertIsInstance(key, str)

+ 6 - 6
celery/tests/test_utils/test_timer2.py

@@ -142,16 +142,16 @@ class test_Timer(unittest.TestCase):
     @patch("os._exit")
     def test_thread_crash(self, _exit):
         t = timer2.Timer()
-        t.next = Mock()
-        t.next.side_effect = OSError(131)
+        t._next_entry = Mock()
+        t._next_entry.side_effect = OSError(131)
         t.run()
         _exit.assert_called_with(1)
 
     def test_gc_race_lost(self):
         t = timer2.Timer()
-        t._stopped.set = Mock()
-        t._stopped.set.side_effect = TypeError()
+        t._is_stopped.set = Mock()
+        t._is_stopped.set.side_effect = TypeError()
 
-        t._shutdown.set()
+        t._is_shutdown.set()
         t.run()
-        t._stopped.set.assert_called_with()
+        t._is_stopped.set.assert_called_with()

+ 9 - 2
celery/tests/test_utils/test_utils_encoding.py

@@ -1,15 +1,22 @@
-from celery.utils import encoding
+import sys
+
+from nose import SkipTest
 
+from celery.utils import encoding
 from celery.tests.utils import unittest
 
 
 class test_encoding(unittest.TestCase):
 
-    def test_smart_str(self):
+    def test_safe_str(self):
         self.assertTrue(encoding.safe_str(object()))
         self.assertTrue(encoding.safe_str("foo"))
         self.assertTrue(encoding.safe_str(u"foo"))
 
+    def test_safe_str_UnicodeDecodeError(self):
+        if sys.version_info >= (3, 0):
+            raise SkipTest("py3k: not relevant")
+
         class foo(unicode):
 
             def encode(self, *args, **kwargs):

+ 5 - 5
celery/tests/test_worker/test_worker_autoscale.py

@@ -59,7 +59,7 @@ class test_Autoscaler(unittest.TestCase):
                 self.joined = True
 
         x = Scaler(self.pool, 10, 3, logger=logger)
-        x._stopped.set()
+        x._is_stopped.set()
         x.stop()
         self.assertTrue(x.joined)
         x.joined = False
@@ -91,12 +91,12 @@ class test_Autoscaler(unittest.TestCase):
 
             def scale(self):
                 self.scale_called = True
-                self._shutdown.set()
+                self._is_shutdown.set()
 
         x = Scaler(self.pool, 10, 3, logger=logger)
         x.run()
-        self.assertTrue(x._shutdown.isSet())
-        self.assertTrue(x._stopped.isSet())
+        self.assertTrue(x._is_shutdown.isSet())
+        self.assertTrue(x._is_stopped.isSet())
         self.assertTrue(x.scale_called)
 
     def test_shrink_raises_exception(self):
@@ -121,7 +121,7 @@ class test_Autoscaler(unittest.TestCase):
         class _Autoscaler(autoscale.Autoscaler):
 
             def scale(self):
-                self._shutdown.set()
+                self._is_shutdown.set()
                 raise OSError("foo")
 
         x = _Autoscaler(self.pool, 10, 3, logger=logger)

+ 32 - 26
celery/tests/test_worker/test_worker_job.py

@@ -11,6 +11,7 @@ from datetime import datetime, timedelta
 
 from kombu.transport.base import Message
 from mock import Mock
+from nose import SkipTest
 
 from celery import states
 from celery.app import app_or_default
@@ -22,6 +23,7 @@ from celery.log import setup_logger
 from celery.result import AsyncResult
 from celery.task.base import Task
 from celery.utils import uuid
+from celery.utils.encoding import from_utf8
 from celery.worker.job import (WorkerTaskTrace, TaskRequest,
                                InvalidTaskError, execute_and_trace,
                                default_encode)
@@ -29,7 +31,7 @@ from celery.worker.state import revoked
 
 from celery.tests.compat import catch_warnings
 from celery.tests.utils import unittest
-from celery.tests.utils import StringIO, wrap_logger
+from celery.tests.utils import WhateverIO, wrap_logger
 
 
 scratch = {"ACK": False}
@@ -74,6 +76,10 @@ def mytask_raising(i, **kwargs):
 
 class test_default_encode(unittest.TestCase):
 
+    def setUp(self):
+        if sys.version_info >= (3, 0):
+            raise SkipTest("py3k: not relevant")
+
     def test_jython(self):
         prev, sys.platform = sys.platform, "java 1.6.1"
         try:
@@ -99,8 +105,7 @@ class test_RetryTaskError(unittest.TestCase):
             raise Exception("foo")
         except Exception, exc:
             ret = RetryTaskError("Retrying task", exc)
-
-        self.assertEqual(ret.exc, exc)
+            self.assertEqual(ret.exc, exc)
 
 
 class test_WorkerTaskTrace(unittest.TestCase):
@@ -215,8 +220,8 @@ class test_TaskRequest(unittest.TestCase):
             raise RetryTaskError("foo", KeyError("moofoobar"))
         except:
             einfo = ExceptionInfo(sys.exc_info())
-        tw.on_failure(einfo)
-        self.assertIn("task-retried", tw.eventer.sent)
+            tw.on_failure(einfo)
+            self.assertIn("task-retried", tw.eventer.sent)
 
     def test_terminate__task_started(self):
         pool = Mock()
@@ -382,9 +387,9 @@ class test_TaskRequest(unittest.TestCase):
             raise WorkerLostError("do re mi")
         except WorkerLostError:
             exc_info = ExceptionInfo(sys.exc_info())
-        tw.on_failure(exc_info)
-        self.assertEqual(mytask.backend.get_status(tw.task_id),
-                         states.FAILURE)
+            tw.on_failure(exc_info)
+            self.assertEqual(mytask.backend.get_status(tw.task_id),
+                             states.FAILURE)
 
         mytask.ignore_result = True
         try:
@@ -404,8 +409,8 @@ class test_TaskRequest(unittest.TestCase):
                 raise KeyError("foo")
             except KeyError:
                 exc_info = ExceptionInfo(sys.exc_info())
-            tw.on_failure(exc_info)
-            self.assertTrue(tw.acknowledged)
+                tw.on_failure(exc_info)
+                self.assertTrue(tw.acknowledged)
         finally:
             mytask.acks_late = False
 
@@ -515,8 +520,9 @@ class test_TaskRequest(unittest.TestCase):
         self.assertTrue(x)
 
     def test_from_message(self):
+        us = u"æØåveéðƒeæ"
         body = {"task": mytask.name, "id": uuid(),
-                "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
+                "args": [2], "kwargs": {us: "bar"}}
         m = Message(None, body=anyjson.serialize(body), backend="foo",
                           content_type="application/json",
                           content_encoding="utf-8")
@@ -525,9 +531,9 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(tw.task_name, body["task"])
         self.assertEqual(tw.task_id, body["id"])
         self.assertEqual(tw.args, body["args"])
-        self.assertEqual(tw.kwargs.keys()[0],
-                          u"æØåveéðƒeæ".encode("utf-8"))
-        self.assertNotIsInstance(tw.kwargs.keys()[0], unicode)
+        us = from_utf8(us)
+        self.assertEqual(tw.kwargs.keys()[0], us)
+        self.assertIsInstance(tw.kwargs.keys()[0], str)
         self.assertTrue(tw.logger)
 
     def test_from_message_nonexistant_task(self):
@@ -632,20 +638,20 @@ class test_TaskRequest(unittest.TestCase):
         except Exception:
             exc_info = ExceptionInfo(sys.exc_info())
 
-        logfh = StringIO()
-        tw.logger.handlers = []
-        tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
-                                 root=False)
+            logfh = WhateverIO()
+            tw.logger.handlers = []
+            tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
+                                     root=False)
 
-        app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
+            app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
 
-        tw.on_failure(exc_info)
-        logvalue = logfh.getvalue()
-        self.assertIn(mytask.name, logvalue)
-        self.assertIn(tid, logvalue)
-        self.assertIn("ERROR", logvalue)
+            tw.on_failure(exc_info)
+            logvalue = logfh.getvalue()
+            self.assertIn(mytask.name, logvalue)
+            self.assertIn(tid, logvalue)
+            self.assertIn("ERROR", logvalue)
 
-        app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
+            app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
 
     def test_on_failure(self):
         self._test_on_failure(Exception("Inside unit tests"))
@@ -655,4 +661,4 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_on_failure_utf8_exception(self):
         self._test_on_failure(Exception(
-            u"Бобры атакуют".encode('utf8')))
+            from_utf8(u"Бобры атакуют")))

+ 8 - 8
celery/tests/test_worker/test_worker_mediator.py

@@ -32,12 +32,12 @@ class test_Mediator(unittest.TestCase):
         ready_queue = Queue()
         m = Mediator(ready_queue, lambda t: t)
         m.start()
-        self.assertFalse(m._shutdown.isSet())
-        self.assertFalse(m._stopped.isSet())
+        self.assertFalse(m._is_shutdown.isSet())
+        self.assertFalse(m._is_stopped.isSet())
         m.stop()
         m.join()
-        self.assertTrue(m._shutdown.isSet())
-        self.assertTrue(m._stopped.isSet())
+        self.assertTrue(m._is_shutdown.isSet())
+        self.assertTrue(m._is_stopped.isSet())
 
     def test_mediator_move(self):
         ready_queue = Queue()
@@ -63,7 +63,7 @@ class test_Mediator(unittest.TestCase):
                 try:
                     raise KeyError("foo")
                 finally:
-                    ms[0]._shutdown.set()
+                    ms[0]._is_shutdown.set()
 
         ready_queue = Queue()
         ms[0] = m = _Mediator(ready_queue, None)
@@ -92,12 +92,12 @@ class test_Mediator(unittest.TestCase):
             condition[0].set()
 
         m = Mediator(ready_queue, mycallback)
-        condition[0] = m._shutdown
+        condition[0] = m._is_shutdown
         ready_queue.put(MockTask("Elaine M. Benes"))
 
         m.run()
-        self.assertTrue(m._shutdown.isSet())
-        self.assertTrue(m._stopped.isSet())
+        self.assertTrue(m._is_shutdown.isSet())
+        self.assertTrue(m._is_stopped.isSet())
 
     def test_mediator_move_revoked(self):
         ready_queue = Queue()

+ 3 - 3
celery/tests/utils.py

@@ -25,7 +25,7 @@ from nose import SkipTest
 
 from ..app import app_or_default
 from ..utils import noop
-from ..utils.compat import StringIO, LoggerAdapter
+from ..utils.compat import WhateverIO, LoggerAdapter
 
 
 class Mock(mock.Mock):
@@ -92,7 +92,7 @@ def set_handlers(logger, new_handlers):
 @contextmanager
 def wrap_logger(logger, loglevel=logging.ERROR):
     old_handlers = get_handlers(logger)
-    sio = StringIO()
+    sio = WhateverIO()
     siohandler = logging.StreamHandler(sio)
     set_handlers(logger, [siohandler])
 
@@ -256,7 +256,7 @@ def mask_modules(*modnames):
 def override_stdouts():
     """Override `sys.stdout` and `sys.stderr` with `StringIO`."""
     prev_out, prev_err = sys.stdout, sys.stderr
-    mystdout, mystderr = StringIO(), StringIO()
+    mystdout, mystderr = WhateverIO(), WhateverIO()
     sys.stdout = sys.__stdout__ = mystdout
     sys.stderr = sys.__stderr__ = mystderr
 

+ 12 - 7
celery/utils/__init__.py

@@ -158,15 +158,20 @@ def noop(*args, **kwargs):
     pass
 
 
-def kwdict(kwargs):
-    """Make sure keyword arguments are not in unicode.
+if sys.version_info >= (3, 0):
 
-    This should be fixed in newer Python versions,
-      see: http://bugs.python.org/issue4978.
+    def kwdict(kwargs):
+        return kwargs
+else:
+    def kwdict(kwargs):  # noqa
+        """Make sure keyword arguments are not in unicode.
 
-    """
-    return dict((key.encode("utf-8"), value)
-                    for key, value in kwargs.items())
+        This should be fixed in newer Python versions,
+        see: http://bugs.python.org/issue4978.
+
+        """
+        return dict((key.encode("utf-8"), value)
+                        for key, value in kwargs.items())
 
 
 def first(predicate, iterable):

+ 15 - 5
celery/utils/compat.py

@@ -1,4 +1,6 @@
 ############## py3k #########################################################
+import sys
+
 try:
     from UserList import UserList       # noqa
 except ImportError:
@@ -9,13 +11,21 @@ try:
 except ImportError:
     from collections import UserDict    # noqa
 
-try:
-    from cStringIO import StringIO      # noqa
-except ImportError:
+if sys.version_info >= (3, 0):
+    from io import StringIO, BytesIO
+    from .encoding import bytes_to_str
+
+    class WhateverIO(StringIO):
+
+        def write(self, data):
+            StringIO.write(self, bytes_to_str(data))
+else:
     try:
-        from StringIO import StringIO   # noqa
+        from cStringIO import StringIO  # noqa
     except ImportError:
-        from io import StringIO         # noqa
+        from StringIO import StringIO   # noqa
+    BytesIO = WhateverIO = StringIO     # noqa
+
 
 ############## collections.OrderedDict ######################################
 try:

+ 42 - 5
celery/utils/encoding.py

@@ -1,25 +1,62 @@
+from __future__ import absolute_import
+
 import sys
 import traceback
 
+__all__ = ["default_encoding", "safe_str", "safe_repr"]
+is_py3k = sys.version_info >= (3, 0)
+
+
+if sys.version_info >= (3, 0):
+
+    def str_to_bytes(s):
+        if isinstance(s, str):
+            return s.encode()
+        return s
+
+    def bytes_to_str(s):
+        if isinstance(s, bytes):
+            return s.decode()
+        return s
+
+    def from_utf8(s, *args, **kwargs):
+        return s
+
+else:
+    def str_to_bytes(s):                # noqa
+        return s
+
+    def bytes_to_str(s):                # noqa
+        return s
+
+    def from_utf8(s, *args, **kwargs):  # noqa
+        return s.encode("utf-8", *args, **kwargs)
+
+
+if sys.platform.startswith("java"):
 
-def default_encoding():
-    if sys.platform.startswith("java"):
+    def default_encoding():
         return "utf-8"
-    return sys.getfilesystemencoding()
+else:
+    def default_encoding():       # noqa
+        return sys.getfilesystemencoding()
 
 
 def safe_str(s, errors="replace"):
+    s = bytes_to_str(s)
     if not isinstance(s, basestring):
         return safe_repr(s, errors)
     return _safe_str(s, errors)
 
 
 def _safe_str(s, errors="replace"):
+    if is_py3k:
+        return s
     encoding = default_encoding()
     try:
-        if isinstance(s, unicode):
+        if isinstance(s, str):
             return s.encode(encoding, errors)
-        return unicode(s, encoding, errors)
+        return str(s, encoding, errors)
     except Exception, exc:
         return "<Unrepresentable %r: %r %r>" % (
                 type(s), exc, "\n".join(traceback.format_stack()))

+ 24 - 9
celery/utils/timer2.py

@@ -49,6 +49,21 @@ class Entry(object):
         return "<TimerEntry: %s(*%r, **%r)" % (
                 self.fun.__name__, self.args, self.kwargs)
 
+    if sys.version_info >= (3, 0):
+
+        def hash(self):
+            return hash("|".join(map(repr, (self.fun, self.args,
+                                            self.kwargs))))
+
+        def __lt__(self, other):
+            return hash(self) < hash(other)
+
+        def __gt__(self, other):
+            return hash(self) > hash(other)
+
+        def __eq__(self, other):
+            return hash(self) == hash(other)
+
 
 def to_timestamp(d):
     if isinstance(d, datetime):
@@ -148,8 +163,8 @@ class Timer(Thread):
         self.on_tick = on_tick or self.on_tick
 
         Thread.__init__(self)
-        self._shutdown = Event()
-        self._stopped = Event()
+        self._is_shutdown = Event()
+        self._is_stopped = Event()
         self.mutex = Lock()
         self.logger = logging.getLogger("timer2.Timer")
         self.not_empty = Condition(self.mutex)
@@ -165,7 +180,7 @@ class Timer(Thread):
                 warnings.warn(TimedFunctionFailed(repr(exc))),
                 traceback.print_exception(typ, val, tb)
 
-    def next(self):
+    def _next_entry(self):
         with self.not_empty:
             delay, entry = self.scheduler.next()
             if entry is None:
@@ -173,15 +188,15 @@ class Timer(Thread):
                     self.not_empty.wait(1.0)
                 return delay
         return self.apply_entry(entry)
-    __next__ = next  # for 2to3
+    __next__ = next = _next_entry  # for 2to3
 
     def run(self):
         try:
             self.running = True
             self.scheduler = iter(self.schedule)
 
-            while not self._shutdown.isSet():
-                delay = self.next()
+            while not self._is_shutdown.isSet():
+                delay = self._next_entry()
                 if delay:
                     if self.on_tick:
                         self.on_tick(delay)
@@ -189,7 +204,7 @@ class Timer(Thread):
                         break
                     sleep(delay)
             try:
-                self._stopped.set()
+                self._is_stopped.set()
             except TypeError:  # pragma: no cover
                 # we lost the race at interpreter shutdown,
                 # so gc collected built-in modules.
@@ -201,8 +216,8 @@ class Timer(Thread):
 
     def stop(self):
         if self.running:
-            self._shutdown.set()
-            self._stopped.wait()
+            self._is_shutdown.set()
+            self._is_stopped.wait()
             self.join(1e10)
             self.running = False
 

+ 6 - 6
celery/worker/autoscale.py

@@ -23,8 +23,8 @@ class Autoscaler(threading.Thread):
         self.keepalive = keepalive
         self.logger = logger
         self._last_action = None
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
+        self._is_shutdown = threading.Event()
+        self._is_stopped = threading.Event()
         self.setDaemon(True)
         self.setName(self.__class__.__name__)
 
@@ -94,7 +94,7 @@ class Autoscaler(threading.Thread):
             self._shrink(n)
 
     def run(self):
-        while not self._shutdown.isSet():
+        while not self._is_shutdown.isSet():
             try:
                 self.scale()
                 sleep(1.0)
@@ -102,11 +102,11 @@ class Autoscaler(threading.Thread):
                 self.logger.error("Thread Autoscaler crashed: %r", exc,
                                   exc_info=sys.exc_info())
                 os._exit(1)
-        self._stopped.set()
+        self._is_stopped.set()
 
     def stop(self):
-        self._shutdown.set()
-        self._stopped.wait()
+        self._is_shutdown.set()
+        self._is_stopped.wait()
         if self.isAlive():
             self.join(1e10)
 

+ 14 - 8
celery/worker/job.py

@@ -17,6 +17,7 @@ from ..datastructures import ExceptionInfo
 from ..execute.trace import TaskTrace
 from ..utils import noop, kwdict, fun_takes_kwargs, truncate_text
 from ..utils.encoding import safe_repr, safe_str, default_encoding
+from ..utils.serialization import get_pickleable_exception
 from ..utils.timeutils import maybe_iso8601
 
 from . import state
@@ -30,8 +31,14 @@ class InvalidTaskError(Exception):
     """The task has invalid data or is not properly constructed."""
 
 
-def default_encode(obj):
-    return unicode(obj, default_encoding())
+if sys.version_info >= (3, 0):
+
+    def default_encode(obj):
+        return obj
+else:
+
+    def default_encode(obj):  # noqa
+        return unicode(obj, default_encoding())
 
 
 class WorkerTaskTrace(TaskTrace):
@@ -129,9 +136,8 @@ class WorkerTaskTrace(TaskTrace):
     def handle_failure(self, exc, type_, tb, strtb):
         """Handle exception."""
         if self._store_errors:
-            exc = self.task.backend.mark_as_failure(self.task_id, exc, strtb)
-        else:
-            exc = self.task.backend.prepare_exception(exc)
+            self.task.backend.mark_as_failure(self.task_id, exc, strtb)
+        exc = get_pickleable_exception(exc)
         return self.super.handle_failure(exc, type_, tb, strtb)
 
 
@@ -473,11 +479,11 @@ class TaskRequest(object):
                    "name": self.task_name,
                    "exc": safe_repr(exc_info.exception),
                    "traceback": safe_str(exc_info.traceback),
-                   "args": self.args,
-                   "kwargs": self.kwargs}
+                   "args": safe_repr(self.args),
+                   "kwargs": safe_repr(self.kwargs)}
 
         self.logger.error(self.error_msg.strip(), context,
-                          exc_info=exc_info,
+                          exc_info=exc_info.exc_info,
                           extra={"data": {"id": self.task_id,
                                           "name": self.task_name,
                                           "hostname": self.hostname}})

+ 6 - 6
celery/worker/mediator.py

@@ -25,8 +25,8 @@ class Mediator(threading.Thread):
         self.logger = logger or self.app.log.get_default_logger()
         self.ready_queue = ready_queue
         self.callback = callback
-        self._shutdown = threading.Event()
-        self._stopped = threading.Event()
+        self._is_shutdown = threading.Event()
+        self._is_stopped = threading.Event()
         self.setDaemon(True)
         self.setName(self.__class__.__name__)
 
@@ -55,17 +55,17 @@ class Mediator(threading.Thread):
 
     def run(self):
         """Move tasks until :meth:`stop` is called."""
-        while not self._shutdown.isSet():
+        while not self._is_shutdown.isSet():
             try:
                 self.move()
             except Exception, exc:
                 self.logger.error("Mediator crash: %r", exc, exc_info=True)
                 # exiting by normal means does not work here, so force exit.
                 os._exit(1)
-        self._stopped.set()
+        self._is_stopped.set()
 
     def stop(self):
         """Gracefully shutdown the thread."""
-        self._shutdown.set()
-        self._stopped.wait()
+        self._is_shutdown.set()
+        self._is_stopped.wait()
         self.join(1e10)

+ 39 - 0
contrib/release/py3k-run-tests

@@ -0,0 +1,39 @@
+#!/bin/bash
+base=${1:-.}
+nosetests -vd celery.tests                                      \
+          --with-coverage3                                      \
+            --cover3-branch                                     \
+            --cover3-xml                                        \
+            --cover3-xml-file="$base/coverage.xml"              \
+            --cover3-html                                       \
+            --cover3-html-dir="$base/cover"                     \
+            --cover3-package=celery                             \
+            --cover3-exclude="                                  \
+              celery                                            \
+              celery.tests.*                                    \
+              celery.bin.celeryd_multi                          \
+              celery.bin.celeryd_detach                         \
+              celery.bin.celeryctl                              \
+              celery.bin.camqadm                                \
+              celery.execute                                    \
+              celery.local                                      \
+              celery.platforms                                  \
+              celery.utils.patch                                \
+              celery.utils.compat                               \
+              celery.utils.mail                                 \
+              celery.utils.functional                           \
+              celery.utils.dispatch*                            \
+              celery.utils.term                                 \
+              celery.db.a805d4bd                                \
+              celery.db.dfd042c7                                \
+              celery.contrib*                                   \
+              celery.concurrency.threads                        \
+              celery.concurrency.processes.pool                 \
+              celery.concurrency.gevent                         \
+              celery.backends.mongodb                           \
+              celery.backends.tyrant                            \
+              celery.backends.cassandra                         \
+              celery.events.dumper                              \
+              celery.events.cursesmon"                          \
+            --with-xunit                                        \
+              --xunit-file="$base/nosetests.xml"

+ 5 - 0
requirements/default-py3k.txt

@@ -0,0 +1,5 @@
+python-dateutil>=2.0.0
+anyjson>=0.3.1
+pytz
+kombu
+

+ 2 - 2
setup.py

@@ -48,12 +48,11 @@ try:
 except ImportError:
     install_requires.append("importlib")
 install_requires.extend([
-    "python-dateutil>=1.5.0,<2.0.0",
     "anyjson>=0.3.1",
     "kombu>=1.3.1,<2.0.0",
 ])
 if is_py3k:
-    install_requires.append("python-dateutil>2.0.0")
+    install_requires.append("python-dateutil>=2.0.0")
 else:
     install_requires.append("python-dateutil>=1.5.0,<2.0.0")
 
@@ -119,6 +118,7 @@ setup(
         "Programming Language :: Python :: 2.5",
         "Programming Language :: Python :: 2.6",
         "Programming Language :: Python :: 2.7",
+        "Programming Language :: Python :: 3",
     ],
     entry_points={
         'console_scripts': console_scripts,

+ 2 - 36
tox.ini

@@ -10,44 +10,10 @@ commands = nosetests
 recreate = True
 basepython = python3.2
 changedir = .tox
-deps = -r{toxinidir}/requirements/default.txt
+deps = -r{toxinidir}/requirements/default-py3k.txt
        -r{toxinidir}/requirements/test-py3k.txt
 commands = {toxinidir}/contrib/release/removepyc.sh {toxinidir}
-           nosetests -vd celery.tests                           \
-                --with-coverage3                                \
-                --cover3-branch                                 \
-                --cover3-xml                                    \
-                --cover3-xml-file={toxinidir}/coverage.xml      \
-                --cover3-html                                   \
-                --cover3-html-dir={toxinidir}/cover             \
-                --cover3-package=celery                         \
-                --cover3-exclude="celery celery.tests.*         \
-                        celery.bin.celeryd_multi                \
-                        celery.bin.celeryd_detach               \
-                        celery.bin.celeryctl                    \
-                        celery.bin.camqadm                      \
-                        celery.execute                          \
-                        celery.local                            \
-                        celery.platforms                        \
-                        celery.utils.patch                      \
-                        celery.utils.compat                     \
-                        celery.utils.mail                       \
-                        celery.utils.functional                 \
-                        celery.utils.dispatch*                  \
-                        celery.utils.term                       \
-                        celery.db.a805d4bd                      \
-                        celery.db.dfd042c7                      \
-                        celery.contrib*                         \
-                        celery.concurrency.threads              \
-                        celery.concurrency.processes.pool       \
-                        celery.concurrency.gevent               \
-                        celery.backends.mongodb                 \
-                        celery.backends.tyrant                  \
-                        celery.backends.cassandra               \
-                        celery.events.dumper                    \
-                        celery.events.cursesmon"                \
-                --with-xunit                                    \
-                --xunit-file={toxinidir}/nosetests.xml
+           {toxinidir}/contrib/release/py3k-run-tests {toxinidir}
 
 [testenv:py27]
 basepython = python2.7