فهرست منبع

Tests passing

Ask Solem 13 سال پیش
والد
کامیت
b4b239da34

+ 3 - 3
celery/backends/cache.py

@@ -3,7 +3,7 @@ from __future__ import absolute_import
 from ..datastructures import LRUCache
 from ..datastructures import LRUCache
 from ..exceptions import ImproperlyConfigured
 from ..exceptions import ImproperlyConfigured
 from ..utils import cached_property
 from ..utils import cached_property
-from ..utils.encoding import str_to_bytes
+from ..utils.encoding import ensure_bytes
 
 
 from .base import KeyValueStoreBackend
 from .base import KeyValueStoreBackend
 
 
@@ -84,10 +84,10 @@ class CacheBackend(KeyValueStoreBackend):
                                                 ", ".join(backends.keys())))
                                                 ", ".join(backends.keys())))
 
 
     def get_key_for_task(self, task_id):
     def get_key_for_task(self, task_id):
-        return str_to_bytes(self.task_keyprefix + task_id)
+        return ensure_bytes(self.task_keyprefix) + ensure_bytes(task_id)
 
 
     def get_key_for_taskset(self, taskset_id):
     def get_key_for_taskset(self, taskset_id):
-        return str_to_bytes(self.taskset_keyprefix + taskset_id)
+        return ensure_bytes(self.taskset_keyprefix) + ensure_bytes(taskset_id)
 
 
     def get(self, key):
     def get(self, key):
         return self.client.get(key)
         return self.client.get(key)

+ 6 - 4
celery/datastructures.py

@@ -341,10 +341,10 @@ class LRUCache(UserDict):
         return self.data.keys()
         return self.data.keys()
 
 
     def values(self):
     def values(self):
-        return list(self.itervalues())
+        return list(self._iterate_values())
 
 
     def items(self):
     def items(self):
-        return list(self.iteritems())
+        return list(self._iterate_items())
 
 
     def __setitem__(self, key, value):
     def __setitem__(self, key, value):
         # remove least recently used key.
         # remove least recently used key.
@@ -356,19 +356,21 @@ class LRUCache(UserDict):
     def __iter__(self):
     def __iter__(self):
         return self.data.iterkeys()
         return self.data.iterkeys()
 
 
-    def iteritems(self):
+    def _iterate_items(self):
         for k in self.data:
         for k in self.data:
             try:
             try:
                 yield (k, self.data[k])
                 yield (k, self.data[k])
             except KeyError:
             except KeyError:
                 pass
                 pass
+    iteritems = _iterate_items
 
 
-    def itervalues(self):
+    def _iterate_values(self):
         for k in self.data:
         for k in self.data:
             try:
             try:
                 yield self.data[k]
                 yield self.data[k]
             except KeyError:
             except KeyError:
                 pass
                 pass
+    itervalues = _iterate_values
 
 
 
 
 class TokenBucket(object):
 class TokenBucket(object):

+ 7 - 1
celery/task/sets.py

@@ -115,6 +115,12 @@ class subtask(AttributeDict):
         return registry.tasks[self.task]
         return registry.tasks[self.task]
 
 
 
 
+def maybe_subtask(t):
+    if not isinstance(t, subtask):
+        return subtask(t)
+    return t
+
+
 class TaskSet(UserList):
 class TaskSet(UserList):
     """A task containing several subtasks, making it possible
     """A task containing several subtasks, making it possible
     to track how many, or when all of the tasks have been completed.
     to track how many, or when all of the tasks have been completed.
@@ -139,7 +145,7 @@ class TaskSet(UserList):
         self.app = app_or_default(app)
         self.app = app_or_default(app)
         if task is not None:
         if task is not None:
             if hasattr(task, "__iter__"):
             if hasattr(task, "__iter__"):
-                tasks = [subtask(t) for t in task]
+                tasks = [maybe_subtask(t) for t in task]
             else:
             else:
                 # Previously TaskSet only supported applying one kind of task.
                 # Previously TaskSet only supported applying one kind of task.
                 # the signature then was TaskSet(task, arglist),
                 # the signature then was TaskSet(task, arglist),

+ 3 - 2
celery/tests/test_backends/test_cache.py

@@ -11,6 +11,7 @@ from celery.backends.cache import CacheBackend, DummyClient
 from celery.exceptions import ImproperlyConfigured
 from celery.exceptions import ImproperlyConfigured
 from celery.result import AsyncResult
 from celery.result import AsyncResult
 from celery.utils import uuid
 from celery.utils import uuid
+from celery.utils.encoding import str_to_bytes
 
 
 from celery.tests.utils import unittest, mask_modules, reset_modules
 from celery.tests.utils import unittest, mask_modules, reset_modules
 
 
@@ -185,7 +186,7 @@ class test_memcache_key(unittest.TestCase, MockCacheMixin):
                 with mask_modules("pylibmc"):
                 with mask_modules("pylibmc"):
                     from celery.backends import cache
                     from celery.backends import cache
                     cache._imp = [None]
                     cache._imp = [None]
-                    task_id, result = bytes(uuid()), 42
+                    task_id, result = str_to_bytes(uuid()), 42
                     b = cache.CacheBackend(backend='memcache')
                     b = cache.CacheBackend(backend='memcache')
                     b.store_result(task_id, result, status=states.SUCCESS)
                     b.store_result(task_id, result, status=states.SUCCESS)
                     self.assertEqual(b.get_result(task_id), result)
                     self.assertEqual(b.get_result(task_id), result)
@@ -205,7 +206,7 @@ class test_memcache_key(unittest.TestCase, MockCacheMixin):
             with self.mock_pylibmc():
             with self.mock_pylibmc():
                 from celery.backends import cache
                 from celery.backends import cache
                 cache._imp = [None]
                 cache._imp = [None]
-                task_id, result = bytes(uuid()), 42
+                task_id, result = str_to_bytes(uuid()), 42
                 b = cache.CacheBackend(backend='memcache')
                 b = cache.CacheBackend(backend='memcache')
                 b.store_result(task_id, result, status=states.SUCCESS)
                 b.store_result(task_id, result, status=states.SUCCESS)
                 self.assertEqual(b.get_result(task_id), result)
                 self.assertEqual(b.get_result(task_id), result)

+ 2 - 0
celery/tests/test_task/test_task_sets.py

@@ -2,6 +2,7 @@ from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
 import anyjson
 import anyjson
+import warnings
 
 
 from celery import registry
 from celery import registry
 from celery.app import app_or_default
 from celery.app import app_or_default
@@ -103,6 +104,7 @@ class test_subtask(unittest.TestCase):
 class test_TaskSet(unittest.TestCase):
 class test_TaskSet(unittest.TestCase):
 
 
     def test_interface__compat(self):
     def test_interface__compat(self):
+        warnings.resetwarnings()
         with catch_warnings(record=True) as log:
         with catch_warnings(record=True) as log:
             ts = TaskSet(MockTask, [[(2, 2)], [(4, 4)], [(8, 8)]])
             ts = TaskSet(MockTask, [[(2, 2)], [(4, 4)], [(8, 8)]])
             self.assertListEqual(ts.tasks,
             self.assertListEqual(ts.tasks,

+ 0 - 2
celery/tests/test_worker/__init__.py

@@ -607,9 +607,7 @@ class test_Consumer(unittest.TestCase):
             def __exit__(self, *exc_info):
             def __exit__(self, *exc_info):
                 self.cancel()
                 self.cancel()
 
 
-
         l.pidbox_node.listen = BConsumer()
         l.pidbox_node.listen = BConsumer()
-
         connections = []
         connections = []
 
 
         class Connection(object):
         class Connection(object):

+ 3 - 3
celery/utils/compat.py

@@ -156,14 +156,14 @@ try:
 except ImportError:
 except ImportError:
     LoggerAdapter = _CompatLoggerAdapter  # noqa
     LoggerAdapter = _CompatLoggerAdapter  # noqa
 
 
-############## itertools.izip_longest #######################################
+############## itertools.zip_longest #######################################
 
 
 try:
 try:
-    from itertools import izip_longest
+    from itertools import izip_longest as zip_longest
 except ImportError:
 except ImportError:
     import itertools
     import itertools
 
 
-    def izip_longest(*args, **kwds):  # noqa
+    def zip_longest(*args, **kwds):  # noqa
         fillvalue = kwds.get("fillvalue")
         fillvalue = kwds.get("fillvalue")
 
 
         def sentinel(counter=([fillvalue] * (len(args) - 1)).pop):
         def sentinel(counter=([fillvalue] * (len(args) - 1)).pop):

+ 11 - 1
celery/utils/encoding.py

@@ -13,7 +13,7 @@ import sys
 import traceback
 import traceback
 
 
 __all__ = ["str_to_bytes", "bytes_to_str", "from_utf8",
 __all__ = ["str_to_bytes", "bytes_to_str", "from_utf8",
-           "default_encoding", "safe_str", "safe_repr"]
+           "default_encoding", "safe_str", "safe_repr", "bytes_t"]
 is_py3k = sys.version_info >= (3, 0)
 is_py3k = sys.version_info >= (3, 0)
 
 
 
 
@@ -32,6 +32,13 @@ if is_py3k:
     def from_utf8(s, *args, **kwargs):
     def from_utf8(s, *args, **kwargs):
         return s
         return s
 
 
+    def ensure_bytes(s):
+        if not isinstance(s, bytes):
+            return str_to_bytes(s)
+        return s
+
+    bytes_t = bytes
+
 else:
 else:
 
 
     def str_to_bytes(s):                # noqa
     def str_to_bytes(s):                # noqa
@@ -45,6 +52,9 @@ else:
     def from_utf8(s, *args, **kwargs):  # noqa
     def from_utf8(s, *args, **kwargs):  # noqa
         return s.encode("utf-8", *args, **kwargs)
         return s.encode("utf-8", *args, **kwargs)
 
 
+    bytes_t = str
+    ensure_bytes = str_to_bytes
+
 
 
 if sys.platform.startswith("java"):
 if sys.platform.startswith("java"):
 
 

+ 1 - 1
celery/utils/timer2.py

@@ -57,7 +57,7 @@ class Entry(object):
 
 
     if sys.version_info >= (3, 0):
     if sys.version_info >= (3, 0):
 
 
-        def hash(self):
+        def __hash__(self):
             return hash("|".join(map(repr, (self.fun, self.args,
             return hash("|".join(map(repr, (self.fun, self.args,
                                             self.kwargs))))
                                             self.kwargs))))
 
 

+ 0 - 2
celery/worker/__init__.py

@@ -11,7 +11,6 @@ from __future__ import absolute_import
 import atexit
 import atexit
 import logging
 import logging
 import socket
 import socket
-import sys
 import threading
 import threading
 import traceback
 import traceback
 
 
@@ -267,7 +266,6 @@ class WorkController(object):
                 self._running = i + 1
                 self._running = i + 1
                 blocking(component.start)
                 blocking(component.start)
         except SystemTerminate:
         except SystemTerminate:
-            print("GOT TERMINATE")
             self.terminate()
             self.terminate()
         except:
         except:
             self.stop()
             self.stop()

+ 2 - 2
celery/worker/buckets.py

@@ -23,7 +23,7 @@ from Queue import Queue, Empty
 
 
 from ..datastructures import TokenBucket
 from ..datastructures import TokenBucket
 from ..utils import timeutils
 from ..utils import timeutils
-from ..utils.compat import izip_longest, chain_from_iterable
+from ..utils.compat import zip_longest, chain_from_iterable
 
 
 __all__ = ["RateLimitExceeded", "TaskBucket",
 __all__ = ["RateLimitExceeded", "TaskBucket",
            "FastQueue", "TokenBucketQueue"]
            "FastQueue", "TokenBucketQueue"]
@@ -216,7 +216,7 @@ class TaskBucket(object):
         """Flattens the data in all of the buckets into a single list."""
         """Flattens the data in all of the buckets into a single list."""
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
-        return filter(None, chain_from_iterable(izip_longest(*[bucket.items
+        return filter(None, chain_from_iterable(zip_longest(*[bucket.items
                                     for bucket in self.buckets.values()])))
                                     for bucket in self.buckets.values()])))
 
 
 
 

+ 1 - 0
requirements/test-py3k.txt

@@ -1,3 +1,4 @@
+distribute
 nose
 nose
 nose-cover3
 nose-cover3
 coverage>=3.0
 coverage>=3.0

+ 5 - 1
tox.ini

@@ -11,8 +11,12 @@ recreate = True
 basepython = python3.2
 basepython = python3.2
 changedir = .tox
 changedir = .tox
 deps = -r{toxinidir}/requirements/default-py3k.txt
 deps = -r{toxinidir}/requirements/default-py3k.txt
-       -r{toxinidir}/requirements/test-py3k.txt
 commands = {toxinidir}/contrib/release/removepyc.sh {toxinidir}
 commands = {toxinidir}/contrib/release/removepyc.sh {toxinidir}
+           {envbindir}/easy_install -U distribute
+           {envbindir}/pip install                              \
+                --download-cache={toxworkdir}/_download         \
+                -r{toxinidir}/requirements/test-py3k.txt
+
            {toxinidir}/contrib/release/py3k-run-tests {toxinidir}
            {toxinidir}/contrib/release/py3k-run-tests {toxinidir}
 
 
 [testenv:py27]
 [testenv:py27]