
AsyncResult.task_id -> AsyncResult.id (also TaskSetResult.taskset_id -> .id)

Ask Solem, 13 years ago · commit b398ba3994

+ 2 - 2
Changelog

@@ -1118,7 +1118,7 @@ Fixes
 * SQLAlchemy result backend: `date_done` was no longer part of the results as it had
   been accidentally removed.  It is now available again (Issue #325).
 
-* SQLAlchemy result backend: Added unique constraint on `Task.task_id` and
+* SQLAlchemy result backend: Added unique constraint on `Task.id` and
   `TaskSet.taskset_id`.  Tables need to be recreated for this to take effect.
 
 * Fixed exception raised when iterating on the result of ``TaskSet.apply()``.
@@ -4842,7 +4842,7 @@ News
 * **IMPORTANT** The `subtask_ids` attribute on the `TaskSetResult`
   instance has been removed. To get this information instead use:
 
-        >>> subtask_ids = [subtask.task_id for subtask in ts_res.subtasks]
+        >>> subtask_ids = [subtask.id for subtask in ts_res.subtasks]
 
 * `Taskset.run()` now respects extra message options from the task class.
 

+ 10 - 1
celery/backends/base.py

@@ -207,10 +207,19 @@ class BaseBackend(object):
         pass
 
     def on_chord_apply(self, setid, body, result=None, **kwargs):
-        kwargs["result"] = [r.task_id for r in result]
+        kwargs["result"] = [r.id for r in result]
         self.app.tasks["celery.chord_unlock"].apply_async((setid, body, ),
                                                           kwargs, countdown=1)
 
+    def _serializable_child(self, node):
+        if isinstance(node, ResultSet):
+            return (node, )
+
+    def current_task_children(self):
+        current = current_task()
+        if current:
+            return [r.serializable() for r in current.request.children]
+
     def __reduce__(self, args=(), kwargs={}):
         return (unpickle_backend, (self.__class__, args, kwargs))
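
For context, the two helpers added above rely on the `serializable()` methods introduced in `celery/result.py` further down in this changeset: `AsyncResult.serializable()` returns `(self.id, [])` and `TaskSetResult.serializable()` returns its id plus the serialized children. A rough sketch of the resulting shapes, assuming `TaskSetResult` now stores the taskset id as `.id` (the ids below are invented):

    >>> AsyncResult("a7c9").serializable()
    ('a7c9', [])
    >>> TaskSetResult("5f11", [AsyncResult("a7c9")]).serializable()
    ('5f11', [('a7c9', [])])

This is the tuple shape `current_task_children()` collects for the children of the currently executing request, assuming `request.children` is populated elsewhere in this changeset.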
 

+ 1 - 2
celery/beat.py

@@ -180,8 +180,7 @@ class Scheduler(object):
                                   traceback.format_stack(),
                                   exc_info=True)
             else:
-                self.logger.debug("%s sent. id->%s", entry.task,
-                                                     result.task_id)
+                self.logger.debug("%s sent. id->%s", entry.task, result.id)
         return next_time_to_run
 
     def tick(self):

+ 1 - 1
celery/bin/celeryctl.py

@@ -202,7 +202,7 @@ class apply(Command):
                                  routing_key=kw.get("routing_key"),
                                  eta=maybe_iso8601(kw.get("eta")),
                                  expires=expires)
-        self.out(res.task_id)
+        self.out(res.id)
 apply = command(apply)
 
 

+ 2 - 2
celery/contrib/abortable.py

@@ -111,7 +111,7 @@ class AbortableAsyncResult(AsyncResult):
 
     def is_aborted(self):
         """Returns :const:`True` if the task is (being) aborted."""
-        return self.backend.get_status(self.task_id) == ABORTED
+        return self.state == ABORTED
 
     def abort(self):
         """Set the state of the task to :const:`ABORTED`.
@@ -126,7 +126,7 @@ class AbortableAsyncResult(AsyncResult):
         """
         # TODO: store_result requires all four arguments to be set,
         # but only status should be updated here
-        return self.backend.store_result(self.task_id, result=None,
+        return self.backend.store_result(self.id, result=None,
                                          status=ABORTED, traceback=None)
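
In practice the behaviour is unchanged, only the lookup path moves: `is_aborted()` now reads the generic `state` property (which uses `self.id` internally after this rename) instead of calling `backend.get_status(self.task_id)` directly. A minimal usage sketch, reusing the `MyAbortableTask` helper from the tests later in this changeset:

    >>> result = MyAbortableTask().apply_async()
    >>> result.abort()          # store_result(self.id, ..., status=ABORTED)
    >>> result.is_aborted()     # equivalent to: result.state == ABORTED
    True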
 
 

+ 1 - 1
celery/contrib/batches.py

@@ -117,7 +117,7 @@ class SimpleRequest(object):
 
     @classmethod
     def from_request(cls, request):
-        return cls(request.task_id, request.task_name, request.args,
+        return cls(request.id, request.name, request.args,
                    request.kwargs, request.delivery_info, request.hostname)
 
 

+ 49 - 29
celery/result.py

@@ -33,7 +33,7 @@ def _unpickle_result(task_id, task_name):
 class AsyncResult(object):
     """Query task state.
 
-    :param task_id: see :attr:`task_id`.
+    :param id: see :attr:`id`.
     :keyword backend: see :attr:`backend`.
 
     """
@@ -41,21 +41,24 @@ class AsyncResult(object):
     #: Error raised for timeouts.
     TimeoutError = TimeoutError
 
-    #: The task uuid.
-    task_id = None
+    #: The task's UUID.
+    id = None
 
     #: The task result backend to use.
     backend = None
 
-    def __init__(self, task_id, backend=None, task_name=None, app=None):
+    def __init__(self, id, backend=None, task_name=None, app=None):
         self.app = app_or_default(app)
-        self.task_id = task_id
+        self.id = id
         self.backend = backend or self.app.backend
         self.task_name = task_name
 
+    def serializable(self):
+        return self.id, []
+
     def forget(self):
         """Forget about (and possibly remove the result of) this task."""
-        self.backend.forget(self.task_id)
+        self.backend.forget(self.id)
 
     def revoke(self, connection=None):
         """Send revoke signal to all workers.
@@ -64,7 +67,7 @@ class AsyncResult(object):
         task, *must* ignore it.
 
         """
-        self.app.control.revoke(self.task_id, connection=connection)
+        self.app.control.revoke(self.id, connection=connection)
 
     def get(self, timeout=None, propagate=True, interval=0.5):
         """Wait until task is ready, and return its result.
@@ -90,9 +93,9 @@ class AsyncResult(object):
         be re-raised.
 
         """
-        return self.backend.wait_for(self.task_id, timeout=timeout,
-                                                   propagate=propagate,
-                                                   interval=interval)
+        return self.backend.wait_for(self.id, timeout=timeout,
+                                              propagate=propagate,
+                                              interval=interval)
     wait = get  # deprecated alias to :meth:`get`.
 
     def collect(self, timeout=None, propagate=True):
@@ -156,29 +159,29 @@ class AsyncResult(object):
         return self.state == states.FAILURE
 
     def __str__(self):
-        """`str(self) -> self.task_id`"""
-        return self.task_id
+        """`str(self) -> self.id`"""
+        return self.id
 
     def __hash__(self):
-        """`hash(self) -> hash(self.task_id)`"""
-        return hash(self.task_id)
+        """`hash(self) -> hash(self.id)`"""
+        return hash(self.id)
 
     def __repr__(self):
-        return "<AsyncResult: %s>" % self.task_id
+        return "<AsyncResult: %s>" % self.id
 
     def __eq__(self, other):
         if isinstance(other, self.__class__):
-            return self.task_id == other.task_id
-        return other == self.task_id
+            return self.id == other.id
+        return other == self.id
 
     def __copy__(self):
-        return self.__class__(self.task_id, backend=self.backend)
+        return self.__class__(self.id, backend=self.backend)
 
     def __reduce__(self):
         if self.task_name:
-            return (_unpickle_result, (self.task_id, self.task_name))
+            return (_unpickle_result, (self.id, self.task_name))
         else:
-            return (self.__class__, (self.task_id, self.backend,
+            return (self.__class__, (self.id, self.backend,
                                      None, self.app))
 
     @property
@@ -190,13 +193,13 @@ class AsyncResult(object):
         """When the task has been executed, this contains the return value.
         If the task raised an exception, this will be the exception
         instance."""
-        return self.backend.get_result(self.task_id)
+        return self.backend.get_result(self.id)
     info = result
 
     @property
     def traceback(self):
         """Get the traceback of a failed task."""
-        return self.backend.get_traceback(self.task_id)
+        return self.backend.get_traceback(self.id)
 
     @property
     def state(self):
@@ -228,8 +231,15 @@ class AsyncResult(object):
                 then contains the tasks return value.
 
         """
-        return self.backend.get_status(self.task_id)
+        return self.backend.get_status(self.id)
     status = state
+
+    def _get_task_id(self):
+        return self.id
+
+    def _set_task_id(self, id):
+        self.id = id
+    task_id = property(_get_task_id, _set_task_id)
 BaseAsyncResult = AsyncResult  # for backwards compatibility.
 
 
@@ -359,7 +369,7 @@ class ResultSet(object):
 
         """
         elapsed = 0.0
-        results = OrderedDict((result.task_id, copy(result))
+        results = OrderedDict((result.id, copy(result))
                                 for result in self.results)
 
         while results:
@@ -439,7 +449,7 @@ class ResultSet(object):
 
         """
         backend = self.results[0].backend
-        ids = [result.task_id for result in self.results]
+        ids = [result.id for result in self.results]
         return backend.get_many(ids, timeout=timeout, interval=interval)
 
     def join_native(self, timeout=None, propagate=True, interval=0.5):
@@ -528,23 +538,33 @@ class TaskSetResult(ResultSet):
     def __reduce__(self):
         return (self.__class__, (self.taskset_id, self.results))
 
+    def serializable(self):
+        return self.id, [r.serializable() for r in self.results]
+
     @classmethod
     def restore(self, taskset_id, backend=None):
         """Restore previously saved taskset result."""
         return (backend or current_app.backend).restore_taskset(taskset_id)
 
+    def _get_taskset_id(self):
+        return self.id
+
+    def _set_taskset_id(self, id):
+        self.id = id
+    taskset_id = property(_get_taskset_id, _set_taskset_id)
+
 
 class EagerResult(AsyncResult):
     """Result that we know has already been executed."""
 
-    def __init__(self, task_id, ret_value, state, traceback=None):
-        self.task_id = task_id
+    def __init__(self, id, ret_value, state, traceback=None):
+        self.id = id
         self._result = ret_value
         self._state = state
         self._traceback = traceback
 
     def __reduce__(self):
-        return (self.__class__, (self.task_id, self._result,
+        return (self.__class__, (self.id, self._result,
                                  self._state, self._traceback))
 
     def __copy__(self):
@@ -570,7 +590,7 @@ class EagerResult(AsyncResult):
         self._state = states.REVOKED
 
     def __repr__(self):
-        return "<EagerResult: %s>" % self.task_id
+        return "<EagerResult: %s>" % self.id
 
     @property
     def result(self):
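
Note that the rename stays backwards compatible: third-party code that still reads or writes `result.task_id` goes through the `task_id` property added above (and likewise `taskset_id` on `TaskSetResult`). A quick illustration, with a made-up id and a default result backend assumed:

    >>> res = AsyncResult("2dcec82f-6243-4ca3-9a4a-65a936b73c98")
    >>> res.task_id == res.id        # old attribute resolves via the property
    True
    >>> res.task_id = "replaced"     # setter writes through to .id
    >>> res.id
    'replaced'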

+ 7 - 7
celery/tests/test_slow/test_buckets.py

@@ -20,16 +20,16 @@ skip_if_disabled = partial(skip_if_environ("SKIP_RLIMITS"))
 
 class MockJob(object):
 
-    def __init__(self, task_id, task_name, args, kwargs):
-        self.task_id = task_id
-        self.task_name = task_name
+    def __init__(self, id, name, args, kwargs):
+        self.id = id
+        self.name = name
         self.args = args
         self.kwargs = kwargs
 
     def __eq__(self, other):
         if isinstance(other, self.__class__):
-            return bool(self.task_id == other.task_id \
-                    and self.task_name == other.task_name \
+            return bool(self.id == other.id \
+                    and self.name == other.name \
                     and self.args == other.args \
                     and self.kwargs == other.kwargs)
         else:
@@ -37,7 +37,7 @@ class MockJob(object):
 
     def __repr__(self):
         return "<MockJob: task:%s id:%s args:%s kwargs:%s" % (
-                self.task_name, self.task_id, self.args, self.kwargs)
+                self.name, self.id, self.args, self.kwargs)
 
 
 class test_TokenBucketQueue(Case):
@@ -241,7 +241,7 @@ class test_TaskBucket(Case):
 
         got_ajobs = 0
         for job in (b.get() for i in xrange(20)):
-            if job.task_name == TaskA.name:
+            if job.name == TaskA.name:
                 got_ajobs += 1
 
         self.assertGreater(got_ajobs, 2)

+ 4 - 4
celery/tests/test_task/__init__.py

@@ -200,13 +200,13 @@ class TestCeleryTasks(Case):
         task_id = uuid()
         result = retry_task.AsyncResult(task_id)
         self.assertEqual(result.backend, retry_task.backend)
-        self.assertEqual(result.task_id, task_id)
+        self.assertEqual(result.id, task_id)
 
     def assertNextTaskDataEqual(self, consumer, presult, task_name,
             test_eta=False, test_expires=False, **kwargs):
         next_task = consumer.fetch()
         task_data = next_task.decode()
-        self.assertEqual(task_data["id"], presult.task_id)
+        self.assertEqual(task_data["id"], presult.id)
         self.assertEqual(task_data["task"], task_name)
         task_kwargs = task_data.get("kwargs", {})
         if test_eta:
@@ -285,7 +285,7 @@ class TestCeleryTasks(Case):
         self.assertIsNone(consumer.fetch())
 
         self.assertFalse(presult.successful())
-        T1.backend.mark_as_done(presult.task_id, result=None)
+        T1.backend.mark_as_done(presult.id, result=None)
         self.assertTrue(presult.successful())
 
         publisher = T1.get_publisher()
@@ -418,7 +418,7 @@ class TestTaskSet(Case):
             m = consumer.fetch().payload
             self.assertDictContainsSubset({"taskset": taskset_id,
                                            "task": increment_counter.name,
-                                           "id": subtask.task_id}, m)
+                                           "id": subtask.id}, m)
             increment_counter(
                     increment_by=m.get("kwargs", {}).get("increment_by"))
         self.assertEqual(increment_counter.count, sum(xrange(1, 10)))

+ 1 - 1
celery/tests/test_task/test_chord.py

@@ -108,7 +108,7 @@ class test_chord(AppCase):
         x = chord(add.subtask((i, i)) for i in xrange(10))
         body = add.subtask((2, ))
         result = x(body)
-        self.assertEqual(result.task_id, body.options["task_id"])
+        self.assertEqual(result.id, body.options["task_id"])
         self.assertTrue(chord.Chord.apply_async.call_count)
 
 

+ 4 - 4
celery/tests/test_task/test_result.py

@@ -56,11 +56,11 @@ class TestAsyncResult(AppCase):
     def test_reduce(self):
         a1 = AsyncResult("uuid", task_name=mytask.name)
         restored = pickle.loads(pickle.dumps(a1))
-        self.assertEqual(restored.task_id, "uuid")
+        self.assertEqual(restored.id, "uuid")
         self.assertEqual(restored.task_name, mytask.name)
 
         a2 = AsyncResult("uuid")
-        self.assertEqual(pickle.loads(pickle.dumps(a2)).task_id, "uuid")
+        self.assertEqual(pickle.loads(pickle.dumps(a2)).id, "uuid")
 
     def test_successful(self):
         ok_res = AsyncResult(self.task1["id"])
@@ -271,7 +271,7 @@ class TestTaskSetResult(AppCase):
         subtasks = [AsyncResult(uuid(), backend=backend)
                         for i in range(10)]
         ts = TaskSetResult(uuid(), subtasks)
-        backend.ids = [subtask.task_id for subtask in subtasks]
+        backend.ids = [subtask.id for subtask in subtasks]
         res = ts.join_native()
         self.assertEqual(res, range(10))
 
@@ -280,7 +280,7 @@ class TestTaskSetResult(AppCase):
         subtasks = [AsyncResult(uuid(), backend=backend)
                         for i in range(10)]
         ts = TaskSetResult(uuid(), subtasks)
-        backend.ids = [subtask.task_id for subtask in subtasks]
+        backend.ids = [subtask.id for subtask in subtasks]
         self.assertEqual(len(list(ts.iter_native())), 10)
 
     def test_iterate_yields(self):

+ 3 - 3
celery/tests/test_task/test_task_abortable.py

@@ -15,18 +15,18 @@ class TestAbortableTask(Case):
     def test_async_result_is_abortable(self):
         t = MyAbortableTask()
         result = t.apply_async()
-        tid = result.task_id
+        tid = result.id
         self.assertIsInstance(t.AsyncResult(tid), AbortableAsyncResult)
 
     def test_is_not_aborted(self):
         t = MyAbortableTask()
         result = t.apply_async()
-        tid = result.task_id
+        tid = result.id
         self.assertFalse(t.is_aborted(task_id=tid))
 
     def test_abort_yields_aborted(self):
         t = MyAbortableTask()
         result = t.apply_async()
         result.abort()
-        tid = result.task_id
+        tid = result.id
         self.assertTrue(t.is_aborted(task_id=tid))

+ 2 - 2
celery/tests/test_worker/__init__.py

@@ -344,7 +344,7 @@ class test_Consumer(Case):
 
         in_bucket = self.ready_queue.get_nowait()
         self.assertIsInstance(in_bucket, Request)
-        self.assertEqual(in_bucket.task_name, foo_task.name)
+        self.assertEqual(in_bucket.name, foo_task.name)
         self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
         self.assertTrue(self.eta_schedule.empty())
 
@@ -591,7 +591,7 @@ class test_Consumer(Case):
         eta, priority, entry = in_hold
         task = entry.args[0]
         self.assertIsInstance(task, Request)
-        self.assertEqual(task.task_name, foo_task.name)
+        self.assertEqual(task.name, foo_task.name)
         self.assertEqual(task.execute(), 2 * 4 * 8)
         with self.assertRaises(Empty):
             self.ready_queue.get_nowait()

+ 1 - 1
celery/tests/test_worker/test_worker_control.py

@@ -334,7 +334,7 @@ class test_ControlPanel(Case):
 
     def test_revoke_terminate(self):
         request = Mock()
-        request.task_id = tid = uuid()
+        request.id = tid = uuid()
         state.active_requests.add(request)
         try:
             r = control.revoke(Mock(), tid, terminate=True)

+ 21 - 21
celery/tests/test_worker/test_worker_job.py

@@ -257,16 +257,16 @@ class test_TaskRequest(Case):
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
                          expires=datetime.utcnow() - timedelta(days=1))
         tw.revoked()
-        self.assertIn(tw.task_id, revoked)
-        self.assertEqual(mytask.backend.get_status(tw.task_id),
+        self.assertIn(tw.id, revoked)
+        self.assertEqual(mytask.backend.get_status(tw.id),
                          states.REVOKED)
 
     def test_revoked_expires_not_expired(self):
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"},
                          expires=datetime.utcnow() + timedelta(days=1))
         tw.revoked()
-        self.assertNotIn(tw.task_id, revoked)
-        self.assertNotEqual(mytask.backend.get_status(tw.task_id),
+        self.assertNotIn(tw.id, revoked)
+        self.assertNotEqual(mytask.backend.get_status(tw.id),
                          states.REVOKED)
 
     def test_revoked_expires_ignore_result(self):
@@ -275,8 +275,8 @@ class test_TaskRequest(Case):
                          expires=datetime.utcnow() - timedelta(days=1))
         try:
             tw.revoked()
-            self.assertIn(tw.task_id, revoked)
-            self.assertNotEqual(mytask.backend.get_status(tw.task_id),
+            self.assertIn(tw.id, revoked)
+            self.assertNotEqual(mytask.backend.get_status(tw.id),
                                 states.REVOKED)
 
         finally:
@@ -338,14 +338,14 @@ class test_TaskRequest(Case):
 
     def test_revoked(self):
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-        revoked.add(tw.task_id)
+        revoked.add(tw.id)
         self.assertTrue(tw.revoked())
         self.assertTrue(tw._already_revoked)
         self.assertTrue(tw.acknowledged)
 
     def test_execute_does_not_execute_revoked(self):
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-        revoked.add(tw.task_id)
+        revoked.add(tw.id)
         tw.execute()
 
     def test_execute_acks_late(self):
@@ -359,7 +359,7 @@ class test_TaskRequest(Case):
 
     def test_execute_using_pool_does_not_execute_revoked(self):
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
-        revoked.add(tw.task_id)
+        revoked.add(tw.id)
         tw.execute_using_pool(None)
 
     def test_on_accepted_acks_early(self):
@@ -411,7 +411,7 @@ class test_TaskRequest(Case):
         tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
         exc_info = get_ei()
         tw.on_failure(exc_info)
-        self.assertEqual(mytask.backend.get_status(tw.task_id),
+        self.assertEqual(mytask.backend.get_status(tw.id),
                          states.FAILURE)
 
         mytask.ignore_result = True
@@ -419,7 +419,7 @@ class test_TaskRequest(Case):
             exc_info = get_ei()
             tw = TaskRequest(mytask.name, uuid(), [1], {"f": "x"})
             tw.on_failure(exc_info)
-            self.assertEqual(mytask.backend.get_status(tw.task_id),
+            self.assertEqual(mytask.backend.get_status(tw.id),
                              states.PENDING)
         finally:
             mytask.ignore_result = False
@@ -464,7 +464,7 @@ class test_TaskRequest(Case):
                       tw.logger.warnings[0])
         tw.on_timeout(soft=False, timeout=1337)
         self.assertIn("Hard time limit (1337s) exceeded", tw.logger.errors[0])
-        self.assertEqual(mytask.backend.get_status(tw.task_id),
+        self.assertEqual(mytask.backend.get_status(tw.id),
                          states.FAILURE)
 
         mytask.ignore_result = True
@@ -473,7 +473,7 @@ class test_TaskRequest(Case):
             tw.logger = MockLogger()
         finally:
             tw.on_timeout(soft=True, timeout=1336)
-            self.assertEqual(mytask.backend.get_status(tw.task_id),
+            self.assertEqual(mytask.backend.get_status(tw.id),
                              states.PENDING)
             mytask.ignore_result = False
 
@@ -534,13 +534,13 @@ class test_TaskRequest(Case):
 
     def test_task_wrapper_mail_attrs(self):
         tw = TaskRequest(mytask.name, uuid(), [], {})
-        x = tw.success_msg % {"name": tw.task_name,
-                              "id": tw.task_id,
+        x = tw.success_msg % {"name": tw.name,
+                              "id": tw.id,
                               "return_value": 10,
                               "runtime": 0.3641}
         self.assertTrue(x)
-        x = tw.error_msg % {"name": tw.task_name,
-                           "id": tw.task_id,
+        x = tw.error_msg % {"name": tw.name,
+                           "id": tw.id,
                            "exc": "FOOBARBAZ",
                            "traceback": "foobarbaz"}
         self.assertTrue(x)
@@ -554,8 +554,8 @@ class test_TaskRequest(Case):
                           content_encoding="utf-8")
         tw = TaskRequest.from_message(m, m.decode())
         self.assertIsInstance(tw, Request)
-        self.assertEqual(tw.task_name, body["task"])
-        self.assertEqual(tw.task_id, body["id"])
+        self.assertEqual(tw.name, body["task"])
+        self.assertEqual(tw.id, body["id"])
         self.assertEqual(tw.args, body["args"])
         us = from_utf8(us)
         if sys.version_info < (2, 6):
@@ -668,11 +668,11 @@ class test_TaskRequest(Case):
                     "f": "x",
                     "logfile": "some_logfile",
                     "loglevel": 10,
-                    "task_id": tw.task_id,
+                    "task_id": tw.id,
                     "task_retries": 0,
                     "task_is_eager": False,
                     "delivery_info": {"exchange": None, "routing_key": None},
-                    "task_name": tw.task_name})
+                    "task_name": tw.name})
 
     def _test_on_failure(self, exception):
         app = app_or_default()

+ 5 - 5
celery/tests/test_worker/test_worker_mediator.py

@@ -14,8 +14,8 @@ from celery.tests.utils import Case
 
 class MockTask(object):
     hostname = "harness.com"
-    task_id = 1234
-    task_name = "mocktask"
+    id = 1234
+    name = "mocktask"
 
     def __init__(self, value, **kwargs):
         self.value = value
@@ -23,7 +23,7 @@ class MockTask(object):
     on_ack = Mock()
 
     def revoked(self):
-        if self.task_id in revoked_tasks:
+        if self.id in revoked_tasks:
             self.on_ack()
             return True
         return False
@@ -117,8 +117,8 @@ class test_Mediator(Case):
 
         m = Mediator(ready_queue, mycallback)
         t = MockTask("Jerry Seinfeld")
-        t.task_id = uuid()
-        revoked_tasks.add(t.task_id)
+        t.id = uuid()
+        revoked_tasks.add(t.id)
         ready_queue.put(t)
 
         m.body()

+ 2 - 2
celery/tests/test_worker/test_worker_state.py

@@ -85,8 +85,8 @@ class test_Persistent(StateResetCase):
 
 class SimpleReq(object):
 
-    def __init__(self, task_name):
-        self.task_name = task_name
+    def __init__(self, name):
+        self.name = name
 
 
 class test_state(StateResetCase):

+ 3 - 3
celery/worker/buckets.py

@@ -72,9 +72,9 @@ class TaskBucket(object):
     def put(self, request):
         """Put a :class:`~celery.worker.job.Request` into
         the appropriate bucket."""
-        if request.task_name not in self.buckets:
-            self.add_bucket_for_type(request.task_name)
-        self.buckets[request.task_name].put_nowait(request)
+        if request.name not in self.buckets:
+            self.add_bucket_for_type(request.name)
+        self.buckets[request.name].put_nowait(request)
         with self.mutex:
             self.not_empty.notify()
     put_nowait = put

+ 2 - 2
celery/worker/consumer.py

@@ -371,8 +371,8 @@ class Consumer(object):
             self.logger.info("Got task from broker: %s", task.shortinfo())
 
         if self.event_dispatcher.enabled:
-            self.event_dispatcher.send("task-received", uuid=task.task_id,
-                    name=task.task_name, args=safe_repr(task.args),
+            self.event_dispatcher.send("task-received", uuid=task.id,
+                    name=task.name, args=safe_repr(task.args),
                     kwargs=safe_repr(task.kwargs),
                     retries=task.request_dict.get("retries", 0),
                     eta=task.eta and task.eta.isoformat(),

+ 1 - 1
celery/worker/control.py

@@ -41,7 +41,7 @@ def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
     if terminate:
         signum = _signals.signum(signal or "TERM")
         for request in state.active_requests:
-            if request.task_id == task_id:
+            if request.id == task_id:
                 action = "terminated (%s)" % (signum, )
                 request.terminate(panel.consumer.pool, signal=signum)
                 break

+ 4 - 5
celery/worker/mediator.py

@@ -70,16 +70,15 @@ class Mediator(bgThread):
             return
 
         if self._does_debug:
-            self.logger.debug(
-                "Mediator: Running callback for task: %s[%s]" % (
-                    task.task_name, task.task_id))
+            self.logger.debug("Mediator: Running callback for task: %s[%s]",
+                              task.name, task.id)
 
         try:
             self.callback(task)
         except Exception, exc:
             self.logger.error("Mediator callback raised exception %r",
                               exc, exc_info=True,
-                              extra={"data": {"id": task.task_id,
-                                              "name": task.task_name,
+                              extra={"data": {"id": task.id,
+                                              "name": task.name,
                                               "hostname": task.hostname}})
     move = body   # XXX compat

+ 1 - 1
celery/worker/state.py

@@ -55,7 +55,7 @@ task_reserved = reserved_requests.add
 def task_accepted(request):
     """Updates global state when a task has been accepted."""
     active_requests.add(request)
-    total_count[request.task_name] += 1
+    total_count[request.name] += 1
 
 
 def task_ready(request):

+ 3 - 3
funtests/suite/test_basic.py

@@ -30,7 +30,7 @@ class test_basic(WorkerCase):
     def test_dump_active(self, sleep=1):
         r1 = tasks.sleeptask.delay(sleep)
         r2 = tasks.sleeptask.delay(sleep)
-        self.ensure_accepted(r1.task_id)
+        self.ensure_accepted(r1.id)
         active = self.inspect().active(safe=True)
         self.assertTrue(active)
         active = active[self.worker.hostname]
@@ -43,7 +43,7 @@ class test_basic(WorkerCase):
         r2 = tasks.sleeptask.delay(sleep)
         r3 = tasks.sleeptask.delay(sleep)
         r4 = tasks.sleeptask.delay(sleep)
-        self.ensure_accepted(r1.task_id)
+        self.ensure_accepted(r1.id)
         reserved = self.inspect().reserved(safe=True)
         self.assertTrue(reserved)
         reserved = reserved[self.worker.hostname]
@@ -53,7 +53,7 @@ class test_basic(WorkerCase):
     def test_dump_schedule(self, countdown=1):
         r1 = tasks.add.apply_async((2, 2), countdown=countdown)
         r2 = tasks.add.apply_async((2, 2), countdown=countdown)
-        self.ensure_scheduled(r1.task_id, interval=0.1)
+        self.ensure_scheduled(r1.id, interval=0.1)
         schedule = self.inspect().scheduled(safe=True)
         self.assertTrue(schedule)
         schedule = schedule[self.worker.hostname]