Explorar o código

Make uuid4.uuid4() available as celery.utils.gen_unique_id() so it is generic
if we want to change it later.

Ask Solem %!s(int64=16) %!d(string=hai) anos
pai
achega
ba3c1a1784

+ 2 - 2
celery/messaging.py

@@ -5,7 +5,7 @@ Sending and Receiving Messages
 """
 from carrot.messaging import Publisher, Consumer
 from celery import conf
-import uuid
+from celery.utils import gen_unique_id
 
 try:
     import cPickle as pickle
@@ -51,7 +51,7 @@ class TaskPublisher(Publisher):
 
         task_args = task_args or []
         task_kwargs = task_kwargs or {}
-        task_id = task_id or str(uuid.uuid4())
+        task_id = task_id or gen_unique_id()
         message_data = {
             "id": task_id,
             "task": task_name,

+ 3 - 2
celery/task/base.py

@@ -4,8 +4,9 @@ from celery.messaging import TaskPublisher, TaskConsumer
 from celery.log import setup_logger
 from celery.result import TaskSetResult
 from celery.execute import apply_async, delay_task
+from celery.utils import gen_unique_id
 from datetime import timedelta
-import uuid
+
 try:
     import cPickle as pickle
 except ImportError:
@@ -282,7 +283,7 @@ class TaskSet(object):
             [True, True]
 
         """
-        taskset_id = str(uuid.uuid4())
+        taskset_id = gen_unique_id()
         conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
         publisher = TaskPublisher(connection=conn)
         subtasks = [apply_async(self.task, args, kwargs,

+ 4 - 4
celery/tests/test_backends/test_cache.py

@@ -1,9 +1,9 @@
 import sys
 import unittest
-import uuid
 import errno
 import socket
 from celery.backends.cache import Backend as CacheBackend
+from celery.utils import gen_unique_id
 from django.conf import settings
 
 
@@ -18,7 +18,7 @@ class TestCacheBackend(unittest.TestCase):
     def test_mark_as_done(self):
         cb = CacheBackend()
 
-        tid = str(uuid.uuid4())
+        tid = gen_unique_id()
 
         self.assertFalse(cb.is_done(tid))
         self.assertEquals(cb.get_status(tid), "PENDING")
@@ -34,7 +34,7 @@ class TestCacheBackend(unittest.TestCase):
     def test_is_pickled(self):
         cb = CacheBackend()
     
-        tid2 = str(uuid.uuid4())
+        tid2 = gen_unique_id()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         cb.mark_as_done(tid2, result)
         # is serialized properly.
@@ -45,7 +45,7 @@ class TestCacheBackend(unittest.TestCase):
     def test_mark_as_failure(self):
         cb = CacheBackend()
 
-        tid3 = str(uuid.uuid4())
+        tid3 = gen_unique_id()
         try:
             raise KeyError("foo")
         except KeyError, exception:

+ 4 - 4
celery/tests/test_backends/test_database.py

@@ -1,6 +1,6 @@
 import unittest
 from celery.backends.database import Backend
-import uuid
+from celery.utils import gen_unique_id
 
 
 class SomeClass(object):
@@ -13,7 +13,7 @@ class TestDatabaseBackend(unittest.TestCase):
 
     def test_backend(self):
         b = Backend()
-        tid = str(uuid.uuid4())
+        tid = gen_unique_id()
 
         self.assertFalse(b.is_done(tid))
         self.assertEquals(b.get_status(tid), "PENDING")
@@ -26,7 +26,7 @@ class TestDatabaseBackend(unittest.TestCase):
         self.assertTrue(b._cache.get(tid))
         self.assertTrue(b.get_result(tid), 42)
 
-        tid2 = str(uuid.uuid4())
+        tid2 = gen_unique_id()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         b.mark_as_done(tid2, result)
         # is serialized properly.
@@ -34,7 +34,7 @@ class TestDatabaseBackend(unittest.TestCase):
         self.assertEquals(rindb.get("foo"), "baz")
         self.assertEquals(rindb.get("bar").data, 12345)
 
-        tid3 = str(uuid.uuid4())
+        tid3 = gen_unique_id()
         try:
             raise KeyError("foo")
         except KeyError, exception:

+ 4 - 4
celery/tests/test_backends/test_tyrant.py

@@ -1,10 +1,10 @@
 import sys
 import unittest
-import uuid
 import errno
 import socket
 from celery.backends.tyrant import Backend as TyrantBackend
 from django.conf import settings
+from celery.utils import gen_unique_id
 
 _no_tyrant_msg = "* Tokyo Tyrant not running. Will not execute related tests."
 _no_tyrant_msg_emitted = False
@@ -48,7 +48,7 @@ class TestTyrantBackend(unittest.TestCase):
         if not tb:
             return
 
-        tid = str(uuid.uuid4())
+        tid = gen_unique_id()
 
         self.assertFalse(tb.is_done(tid))
         self.assertEquals(tb.get_status(tid), "PENDING")
@@ -66,7 +66,7 @@ class TestTyrantBackend(unittest.TestCase):
         if not tb:
             return
     
-        tid2 = str(uuid.uuid4())
+        tid2 = gen_unique_id()
         result = {"foo": "baz", "bar": SomeClass(12345)}
         tb.mark_as_done(tid2, result)
         # is serialized properly.
@@ -79,7 +79,7 @@ class TestTyrantBackend(unittest.TestCase):
         if not tb:
             return
 
-        tid3 = str(uuid.uuid4())
+        tid3 = gen_unique_id()
         try:
             raise KeyError("foo")
         except KeyError, exception:

+ 2 - 2
celery/tests/test_models.py

@@ -1,9 +1,9 @@
 import unittest
-import uuid
 from datetime import datetime, timedelta
 from celery.models import TaskMeta, PeriodicTaskMeta
 from celery.task import PeriodicTask
 from celery.registry import tasks
+from celery.utils import gen_unique_id
 
 
 class TestPeriodicTask(PeriodicTask):
@@ -14,7 +14,7 @@ class TestPeriodicTask(PeriodicTask):
 class TestModels(unittest.TestCase):
 
     def createTaskMeta(self):
-        id = str(uuid.uuid4())
+        id = gen_unique_id()
         taskmeta, created = TaskMeta.objects.get_or_create(task_id=id)
         return taskmeta
 

+ 17 - 13
celery/tests/test_worker_job.py

@@ -6,10 +6,11 @@ from celery.datastructures import ExceptionInfo
 from celery.models import TaskMeta
 from celery.registry import tasks, NotRegistered
 from celery.pool import TaskPool
-from uuid import uuid4
+from celery.utils import gen_unique_id
 from carrot.backends.base import BaseMessage
 import simplejson
 
+uuid4 = gen_unique_id
 
 def mytask(i, **kwargs):
     return i ** i
@@ -30,11 +31,11 @@ get_db_connection.ignore_result = True
 class TestJail(unittest.TestCase):
 
     def test_execute_jail_success(self):
-        ret = jail(str(uuid4()), str(uuid4()), mytask, [2], {})
+        ret = jail(gen_unique_id(), gen_unique_id(), mytask, [2], {})
         self.assertEquals(ret, 4)
 
     def test_execute_jail_failure(self):
-        ret = jail(str(uuid4()), str(uuid4()), mytask_raising, [4], {})
+        ret = jail(gen_unique_id(), gen_unique_id(), mytask_raising, [4], {})
         self.assertTrue(isinstance(ret, ExceptionInfo))
         self.assertEquals(ret.exception.args, (4, ))
 
@@ -49,7 +50,8 @@ class TestJail(unittest.TestCase):
 
         connection.close = monkeypatched_connection_close
 
-        ret = jail(str(uuid4()), str(uuid4()), get_db_connection, [2], {})
+        ret = jail(gen_unique_id(), gen_unique_id(),
+                   get_db_connection, [2], {})
         self.assertTrue(connection._was_closed)
 
         connection.close = old_connection_close
@@ -58,16 +60,18 @@ class TestJail(unittest.TestCase):
 class TestTaskWrapper(unittest.TestCase):
 
     def test_task_wrapper_attrs(self):
-        tw = TaskWrapper(str(uuid4()), str(uuid4()), mytask, [1], {"f": "x"})
+        tw = TaskWrapper(gen_unique_id(), gen_unique_id(),
+                         mytask, [1], {"f": "x"})
         for attr in ("task_name", "task_id", "args", "kwargs", "logger"):
             self.assertTrue(getattr(tw, attr, None))
 
     def test_task_wrapper_repr(self):
-        tw = TaskWrapper(str(uuid4()), str(uuid4()), mytask, [1], {"f": "x"})
+        tw = TaskWrapper(gen_unique_id(), gen_unique_id(),
+                         mytask, [1], {"f": "x"})
         self.assertTrue(repr(tw))
 
     def test_task_wrapper_mail_attrs(self):
-        tw = TaskWrapper(str(uuid4()), str(uuid4()), mytask, [], {})
+        tw = TaskWrapper(gen_unique_id(), gen_unique_id(), mytask, [], {})
         x = tw.success_msg % {"name": tw.task_name,
                               "id": tw.task_id,
                               "return_value": 10}
@@ -84,7 +88,7 @@ class TestTaskWrapper(unittest.TestCase):
         self.assertTrue(x)
 
     def test_from_message(self):
-        body = {"task": "cu.mytask", "id": str(uuid4()),
+        body = {"task": "cu.mytask", "id": gen_unique_id(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
         m = BaseMessage(body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
@@ -101,7 +105,7 @@ class TestTaskWrapper(unittest.TestCase):
         self.assertTrue(tw.logger)
 
     def test_from_message_nonexistant_task(self):
-        body = {"task": "cu.mytask.doesnotexist", "id": str(uuid4()),
+        body = {"task": "cu.mytask.doesnotexist", "id": gen_unique_id(),
                 "args": [2], "kwargs": {u"æØåveéðƒeæ": "bar"}}
         m = BaseMessage(body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
@@ -110,7 +114,7 @@ class TestTaskWrapper(unittest.TestCase):
                           m, m.decode())
 
     def test_execute(self):
-        tid = str(uuid4())
+        tid = gen_unique_id(),
         tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
         self.assertEquals(tw.execute(), 256)
         meta = TaskMeta.objects.get(task_id=tid)
@@ -118,7 +122,7 @@ class TestTaskWrapper(unittest.TestCase):
         self.assertEquals(meta.status, "DONE")
 
     def test_execute_fail(self):
-        tid = str(uuid4())
+        tid = gen_unique_id(),
         tw = TaskWrapper("cu.mytask-raising", tid, mytask_raising, [4],
                          {"f": "x"})
         self.assertTrue(isinstance(tw.execute(), ExceptionInfo))
@@ -127,7 +131,7 @@ class TestTaskWrapper(unittest.TestCase):
         self.assertTrue(isinstance(meta.result, KeyError))
 
     def test_execute_using_pool(self):
-        tid = str(uuid4())
+        tid = gen_unique_id(),
         tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
         p = TaskPool(2)
         p.start()
@@ -136,7 +140,7 @@ class TestTaskWrapper(unittest.TestCase):
         p.stop()
 
     def test_default_kwargs(self):
-        tid = str(uuid4())
+        tid = gen_unique_id(),
         tw = TaskWrapper("cu.mytask", tid, mytask, [4], {"f": "x"})
         self.assertEquals(tw.extend_with_default_kwargs(10, "some_logfile"), {
             "f": "x",

+ 10 - 0
celery/utils.py

@@ -3,6 +3,7 @@
 Utility functions
 
 """
+import uuid
 
 
 def chunks(it, n):
@@ -28,3 +29,12 @@ def chunks(it, n):
             acc = []
         acc.append(item)
     yield acc
+
+
+def gen_unique_id(self):
+    """Generate a unique id, having - hopefully - a very small chance of
+    collission.
+    
+    For now this is provided by :func:`uuid.uuid4`.
+    """
+    return str(uuid.uuid4())