Sfoglia il codice sorgente

Use uuid() instead of gen_unique_id()

Ask Solem 13 anni fa
parent
commit
3087421869

+ 2 - 2
celery/app/amqp.py

@@ -18,7 +18,7 @@ from kombu.utils import cached_property
 
 from .. import routes as _routes
 from .. import signals
-from ..utils import gen_unique_id, textindent
+from ..utils import textindent, uuid
 
 #: List of known options to a Kombu producers send method.
 #: Used to extract the message related options out of any `dict`.
@@ -199,7 +199,7 @@ class TaskPublisher(messaging.Publisher):
                     exchange_type or self.exchange_type, retry, _retry_policy)
             _exchanges_declared.add(exchange)
 
-        task_id = task_id or gen_unique_id()
+        task_id = task_id or uuid()
         task_args = task_args or []
         task_kwargs = task_kwargs or {}
         if not isinstance(task_args, (list, tuple)):

+ 2 - 2
celery/app/task/__init__.py

@@ -9,7 +9,7 @@ from ...exceptions import MaxRetriesExceededError, RetryTaskError
 from ...execute.trace import TaskTrace
 from ...registry import tasks, _unpickle_task
 from ...result import EagerResult
-from ...utils import mattrgetter, gen_unique_id, fun_takes_kwargs
+from ...utils import fun_takes_kwargs, mattrgetter, uuid
 
 extract_exec_options = mattrgetter("queue", "routing_key",
                                    "exchange", "immediate",
@@ -547,7 +547,7 @@ class BaseTask(object):
         """
         args = args or []
         kwargs = kwargs or {}
-        task_id = options.get("task_id") or gen_unique_id()
+        task_id = options.get("task_id") or uuid()
         retries = options.get("retries", 0)
         throw = self.app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
                                 options.pop("throw", None))

+ 2 - 2
celery/events/__init__.py

@@ -13,7 +13,7 @@ from kombu.entity import Exchange, Queue
 from kombu.messaging import Consumer, Producer
 
 from ..app import app_or_default
-from ..utils import gen_unique_id
+from ..utils import uuid
 
 event_exchange = Exchange("celeryev", type="topic")
 
@@ -145,7 +145,7 @@ class EventReceiver(object):
         if handlers is not None:
             self.handlers = handlers
         self.routing_key = routing_key
-        self.node_id = node_id or gen_unique_id()
+        self.node_id = node_id or uuid()
         self.queue = Queue("%s.%s" % ("celeryev", self.node_id),
                            exchange=event_exchange,
                            routing_key=self.routing_key,

+ 7 - 7
celery/task/chords.py

@@ -2,7 +2,7 @@ from __future__ import absolute_import
 
 from .. import current_app
 from ..result import TaskSetResult
-from ..utils import gen_unique_id
+from ..utils import uuid
 
 from .sets import TaskSet, subtask
 
@@ -27,11 +27,11 @@ class Chord(current_app.Task):
         if not isinstance(set, TaskSet):
             set = TaskSet(set)
         r = []
-        setid = gen_unique_id()
+        setid = uuid()
         for task in set.tasks:
-            uuid = gen_unique_id()
-            task.options.update(task_id=uuid, chord=body)
-            r.append(current_app.AsyncResult(uuid))
+            tid = uuid()
+            task.options.update(task_id=tid, chord=body)
+            r.append(current_app.AsyncResult(tid))
         current_app.TaskSetResult(setid, r).save()
         self.backend.on_chord_apply(setid, body, interval,
                                     max_retries=max_retries,
@@ -47,7 +47,7 @@ class chord(object):
         self.options = options
 
     def __call__(self, body, **options):
-        uuid = body.options.setdefault("task_id", gen_unique_id())
+        tid = body.options.setdefault("task_id", uuid())
         self.Chord.apply_async((list(self.tasks), body), self.options,
                                 **options)
-        return body.type.app.AsyncResult(uuid)
+        return body.type.app.AsyncResult(tid)

+ 3 - 3
celery/task/sets.py

@@ -6,7 +6,7 @@ import warnings
 from .. import registry
 from ..app import app_or_default
 from ..datastructures import AttributeDict
-from ..utils import cached_property, gen_unique_id, reprcall
+from ..utils import cached_property, reprcall, uuid
 from ..utils.compat import UserList
 
 TASKSET_DEPRECATION_TEXT = """\
@@ -152,7 +152,7 @@ class TaskSet(UserList):
             return self.apply(taskset_id=taskset_id)
 
         with app.default_connection(connection, connect_timeout) as conn:
-            setid = taskset_id or gen_unique_id()
+            setid = taskset_id or uuid()
             pub = publisher or self.Publisher(connection=conn)
             try:
                 results = self._async_results(setid, pub)
@@ -168,7 +168,7 @@ class TaskSet(UserList):
 
     def apply(self, taskset_id=None):
         """Applies the taskset locally by blocking until all tasks return."""
-        setid = taskset_id or gen_unique_id()
+        setid = taskset_id or uuid()
         return self.app.TaskSetResult(setid, self._sync_results(setid))
 
     def _sync_results(self, taskset_id):

+ 2 - 2
celery/tests/test_app/test_beat.py

@@ -10,7 +10,7 @@ from celery import registry
 from celery.result import AsyncResult
 from celery.schedules import schedule
 from celery.task.base import Task
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 
 
 class Object(object):
@@ -119,7 +119,7 @@ class mScheduler(beat.Scheduler):
                           "args": args,
                           "kwargs": kwargs,
                           "options": options})
-        return AsyncResult(gen_unique_id())
+        return AsyncResult(uuid())
 
 
 class mSchedulerSchedulingError(mScheduler):

+ 37 - 37
celery/tests/test_backends/test_amqp.py

@@ -12,7 +12,7 @@ from celery.app import app_or_default
 from celery.backends.amqp import AMQPBackend
 from celery.datastructures import ExceptionInfo
 from celery.exceptions import TimeoutError
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 
 from celery.tests.utils import unittest
 from celery.tests.utils import sleepdeprived
@@ -34,7 +34,7 @@ class test_AMQPBackend(unittest.TestCase):
         tb1 = self.create_backend()
         tb2 = self.create_backend()
 
-        tid = gen_unique_id()
+        tid = uuid()
 
         tb1.mark_as_done(tid, 42)
         self.assertEqual(tb2.get_status(tid), states.SUCCESS)
@@ -46,7 +46,7 @@ class test_AMQPBackend(unittest.TestCase):
         tb1 = self.create_backend()
         tb2 = self.create_backend()
 
-        tid2 = gen_unique_id()
+        tid2 = uuid()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb1.mark_as_done(tid2, result)
         # is serialized properly.
@@ -58,7 +58,7 @@ class test_AMQPBackend(unittest.TestCase):
         tb1 = self.create_backend()
         tb2 = self.create_backend()
 
-        tid3 = gen_unique_id()
+        tid3 = uuid()
         try:
             raise KeyError("foo")
         except KeyError, exception:
@@ -71,8 +71,8 @@ class test_AMQPBackend(unittest.TestCase):
     def test_repair_uuid(self):
         from celery.backends.amqp import repair_uuid
         for i in range(10):
-            uuid = gen_unique_id()
-            self.assertEqual(repair_uuid(uuid.replace("-", "")), uuid)
+            tid = uuid()
+            self.assertEqual(repair_uuid(tid.replace("-", "")), tid)
 
     def test_expires_defaults_to_config(self):
         app = app_or_default()
@@ -122,7 +122,7 @@ class test_AMQPBackend(unittest.TestCase):
 
     def test_poll_no_messages(self):
         b = self.create_backend()
-        self.assertState(b.poll(gen_unique_id()), states.PENDING)
+        self.assertState(b.poll(uuid()), states.PENDING)
 
     def test_poll_result(self):
 
@@ -160,41 +160,41 @@ class test_AMQPBackend(unittest.TestCase):
         results.put(Message(status=states.RECEIVED, seq=1))
         results.put(Message(status=states.STARTED, seq=2))
         results.put(Message(status=states.FAILURE, seq=3))
-        r1 = backend.poll(gen_unique_id())
+        r1 = backend.poll(uuid())
         self.assertDictContainsSubset({"status": states.FAILURE,
                                        "seq": 3}, r1,
                                        "FFWDs to the last state")
 
         # Caches last known state.
         results.put(Message())
-        uuid = gen_unique_id()
-        backend.poll(uuid)
-        self.assertIn(uuid, backend._cache, "Caches last known state")
+        tid = uuid()
+        backend.poll(tid)
+        self.assertIn(tid, backend._cache, "Caches last known state")
 
         # Returns cache if no new states.
         results.queue.clear()
         assert not results.qsize()
-        backend._cache[uuid] = "hello"
-        self.assertEqual(backend.poll(uuid), "hello",
+        backend._cache[tid] = "hello"
+        self.assertEqual(backend.poll(tid), "hello",
                          "Returns cache if no new states")
 
     def test_wait_for(self):
         b = self.create_backend()
 
-        uuid = gen_unique_id()
-        self.assertRaises(TimeoutError, b.wait_for, uuid, timeout=0.1)
-        b.store_result(uuid, None, states.STARTED)
-        self.assertRaises(TimeoutError, b.wait_for, uuid, timeout=0.1)
-        b.store_result(uuid, None, states.RETRY)
-        self.assertRaises(TimeoutError, b.wait_for, uuid, timeout=0.1)
-        b.store_result(uuid, 42, states.SUCCESS)
-        self.assertEqual(b.wait_for(uuid, timeout=1), 42)
-        b.store_result(uuid, 56, states.SUCCESS)
-        self.assertEqual(b.wait_for(uuid, timeout=1), 42,
+        tid = uuid()
+        self.assertRaises(TimeoutError, b.wait_for, tid, timeout=0.1)
+        b.store_result(tid, None, states.STARTED)
+        self.assertRaises(TimeoutError, b.wait_for, tid, timeout=0.1)
+        b.store_result(tid, None, states.RETRY)
+        self.assertRaises(TimeoutError, b.wait_for, tid, timeout=0.1)
+        b.store_result(tid, 42, states.SUCCESS)
+        self.assertEqual(b.wait_for(tid, timeout=1), 42)
+        b.store_result(tid, 56, states.SUCCESS)
+        self.assertEqual(b.wait_for(tid, timeout=1), 42,
                          "result is cached")
-        self.assertEqual(b.wait_for(uuid, timeout=1, cache=False), 56)
-        b.store_result(uuid, KeyError("foo"), states.FAILURE)
-        self.assertRaises(KeyError, b.wait_for, uuid, timeout=1, cache=False)
+        self.assertEqual(b.wait_for(tid, timeout=1, cache=False), 56)
+        b.store_result(tid, KeyError("foo"), states.FAILURE)
+        self.assertRaises(KeyError, b.wait_for, tid, timeout=1, cache=False)
 
     def test_drain_events_remaining_timeouts(self):
 
@@ -205,7 +205,7 @@ class test_AMQPBackend(unittest.TestCase):
 
         b = self.create_backend()
         with current_app.pool.acquire_channel(block=False) as (_, channel):
-            binding = b._create_binding(gen_unique_id())
+            binding = b._create_binding(uuid())
             consumer = b._create_consumer(binding, channel)
             self.assertRaises(socket.timeout, b.drain_events,
                               Connection(), consumer, timeout=0.1)
@@ -213,25 +213,25 @@ class test_AMQPBackend(unittest.TestCase):
     def test_get_many(self):
         b = self.create_backend()
 
-        uuids = []
+        tids = []
         for i in xrange(10):
-            uuid = gen_unique_id()
-            b.store_result(uuid, i, states.SUCCESS)
-            uuids.append(uuid)
+            tid = uuid()
+            b.store_result(tid, i, states.SUCCESS)
+            tids.append(tid)
 
-        res = list(b.get_many(uuids, timeout=1))
-        expected_results = [(uuid, {"status": states.SUCCESS,
+        res = list(b.get_many(tids, timeout=1))
+        expected_results = [(tid, {"status": states.SUCCESS,
                                     "result": i,
                                     "traceback": None,
-                                    "task_id": uuid})
-                                for i, uuid in enumerate(uuids)]
+                                    "task_id": tid})
+                                for i, tid in enumerate(tids)]
         self.assertEqual(sorted(res), sorted(expected_results))
         self.assertDictEqual(b._cache[res[0][0]], res[0][1])
-        cached_res = list(b.get_many(uuids, timeout=1))
+        cached_res = list(b.get_many(tids, timeout=1))
         self.assertEqual(sorted(cached_res), sorted(expected_results))
         b._cache[res[0][0]]["status"] = states.RETRY
         self.assertRaises(socket.timeout, list,
-                          b.get_many(uuids, timeout=0.01))
+                          b.get_many(tids, timeout=0.01))
 
     def test_test_get_many_raises_outer_block(self):
 

+ 4 - 4
celery/tests/test_backends/test_base.py

@@ -15,7 +15,7 @@ from celery.utils.serialization import get_pickleable_exception as gpe
 from celery import states
 from celery.backends.base import BaseBackend, KeyValueStoreBackend
 from celery.backends.base import BaseDictBackend, DisabledBackend
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 
 from celery.tests.utils import unittest
 
@@ -230,7 +230,7 @@ class test_KeyValueStoreBackend(unittest.TestCase):
         self.b = KVBackend()
 
     def test_get_store_delete_result(self):
-        tid = gen_unique_id()
+        tid = uuid()
         self.b.mark_as_done(tid, "Hello world")
         self.assertEqual(self.b.get_result(tid), "Hello world")
         self.assertEqual(self.b.get_status(tid), states.SUCCESS)
@@ -245,7 +245,7 @@ class test_KeyValueStoreBackend(unittest.TestCase):
     def test_get_many(self):
         for is_dict in True, False:
             self.b.mget_returns_dict = is_dict
-            ids = dict((gen_unique_id(), i) for i in xrange(10))
+            ids = dict((uuid(), i) for i in xrange(10))
             for id, i in ids.items():
                 self.b.mark_as_done(id, i)
             it = self.b.get_many(ids.keys())
@@ -259,7 +259,7 @@ class test_KeyValueStoreBackend(unittest.TestCase):
         self.assertEqual(self.b.get_status("xxx-missing"), states.PENDING)
 
     def test_save_restore_delete_taskset(self):
-        tid = gen_unique_id()
+        tid = uuid()
         self.b.save_taskset(tid, "Hello world")
         self.assertEqual(self.b.restore_taskset(tid), "Hello world")
         self.b.delete_taskset(tid)

+ 5 - 5
celery/tests/test_backends/test_cache.py

@@ -9,7 +9,7 @@ from celery import states
 from celery.backends.cache import CacheBackend, DummyClient
 from celery.exceptions import ImproperlyConfigured
 from celery.result import AsyncResult
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 
 from celery.tests.utils import unittest, mask_modules, reset_modules
 
@@ -25,7 +25,7 @@ class test_CacheBackend(unittest.TestCase):
     def test_mark_as_done(self):
         tb = CacheBackend(backend="memory://")
 
-        tid = gen_unique_id()
+        tid = uuid()
 
         self.assertEqual(tb.get_status(tid), states.PENDING)
         self.assertIsNone(tb.get_result(tid))
@@ -37,7 +37,7 @@ class test_CacheBackend(unittest.TestCase):
     def test_is_pickled(self):
         tb = CacheBackend(backend="memory://")
 
-        tid2 = gen_unique_id()
+        tid2 = uuid()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb.mark_as_done(tid2, result)
         # is serialized properly.
@@ -48,7 +48,7 @@ class test_CacheBackend(unittest.TestCase):
     def test_mark_as_failure(self):
         tb = CacheBackend(backend="memory://")
 
-        tid3 = gen_unique_id()
+        tid3 = uuid()
         try:
             raise KeyError("foo")
         except KeyError, exception:
@@ -67,7 +67,7 @@ class test_CacheBackend(unittest.TestCase):
 
     def test_forget(self):
         tb = CacheBackend(backend="memory://")
-        tid = gen_unique_id()
+        tid = uuid()
         tb.mark_as_done(tid, {"foo": "bar"})
         x = AsyncResult(tid, backend=tb)
         x.forget()

+ 11 - 11
celery/tests/test_backends/test_database.py

@@ -10,7 +10,7 @@ from celery import states
 from celery.app import app_or_default
 from celery.exceptions import ImproperlyConfigured
 from celery.result import AsyncResult
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 
 from celery.tests.utils import mask_modules
 from celery.tests.utils import unittest
@@ -86,7 +86,7 @@ class test_DatabaseBackend(unittest.TestCase):
     def test_mark_as_done(self):
         tb = DatabaseBackend()
 
-        tid = gen_unique_id()
+        tid = uuid()
 
         self.assertEqual(tb.get_status(tid), states.PENDING)
         self.assertIsNone(tb.get_result(tid))
@@ -98,7 +98,7 @@ class test_DatabaseBackend(unittest.TestCase):
     def test_is_pickled(self):
         tb = DatabaseBackend()
 
-        tid2 = gen_unique_id()
+        tid2 = uuid()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb.mark_as_done(tid2, result)
         # is serialized properly.
@@ -108,19 +108,19 @@ class test_DatabaseBackend(unittest.TestCase):
 
     def test_mark_as_started(self):
         tb = DatabaseBackend()
-        tid = gen_unique_id()
+        tid = uuid()
         tb.mark_as_started(tid)
         self.assertEqual(tb.get_status(tid), states.STARTED)
 
     def test_mark_as_revoked(self):
         tb = DatabaseBackend()
-        tid = gen_unique_id()
+        tid = uuid()
         tb.mark_as_revoked(tid)
         self.assertEqual(tb.get_status(tid), states.REVOKED)
 
     def test_mark_as_retry(self):
         tb = DatabaseBackend()
-        tid = gen_unique_id()
+        tid = uuid()
         try:
             raise KeyError("foo")
         except KeyError, exception:
@@ -134,7 +134,7 @@ class test_DatabaseBackend(unittest.TestCase):
     def test_mark_as_failure(self):
         tb = DatabaseBackend()
 
-        tid3 = gen_unique_id()
+        tid3 = uuid()
         try:
             raise KeyError("foo")
         except KeyError, exception:
@@ -147,7 +147,7 @@ class test_DatabaseBackend(unittest.TestCase):
 
     def test_forget(self):
         tb = DatabaseBackend(backend="memory://")
-        tid = gen_unique_id()
+        tid = uuid()
         tb.mark_as_done(tid, {"foo": "bar"})
         x = AsyncResult(tid)
         x.forget()
@@ -160,7 +160,7 @@ class test_DatabaseBackend(unittest.TestCase):
     def test_save__restore__delete_taskset(self):
         tb = DatabaseBackend()
 
-        tid = gen_unique_id()
+        tid = uuid()
         res = {u"something": "special"}
         self.assertEqual(tb.save_taskset(tid, res), res)
 
@@ -175,8 +175,8 @@ class test_DatabaseBackend(unittest.TestCase):
     def test_cleanup(self):
         tb = DatabaseBackend()
         for i in range(10):
-            tb.mark_as_done(gen_unique_id(), 42)
-            tb.save_taskset(gen_unique_id(), {"foo": "bar"})
+            tb.mark_as_done(uuid(), 42)
+            tb.save_taskset(uuid(), {"foo": "bar"})
         s = tb.ResultSession()
         for t in s.query(Task).all():
             t.date_done = datetime.now() - tb.expires * 2

+ 4 - 4
celery/tests/test_backends/test_redis.py

@@ -10,7 +10,7 @@ from nose import SkipTest
 from celery.exceptions import ImproperlyConfigured
 
 from celery import states
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.backends import redis
 from celery.backends.redis import RedisBackend
 
@@ -63,7 +63,7 @@ class TestRedisBackend(unittest.TestCase):
     def test_mark_as_done(self):
         tb = get_redis_or_SkipTest()
 
-        tid = gen_unique_id()
+        tid = uuid()
 
         self.assertEqual(tb.get_status(tid), states.PENDING)
         self.assertIsNone(tb.get_result(tid))
@@ -75,7 +75,7 @@ class TestRedisBackend(unittest.TestCase):
     def test_is_pickled(self):
         tb = get_redis_or_SkipTest()
 
-        tid2 = gen_unique_id()
+        tid2 = uuid()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb.mark_as_done(tid2, result)
         # is serialized properly.
@@ -86,7 +86,7 @@ class TestRedisBackend(unittest.TestCase):
     def test_mark_as_failure(self):
         tb = get_redis_or_SkipTest()
 
-        tid3 = gen_unique_id()
+        tid3 = uuid()
         try:
             raise KeyError("foo")
         except KeyError, exception:

+ 10 - 10
celery/tests/test_backends/test_redis_unit.py

@@ -5,7 +5,7 @@ from kombu.utils import cached_property
 
 from celery import current_app
 from celery import states
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.utils.timeutils import timedelta_seconds
 
 from celery.tests.utils import unittest
@@ -134,16 +134,16 @@ class test_RedisBackend(unittest.TestCase):
 
     def test_get_set_forget(self):
         b = self.Backend()
-        uuid = gen_unique_id()
-        b.store_result(uuid, 42, states.SUCCESS)
-        self.assertEqual(b.get_status(uuid), states.SUCCESS)
-        self.assertEqual(b.get_result(uuid), 42)
-        b.forget(uuid)
-        self.assertEqual(b.get_status(uuid), states.PENDING)
+        tid = uuid()
+        b.store_result(tid, 42, states.SUCCESS)
+        self.assertEqual(b.get_status(tid), states.SUCCESS)
+        self.assertEqual(b.get_result(tid), 42)
+        b.forget(tid)
+        self.assertEqual(b.get_status(tid), states.PENDING)
 
     def test_set_expires(self):
         b = self.Backend(expires=512)
-        uuid = gen_unique_id()
-        key = b.get_key_for_task(uuid)
-        b.store_result(uuid, 42, states.SUCCESS)
+        tid = uuid()
+        key = b.get_key_for_task(tid)
+        b.store_result(tid, 42, states.SUCCESS)
         self.assertEqual(b.client.expiry[key], 512)

+ 4 - 4
celery/tests/test_backends/test_tyrant.py

@@ -7,7 +7,7 @@ from nose import SkipTest
 from celery.exceptions import ImproperlyConfigured
 
 from celery import states
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.backends import tyrant
 from celery.backends.tyrant import TyrantBackend
 
@@ -63,7 +63,7 @@ class TestTyrantBackend(unittest.TestCase):
     def test_mark_as_done(self):
         tb = get_tyrant_or_SkipTest()
 
-        tid = gen_unique_id()
+        tid = uuid()
 
         self.assertEqual(tb.get_status(tid), states.PENDING)
         self.assertIsNone(tb.get_result(tid), None)
@@ -75,7 +75,7 @@ class TestTyrantBackend(unittest.TestCase):
     def test_is_pickled(self):
         tb = get_tyrant_or_SkipTest()
 
-        tid2 = gen_unique_id()
+        tid2 = uuid()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb.mark_as_done(tid2, result)
         # is serialized properly.
@@ -86,7 +86,7 @@ class TestTyrantBackend(unittest.TestCase):
     def test_mark_as_failure(self):
         tb = get_tyrant_or_SkipTest()
 
-        tid3 = gen_unique_id()
+        tid3 = uuid()
         try:
             raise KeyError("foo")
         except KeyError, exception:

+ 3 - 3
celery/tests/test_compat/test_log.py

@@ -10,7 +10,7 @@ from celery.log import (setup_logger, setup_task_logger,
                         get_default_logger, get_task_logger,
                         redirect_stdouts_to_logger, LoggingProxy,
                         setup_logging_subsystem)
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.utils.compat import _CompatLoggerAdapter
 from celery.tests.utils import (override_stdouts, wrap_logger,
                                 get_handlers, set_handlers)
@@ -119,7 +119,7 @@ class test_task_logger(test_default_logger):
         logger = get_task_logger()
         logger.handlers = []
         logging.root.manager.loggerDict.pop(logger.name, None)
-        self.uid = gen_unique_id()
+        self.uid = uuid()
 
     def setup_logger(self, *args, **kwargs):
         return setup_task_logger(*args, **dict(kwargs, task_name=self.uid,
@@ -154,7 +154,7 @@ class test_CompatLoggerAdapter(unittest.TestCase):
         self.logger, self.adapter = self.createAdapter()
 
     def createAdapter(self, name=None, extra={"foo": "bar"}):
-        logger = MockLogger(name=name or gen_unique_id())
+        logger = MockLogger(name=name or uuid())
         return logger, _CompatLoggerAdapter(logger, extra)
 
     def test_levels(self):

+ 9 - 9
celery/tests/test_events/test_events_state.py

@@ -6,7 +6,7 @@ from itertools import count
 from celery import states
 from celery.events import Event
 from celery.events.state import State, Worker, Task, HEARTBEAT_EXPIRE
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 
 
 class replay(object):
@@ -49,18 +49,18 @@ class ev_worker_heartbeats(replay):
 
 
 class ev_task_states(replay):
-    uuid = gen_unique_id()
+    tid = uuid()
     events = [
-        Event("task-received", uuid=uuid, name="task1",
+        Event("task-received", uuid=tid, name="task1",
               args="(2, 2)", kwargs="{'foo': 'bar'}",
               retries=0, eta=None, hostname="utest1"),
-        Event("task-started", uuid=uuid, hostname="utest1"),
-        Event("task-revoked", uuid=uuid, hostname="utest1"),
-        Event("task-retried", uuid=uuid, exception="KeyError('bar')",
+        Event("task-started", uuid=tid, hostname="utest1"),
+        Event("task-revoked", uuid=tid, hostname="utest1"),
+        Event("task-retried", uuid=tid, exception="KeyError('bar')",
               traceback="line 2 at main", hostname="utest1"),
-        Event("task-failed", uuid=uuid, exception="KeyError('foo')",
+        Event("task-failed", uuid=tid, exception="KeyError('foo')",
               traceback="line 1 at main", hostname="utest1"),
-        Event("task-succeeded", uuid=uuid, result="4",
+        Event("task-succeeded", uuid=tid, result="4",
               runtime=0.1234, hostname="utest1"),
     ]
 
@@ -75,7 +75,7 @@ class ev_snapshot(replay):
         worker = not i % 2 and "utest2" or "utest1"
         type = not i % 2 and "task2" or "task1"
         events.append(Event("task-received", name=type,
-                      uuid=gen_unique_id(), hostname=worker))
+                      uuid=uuid(), hostname=worker))
 
 
 class test_Worker(unittest.TestCase):

+ 7 - 7
celery/tests/test_slow/test_buckets.py

@@ -7,7 +7,7 @@ from itertools import chain, izip
 from celery.registry import TaskRegistry
 from celery.task.base import Task
 from celery.utils import timeutils
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.worker import buckets
 
 from celery.tests.utils import skip_if_environ, unittest
@@ -164,7 +164,7 @@ class test_TaskBucket(unittest.TestCase):
         b = buckets.TaskBucket(task_registry=reg)
         reg["nonexisting.task"] = "foo"
 
-        b.put(MockJob(gen_unique_id(), "nonexisting.task", (), {}))
+        b.put(MockJob(uuid(), "nonexisting.task", (), {}))
         self.assertIn("nonexisting.task", b.buckets)
 
     @skip_if_disabled
@@ -200,7 +200,7 @@ class test_TaskBucket(unittest.TestCase):
     @skip_if_disabled
     def test_put__get(self):
         b = buckets.TaskBucket(task_registry=self.registry)
-        job = MockJob(gen_unique_id(), TaskA.name, ["theqbf"], {"foo": "bar"})
+        job = MockJob(uuid(), TaskA.name, ["theqbf"], {"foo": "bar"})
         b.put(job)
         self.assertEqual(b.get(), job)
 
@@ -208,7 +208,7 @@ class test_TaskBucket(unittest.TestCase):
     def test_fill_rate(self):
         b = buckets.TaskBucket(task_registry=self.registry)
 
-        cjob = lambda i: MockJob(gen_unique_id(), TaskA.name, [i], {})
+        cjob = lambda i: MockJob(uuid(), TaskA.name, [i], {})
         jobs = [cjob(i) for i in xrange(20)]
         [b.put(job) for job in jobs]
 
@@ -225,7 +225,7 @@ class test_TaskBucket(unittest.TestCase):
     def test__very_busy_queue_doesnt_block_others(self):
         b = buckets.TaskBucket(task_registry=self.registry)
 
-        cjob = lambda i, t: MockJob(gen_unique_id(), t.name, [i], {})
+        cjob = lambda i, t: MockJob(uuid(), t.name, [i], {})
         ajobs = [cjob(i, TaskA) for i in xrange(10)]
         bjobs = [cjob(i, TaskB) for i in xrange(20)]
         jobs = list(chain(*izip(bjobs, ajobs)))
@@ -245,7 +245,7 @@ class test_TaskBucket(unittest.TestCase):
         try:
             b = buckets.TaskBucket(task_registry=self.registry)
 
-            cjob = lambda i, t: MockJob(gen_unique_id(), t.name, [i], {})
+            cjob = lambda i, t: MockJob(uuid(), t.name, [i], {})
 
             ajobs = [cjob(i, TaskA) for i in xrange(10)]
             bjobs = [cjob(i, TaskB) for i in xrange(10)]
@@ -267,7 +267,7 @@ class test_TaskBucket(unittest.TestCase):
     def test_empty(self):
         x = buckets.TaskBucket(task_registry=self.registry)
         self.assertTrue(x.empty())
-        x.put(MockJob(gen_unique_id(), TaskC.name, [], {}))
+        x.put(MockJob(uuid(), TaskC.name, [], {}))
         self.assertFalse(x.empty())
         x.clear()
         self.assertTrue(x.empty())

+ 39 - 39
celery/tests/test_task/test_result.py

@@ -1,6 +1,6 @@
 from celery import states
 from celery.app import app_or_default
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.utils.serialization import pickle
 from celery.result import AsyncResult, EagerResult, TaskSetResult, ResultSet
 from celery.exceptions import TimeoutError
@@ -11,7 +11,7 @@ from celery.tests.utils import skip_if_quick
 
 
 def mock_task(name, status, result):
-    return dict(id=gen_unique_id(), name=name, status=status, result=result)
+    return dict(id=uuid(), name=name, status=status, result=result)
 
 
 def save_result(task):
@@ -62,7 +62,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertFalse(nok_res.successful())
         self.assertFalse(nok_res2.successful())
 
-        pending_res = AsyncResult(gen_unique_id())
+        pending_res = AsyncResult(uuid())
         self.assertFalse(pending_res.successful())
 
     def test_str(self):
@@ -73,7 +73,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertEqual(str(ok2_res), self.task2["id"])
         self.assertEqual(str(nok_res), self.task3["id"])
 
-        pending_id = gen_unique_id()
+        pending_id = uuid()
         pending_res = AsyncResult(pending_id)
         self.assertEqual(str(pending_res), pending_id)
 
@@ -88,7 +88,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertEqual(repr(nok_res), "<AsyncResult: %s>" % (
                 self.task3["id"]))
 
-        pending_id = gen_unique_id()
+        pending_id = uuid()
         pending_res = AsyncResult(pending_id)
         self.assertEqual(repr(pending_res), "<AsyncResult: %s>" % (
                 pending_id))
@@ -107,7 +107,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertTrue(nok_res.traceback)
         self.assertTrue(nok_res2.traceback)
 
-        pending_res = AsyncResult(gen_unique_id())
+        pending_res = AsyncResult(uuid())
         self.assertFalse(pending_res.traceback)
 
     def test_get(self):
@@ -126,7 +126,7 @@ class TestAsyncResult(unittest.TestCase):
         res = AsyncResult(self.task4["id"])             # has RETRY status
         self.assertRaises(TimeoutError, res.get, timeout=0.1)
 
-        pending_res = AsyncResult(gen_unique_id())
+        pending_res = AsyncResult(uuid())
         self.assertRaises(TimeoutError, pending_res.get, timeout=0.1)
 
     @skip_if_quick
@@ -141,7 +141,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertTrue(all(result.ready() for result in oks))
         self.assertFalse(AsyncResult(self.task4["id"]).ready())
 
-        self.assertFalse(AsyncResult(gen_unique_id()).ready())
+        self.assertFalse(AsyncResult(uuid()).ready())
 
 
 class test_ResultSet(unittest.TestCase):
@@ -212,35 +212,35 @@ class TestTaskSetResult(unittest.TestCase):
 
     def setUp(self):
         self.size = 10
-        self.ts = TaskSetResult(gen_unique_id(), make_mock_taskset(self.size))
+        self.ts = TaskSetResult(uuid(), make_mock_taskset(self.size))
 
     def test_total(self):
         self.assertEqual(self.ts.total, self.size)
 
     def test_iterate_raises(self):
-        ar = MockAsyncResultFailure(gen_unique_id())
-        ts = TaskSetResult(gen_unique_id(), [ar])
+        ar = MockAsyncResultFailure(uuid())
+        ts = TaskSetResult(uuid(), [ar])
         it = iter(ts)
         self.assertRaises(KeyError, it.next)
 
     def test_forget(self):
-        subs = [MockAsyncResultSuccess(gen_unique_id()),
-                MockAsyncResultSuccess(gen_unique_id())]
-        ts = TaskSetResult(gen_unique_id(), subs)
+        subs = [MockAsyncResultSuccess(uuid()),
+                MockAsyncResultSuccess(uuid())]
+        ts = TaskSetResult(uuid(), subs)
         ts.forget()
         for sub in subs:
             self.assertTrue(sub.forgotten)
 
     def test_getitem(self):
-        subs = [MockAsyncResultSuccess(gen_unique_id()),
-                MockAsyncResultSuccess(gen_unique_id())]
-        ts = TaskSetResult(gen_unique_id(), subs)
+        subs = [MockAsyncResultSuccess(uuid()),
+                MockAsyncResultSuccess(uuid())]
+        ts = TaskSetResult(uuid(), subs)
         self.assertIs(ts[0], subs[0])
 
     def test_save_restore(self):
-        subs = [MockAsyncResultSuccess(gen_unique_id()),
-                MockAsyncResultSuccess(gen_unique_id())]
-        ts = TaskSetResult(gen_unique_id(), subs)
+        subs = [MockAsyncResultSuccess(uuid()),
+                MockAsyncResultSuccess(uuid())]
+        ts = TaskSetResult(uuid(), subs)
         ts.save()
         self.assertRaises(AttributeError, ts.save, backend=object())
         self.assertEqual(TaskSetResult.restore(ts.taskset_id).subtasks,
@@ -253,42 +253,42 @@ class TestTaskSetResult(unittest.TestCase):
 
     def test_join_native(self):
         backend = SimpleBackend()
-        subtasks = [AsyncResult(gen_unique_id(), backend=backend)
+        subtasks = [AsyncResult(uuid(), backend=backend)
                         for i in range(10)]
-        ts = TaskSetResult(gen_unique_id(), subtasks)
+        ts = TaskSetResult(uuid(), subtasks)
         backend.ids = [subtask.task_id for subtask in subtasks]
         res = ts.join_native()
         self.assertEqual(res, range(10))
 
     def test_iter_native(self):
         backend = SimpleBackend()
-        subtasks = [AsyncResult(gen_unique_id(), backend=backend)
+        subtasks = [AsyncResult(uuid(), backend=backend)
                         for i in range(10)]
-        ts = TaskSetResult(gen_unique_id(), subtasks)
+        ts = TaskSetResult(uuid(), subtasks)
         backend.ids = [subtask.task_id for subtask in subtasks]
         self.assertEqual(len(list(ts.iter_native())), 10)
 
     def test_iterate_yields(self):
-        ar = MockAsyncResultSuccess(gen_unique_id())
-        ar2 = MockAsyncResultSuccess(gen_unique_id())
-        ts = TaskSetResult(gen_unique_id(), [ar, ar2])
+        ar = MockAsyncResultSuccess(uuid())
+        ar2 = MockAsyncResultSuccess(uuid())
+        ts = TaskSetResult(uuid(), [ar, ar2])
         it = iter(ts)
         self.assertEqual(it.next(), 42)
         self.assertEqual(it.next(), 42)
 
     def test_iterate_eager(self):
-        ar1 = EagerResult(gen_unique_id(), 42, states.SUCCESS)
-        ar2 = EagerResult(gen_unique_id(), 42, states.SUCCESS)
-        ts = TaskSetResult(gen_unique_id(), [ar1, ar2])
+        ar1 = EagerResult(uuid(), 42, states.SUCCESS)
+        ar2 = EagerResult(uuid(), 42, states.SUCCESS)
+        ts = TaskSetResult(uuid(), [ar1, ar2])
         it = iter(ts)
         self.assertEqual(it.next(), 42)
         self.assertEqual(it.next(), 42)
 
     def test_join_timeout(self):
-        ar = MockAsyncResultSuccess(gen_unique_id())
-        ar2 = MockAsyncResultSuccess(gen_unique_id())
-        ar3 = AsyncResult(gen_unique_id())
-        ts = TaskSetResult(gen_unique_id(), [ar, ar2, ar3])
+        ar = MockAsyncResultSuccess(uuid())
+        ar2 = MockAsyncResultSuccess(uuid())
+        ar3 = AsyncResult(uuid())
+        ts = TaskSetResult(uuid(), [ar, ar2, ar3])
         self.assertRaises(TimeoutError, ts.join, timeout=0.0000001)
 
     def test_itersubtasks(self):
@@ -328,7 +328,7 @@ class TestTaskSetResult(unittest.TestCase):
 class TestPendingAsyncResult(unittest.TestCase):
 
     def setUp(self):
-        self.task = AsyncResult(gen_unique_id())
+        self.task = AsyncResult(uuid())
 
     def test_result(self):
         self.assertIsNone(self.task.result)
@@ -342,7 +342,7 @@ class TestFailedTaskSetResult(TestTaskSetResult):
         failed = mock_task("ts11", states.FAILURE, KeyError("Baz"))
         save_result(failed)
         failed_res = AsyncResult(failed["id"])
-        self.ts = TaskSetResult(gen_unique_id(), subtasks + [failed_res])
+        self.ts = TaskSetResult(uuid(), subtasks + [failed_res])
 
     def test_itersubtasks(self):
 
@@ -377,9 +377,9 @@ class TestFailedTaskSetResult(TestTaskSetResult):
 class TestTaskSetPending(unittest.TestCase):
 
     def setUp(self):
-        self.ts = TaskSetResult(gen_unique_id(), [
-                                        AsyncResult(gen_unique_id()),
-                                        AsyncResult(gen_unique_id())])
+        self.ts = TaskSetResult(uuid(), [
+                                        AsyncResult(uuid()),
+                                        AsyncResult(uuid())])
 
     def test_completed_count(self):
         self.assertEqual(self.ts.completed_count(), 0)

+ 4 - 4
celery/tests/test_task/test_task.py

@@ -11,7 +11,7 @@ from celery.exceptions import RetryTaskError
 from celery.execute import send_task
 from celery.result import EagerResult
 from celery.schedules import crontab, crontab_parser
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.utils.timeutils import parse_iso8601
 
 from celery.tests.utils import with_eager_tasks, unittest, StringIO
@@ -224,7 +224,7 @@ class TestCeleryTasks(unittest.TestCase):
         return cls
 
     def test_AsyncResult(self):
-        task_id = gen_unique_id()
+        task_id = uuid()
         result = RetryTask.AsyncResult(task_id)
         self.assertEqual(result.backend, RetryTask.backend)
         self.assertEqual(result.task_id, task_id)
@@ -379,7 +379,7 @@ class TestCeleryTasks(unittest.TestCase):
         def yyy():
             pass
 
-        tid = gen_unique_id()
+        tid = uuid()
         yyy.update_state(tid, "FROBULATING", {"fooz": "baaz"})
         self.assertEqual(yyy.AsyncResult(tid).status, "FROBULATING")
         self.assertDictEqual(yyy.AsyncResult(tid).result, {"fooz": "baaz"})
@@ -460,7 +460,7 @@ class TestTaskSet(unittest.TestCase):
     def test_named_taskset(self):
         prefix = "test_named_taskset-"
         ts = task.TaskSet([return_True_task.subtask([1])])
-        res = ts.apply(taskset_id=prefix + gen_unique_id())
+        res = ts.apply(taskset_id=prefix + uuid())
         self.assertTrue(res.taskset_id.startswith(prefix))
 
 

+ 3 - 4
celery/tests/test_task/test_task_control.py

@@ -5,7 +5,7 @@ from kombu.pidbox import Mailbox
 from celery.app import app_or_default
 from celery.task import control
 from celery.task import PingTask
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.tests.utils import unittest
 
 
@@ -160,9 +160,8 @@ class test_Broadcast(unittest.TestCase):
 
     @with_mock_broadcast
     def test_revoke_from_resultset(self):
-        r = self.app.TaskSetResult(gen_unique_id(),
+        r = self.app.TaskSetResult(uuid(),
                                    map(self.app.AsyncResult,
-                                        [gen_unique_id()
-                                            for i in range(10)]))
+                                        [uuid() for i in range(10)]))
         r.revoke()
         self.assertIn("revoke", MockMailbox.sent)

+ 3 - 3
celery/tests/test_worker/test_worker.py

@@ -16,7 +16,7 @@ from celery.concurrency.base import BasePool
 from celery.exceptions import SystemTerminate
 from celery.task import task as task_dec
 from celery.task import periodic_task as periodic_task_dec
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.worker import WorkController
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
@@ -87,7 +87,7 @@ def foo_periodic_task():
 
 
 def create_message(channel, **data):
-    data.setdefault("id", gen_unique_id())
+    data.setdefault("id", uuid())
     channel.no_ack_consumers = set()
     return Message(channel, body=pickle.dumps(dict(**data)),
                    content_type="application/x-python-serialize",
@@ -497,7 +497,7 @@ class test_Consumer(unittest.TestCase):
         l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = Mock()
-        id = gen_unique_id()
+        id = uuid()
         t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
                            kwargs={}, id=id)
         from celery.worker.state import revoked

+ 17 - 17
celery/tests/test_worker/test_worker_control.py

@@ -13,7 +13,7 @@ from celery.datastructures import AttributeDict
 from celery.task import task
 from celery.registry import tasks
 from celery.task import PingTask
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.worker.buckets import FastQueue
 from celery.worker.job import TaskRequest
 from celery.worker import state
@@ -38,7 +38,7 @@ class Consumer(object):
     def __init__(self):
         self.ready_queue = FastQueue()
         self.ready_queue.put(TaskRequest(task_name=mytask.name,
-                                         task_id=gen_unique_id(),
+                                         task_id=uuid(),
                                          args=(2, 2),
                                          kwargs={}))
         self.eta_schedule = Timer()
@@ -298,48 +298,48 @@ class test_ControlPanel(unittest.TestCase):
         self.assertRaises(KeyError, self.panel.handle, "foo", arguments={})
 
     def test_revoke_with_name(self):
-        uuid = gen_unique_id()
+        tid = uuid()
         m = {"method": "revoke",
              "destination": hostname,
-             "arguments": {"task_id": uuid,
+             "arguments": {"task_id": tid,
                            "task_name": mytask.name}}
         self.panel.dispatch_from_message(m)
-        self.assertIn(uuid, revoked)
+        self.assertIn(tid, revoked)
 
     def test_revoke_with_name_not_in_registry(self):
-        uuid = gen_unique_id()
+        tid = uuid()
         m = {"method": "revoke",
              "destination": hostname,
-             "arguments": {"task_id": uuid,
+             "arguments": {"task_id": tid,
                            "task_name": "xxxxxxxxx33333333388888"}}
         self.panel.dispatch_from_message(m)
-        self.assertIn(uuid, revoked)
+        self.assertIn(tid, revoked)
 
     def test_revoke(self):
-        uuid = gen_unique_id()
+        tid = uuid()
         m = {"method": "revoke",
              "destination": hostname,
-             "arguments": {"task_id": uuid}}
+             "arguments": {"task_id": tid}}
         self.panel.dispatch_from_message(m)
-        self.assertIn(uuid, revoked)
+        self.assertIn(tid, revoked)
 
         m = {"method": "revoke",
              "destination": "does.not.exist",
-             "arguments": {"task_id": uuid + "xxx"}}
+             "arguments": {"task_id": tid + "xxx"}}
         self.panel.dispatch_from_message(m)
-        self.assertNotIn(uuid + "xxx", revoked)
+        self.assertNotIn(tid + "xxx", revoked)
 
     def test_revoke_terminate(self):
         request = Mock()
-        request.task_id = uuid = gen_unique_id()
+        request.task_id = tid = uuid()
         state.active_requests.add(request)
         try:
-            r = builtins.revoke(Mock(), uuid, terminate=True)
-            self.assertIn(uuid, revoked)
+            r = builtins.revoke(Mock(), tid, terminate=True)
+            self.assertIn(tid, revoked)
             self.assertTrue(request.terminate.call_count)
             self.assertIn("terminated", r["ok"])
             # unknown task id only revokes
-            r = builtins.revoke(Mock(), gen_unique_id(), terminate=True)
+            r = builtins.revoke(Mock(), uuid(), terminate=True)
             self.assertIn("revoked", r["ok"])
         finally:
             state.active_requests.discard(request)

+ 63 - 63
celery/tests/test_worker/test_worker_job.py

@@ -21,7 +21,7 @@ from celery.exceptions import RetryTaskError, NotRegistered, WorkerLostError
 from celery.log import setup_logger
 from celery.result import AsyncResult
 from celery.task.base import Task
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.worker.job import (WorkerTaskTrace, TaskRequest,
                                InvalidTaskError, execute_and_trace,
                                default_encode)
@@ -113,10 +113,10 @@ class test_WorkerTaskTrace(unittest.TestCase):
 
             logger = mytask.app.log.get_default_logger()
             with wrap_logger(logger) as sio:
-                uuid = gen_unique_id()
-                ret = jail(uuid, mytask.name, [2], {})
+                tid = uuid()
+                ret = jail(tid, mytask.name, [2], {})
                 self.assertEqual(ret, 4)
-                mytask.backend.mark_as_done.assert_called_with(uuid, 4)
+                mytask.backend.mark_as_done.assert_called_with(tid, 4)
                 logs = sio.getvalue().strip()
                 self.assertIn("Process cleanup failed", logs)
         finally:
@@ -128,12 +128,12 @@ class test_WorkerTaskTrace(unittest.TestCase):
         mytask.backend.process_cleanup = Mock(side_effect=SystemExit())
         try:
             self.assertRaises(SystemExit,
-                    jail, gen_unique_id(), mytask.name, [2], {})
+                    jail, uuid(), mytask.name, [2], {})
         finally:
             mytask.backend = backend
 
     def test_execute_jail_success(self):
-        ret = jail(gen_unique_id(), mytask.name, [2], {})
+        ret = jail(uuid(), mytask.name, [2], {})
         self.assertEqual(ret, 4)
 
     def test_marked_as_started(self):
@@ -142,33 +142,33 @@ class test_WorkerTaskTrace(unittest.TestCase):
         class Backend(mytask.backend.__class__):
             _started = []
 
-            def mark_as_started(self, uuid, *args, **kwargs):
-                self._started.append(uuid)
+            def mark_as_started(self, tid, *args, **kwargs):
+                self._started.append(tid)
 
         prev, mytask.backend = mytask.backend, Backend()
 
         try:
-            uuid = gen_unique_id()
-            jail(uuid, mytask.name, [2], {})
-            self.assertIn(uuid, Backend._started)
+            tid = uuid()
+            jail(tid, mytask.name, [2], {})
+            self.assertIn(tid, Backend._started)
 
             mytask.ignore_result = True
-            uuid = gen_unique_id()
-            jail(uuid, mytask.name, [2], {})
-            self.assertNotIn(uuid, Backend._started)
+            tid = uuid()
+            jail(tid, mytask.name, [2], {})
+            self.assertNotIn(tid, Backend._started)
         finally:
             mytask.backend = prev
             mytask.track_started = False
             mytask.ignore_result = False
 
     def test_execute_jail_failure(self):
-        ret = jail(gen_unique_id(), mytask_raising.name,
+        ret = jail(uuid(), mytask_raising.name,
                    [4], {})
         self.assertIsInstance(ret, ExceptionInfo)
         self.assertTupleEqual(ret.exception.args, (4, ))
 
     def test_execute_ignore_result(self):
-        task_id = gen_unique_id()
+        task_id = uuid()
         ret = jail(id, MyTaskIgnoreResult.name,
                    [4], {})
         self.assertEqual(ret, 256)
@@ -187,29 +187,29 @@ class MockEventDispatcher(object):
 class test_TaskRequest(unittest.TestCase):
 
     def test_task_wrapper_repr(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         self.assertTrue(repr(tw))
 
     def test_sets_store_errors(self):
         mytask.ignore_result = True
         try:
-            tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+            tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             self.assertFalse(tw._store_errors)
             mytask.store_errors_even_if_ignored = True
-            tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+            tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             self.assertTrue(tw._store_errors)
         finally:
             mytask.ignore_result = False
             mytask.store_errors_even_if_ignored = False
 
     def test_send_event(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.eventer = MockEventDispatcher()
         tw.send_event("task-frobulated")
         self.assertIn("task-frobulated", tw.eventer.sent)
 
     def test_on_retry(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.eventer = MockEventDispatcher()
         try:
             raise RetryTaskError("foo", KeyError("moofoobar"))
@@ -220,7 +220,7 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_terminate__task_started(self):
         pool = Mock()
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.time_start = time.time()
         tw.worker_pid = 313
         tw.terminate(pool, signal="KILL")
@@ -228,7 +228,7 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_terminate__task_reserved(self):
         pool = Mock()
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.time_start = None
         tw.terminate(pool, signal="KILL")
         self.assertFalse(pool.terminate_job.call_count)
@@ -236,7 +236,7 @@ class test_TaskRequest(unittest.TestCase):
         tw.terminate(pool, signal="KILL")
 
     def test_revoked_expires_expired(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.expires = datetime.now() - timedelta(days=1)
         tw.revoked()
         self.assertIn(tw.task_id, revoked)
@@ -244,7 +244,7 @@ class test_TaskRequest(unittest.TestCase):
                          states.REVOKED)
 
     def test_revoked_expires_not_expired(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.expires = datetime.now() + timedelta(days=1)
         tw.revoked()
         self.assertNotIn(tw.task_id, revoked)
@@ -253,7 +253,7 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_revoked_expires_ignore_result(self):
         mytask.ignore_result = True
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         try:
             tw.expires = datetime.now() - timedelta(days=1)
             tw.revoked()
@@ -276,7 +276,7 @@ class test_TaskRequest(unittest.TestCase):
         app.mail_admins = mock_mail_admins
         mytask.send_error_emails = True
         try:
-            tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+            tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             try:
                 raise KeyError("moofoobar")
             except:
@@ -308,25 +308,25 @@ class test_TaskRequest(unittest.TestCase):
             mytask.error_whitelist = ()
 
     def test_already_revoked(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw._already_revoked = True
         self.assertTrue(tw.revoked())
 
     def test_revoked(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         revoked.add(tw.task_id)
         self.assertTrue(tw.revoked())
         self.assertTrue(tw._already_revoked)
         self.assertTrue(tw.acknowledged)
 
     def test_execute_does_not_execute_revoked(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         revoked.add(tw.task_id)
         tw.execute()
 
     def test_execute_acks_late(self):
         mytask_raising.acks_late = True
-        tw = TaskRequest(mytask_raising.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask_raising.name, uuid(), [1], {"f": "x"})
         try:
             tw.execute()
             self.assertTrue(tw.acknowledged)
@@ -334,17 +334,17 @@ class test_TaskRequest(unittest.TestCase):
             mytask_raising.acks_late = False
 
     def test_execute_using_pool_does_not_execute_revoked(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         revoked.add(tw.task_id)
         tw.execute_using_pool(None)
 
     def test_on_accepted_acks_early(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
         self.assertTrue(tw.acknowledged)
 
     def test_on_accepted_acks_late(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         mytask.acks_late = True
         try:
             tw.on_accepted(pid=os.getpid(), time_accepted=time.time())
@@ -353,7 +353,7 @@ class test_TaskRequest(unittest.TestCase):
             mytask.acks_late = False
 
     def test_on_accepted_terminates(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         pool = Mock()
         tw.terminate(pool, signal="KILL")
         self.assertFalse(pool.terminate_job.call_count)
@@ -361,13 +361,13 @@ class test_TaskRequest(unittest.TestCase):
         pool.terminate_job.assert_called_with(314, "KILL")
 
     def test_on_success_acks_early(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.time_start = 1
         tw.on_success(42)
         self.assertFalse(tw.acknowledged)
 
     def test_on_success_acks_late(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.time_start = 1
         mytask.acks_late = True
         try:
@@ -377,7 +377,7 @@ class test_TaskRequest(unittest.TestCase):
             mytask.acks_late = False
 
     def test_on_failure_WorkerLostError(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         try:
             raise WorkerLostError("do re mi")
         except WorkerLostError:
@@ -388,7 +388,7 @@ class test_TaskRequest(unittest.TestCase):
 
         mytask.ignore_result = True
         try:
-            tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+            tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             tw.on_failure(exc_info)
             self.assertEqual(mytask.backend.get_status(tw.task_id),
                              states.PENDING)
@@ -396,7 +396,7 @@ class test_TaskRequest(unittest.TestCase):
             mytask.ignore_result = False
 
     def test_on_failure_acks_late(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.time_start = 1
         mytask.acks_late = True
         try:
@@ -428,7 +428,7 @@ class test_TaskRequest(unittest.TestCase):
             def error(self, msg, *args, **kwargs):
                 self.errors.append(msg % args)
 
-        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         tw.logger = MockLogger()
         tw.on_timeout(soft=True, timeout=1337)
         self.assertIn("Soft time limit (1337s) exceeded",
@@ -440,7 +440,7 @@ class test_TaskRequest(unittest.TestCase):
 
         mytask.ignore_result = True
         try:
-            tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
+            tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             tw.logger = MockLogger()
         finally:
             mytask.ignore_result = False
@@ -449,7 +449,7 @@ class test_TaskRequest(unittest.TestCase):
                              states.PENDING)
 
     def test_execute_and_trace(self):
-        res = execute_and_trace(mytask.name, gen_unique_id(), [4], {})
+        res = execute_and_trace(mytask.name, uuid(), [4], {})
         self.assertEqual(res, 4 ** 4)
 
     def test_execute_safe_catches_exception(self):
@@ -461,7 +461,7 @@ class test_TaskRequest(unittest.TestCase):
         WorkerTaskTrace.execute = _error_exec
         try:
             with catch_warnings(record=True) as log:
-                res = execute_and_trace(mytask.name, gen_unique_id(),
+                res = execute_and_trace(mytask.name, uuid(),
                                         [4], {})
                 self.assertIsInstance(res, ExceptionInfo)
                 self.assertTrue(log)
@@ -478,31 +478,31 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_worker_task_trace_handle_retry(self):
         from celery.exceptions import RetryTaskError
-        uuid = gen_unique_id()
-        w = WorkerTaskTrace(mytask.name, uuid, [4], {})
+        tid = uuid()
+        w = WorkerTaskTrace(mytask.name, tid, [4], {})
         type_, value_, tb_ = self.create_exception(ValueError("foo"))
         type_, value_, tb_ = self.create_exception(RetryTaskError(str(value_),
                                                                   exc=value_))
         w._store_errors = False
         w.handle_retry(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
+        self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
         w._store_errors = True
         w.handle_retry(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(uuid), states.RETRY)
+        self.assertEqual(mytask.backend.get_status(tid), states.RETRY)
 
     def test_worker_task_trace_handle_failure(self):
-        uuid = gen_unique_id()
-        w = WorkerTaskTrace(mytask.name, uuid, [4], {})
+        tid = uuid()
+        w = WorkerTaskTrace(mytask.name, tid, [4], {})
         type_, value_, tb_ = self.create_exception(ValueError("foo"))
         w._store_errors = False
         w.handle_failure(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(uuid), states.PENDING)
+        self.assertEqual(mytask.backend.get_status(tid), states.PENDING)
         w._store_errors = True
         w.handle_failure(value_, type_, tb_, "")
-        self.assertEqual(mytask.backend.get_status(uuid), states.FAILURE)
+        self.assertEqual(mytask.backend.get_status(tid), states.FAILURE)
 
     def test_task_wrapper_mail_attrs(self):
-        tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
+        tw = TaskRequest(mytask.name, uuid(), [], {})
         x = tw.success_msg % {"name": tw.task_name,
                               "id": tw.task_id,
                               "return_value": 10,
@@ -520,7 +520,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertTrue(x)
 
     def test_from_message(self):
-        body = {"task": mytask.name, "id": gen_unique_id(),
+        body = {"task": mytask.name, "id": uuid(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
         m = Message(None, body=anyjson.serialize(body), backend="foo",
                           content_type="application/json",
@@ -536,7 +536,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertTrue(tw.logger)
 
     def test_from_message_nonexistant_task(self):
-        body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
+        body = {"task": "cu.mytask.doesnotexist", "id": uuid(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
         m = Message(None, body=anyjson.serialize(body), backend="foo",
                           content_type="application/json",
@@ -545,7 +545,7 @@ class test_TaskRequest(unittest.TestCase):
                           m, m.decode())
 
     def test_execute(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertEqual(tw.execute(), 256)
         meta = mytask.backend.get_task_meta(tid)
@@ -553,7 +553,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(meta["status"], states.SUCCESS)
 
     def test_execute_success_no_kwargs(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(), 256)
         meta = mytask_no_kwargs.backend.get_task_meta(tid)
@@ -561,7 +561,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(meta["status"], states.SUCCESS)
 
     def test_execute_success_some_kwargs(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
         meta = mytask_some_kwargs.backend.get_task_meta(tid)
@@ -570,7 +570,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(meta["status"], states.SUCCESS)
 
     def test_execute_ack(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
                         on_ack=on_ack)
         self.assertEqual(tw.execute(), 256)
@@ -580,7 +580,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertEqual(meta["status"], states.SUCCESS)
 
     def test_execute_fail(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
         self.assertIsInstance(tw.execute(), ExceptionInfo)
         meta = mytask_raising.backend.get_task_meta(tid)
@@ -588,7 +588,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertIsInstance(meta["result"], KeyError)
 
     def test_execute_using_pool(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
 
         class MockPool(BasePool):
@@ -615,7 +615,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertIn([4], p.args)
 
     def test_default_kwargs(self):
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertDictEqual(
                 tw.extend_with_default_kwargs(10, "some_logfile"), {
@@ -630,7 +630,7 @@ class test_TaskRequest(unittest.TestCase):
 
     def _test_on_failure(self, exception):
         app = app_or_default()
-        tid = gen_unique_id()
+        tid = uuid()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         try:
             raise exception

+ 2 - 2
celery/tests/test_worker/test_worker_mediator.py

@@ -4,7 +4,7 @@ from Queue import Queue
 
 from mock import Mock, patch
 
-from celery.utils import gen_unique_id
+from celery.utils import uuid
 from celery.worker.mediator import Mediator
 from celery.worker.state import revoked as revoked_tasks
 
@@ -108,7 +108,7 @@ class test_Mediator(unittest.TestCase):
 
         m = Mediator(ready_queue, mycallback)
         t = MockTask("Jerry Seinfeld")
-        t.task_id = gen_unique_id()
+        t.task_id = uuid()
         revoked_tasks.add(t.task_id)
         ready_queue.put(t)
 

+ 1 - 0
celery/utils/__init__.py

@@ -18,6 +18,7 @@ from itertools import islice
 from pprint import pprint
 
 from kombu.utils import cached_property, gen_unique_id  # noqa
+uuid = gen_unique_id
 
 from .compat import StringIO
 from .encoding import safe_repr as _safe_repr