Browse Source

celery.worker.job.TaskWrapper renamed -> celery.worker.job.TaskRequest

Much better descripion of the class, amazing it took me so long to find
that name.
Ask Solem 14 years ago
parent
commit
5a899d75bf

+ 5 - 5
Changelog

@@ -705,7 +705,7 @@ Remote control commands
 
         >>> from celery.task.control import broadcast
         >>> broadcast("dump_reserved", reply=True)
-        [{'myworker1': [<TaskWrapper ....>]}]
+        [{'myworker1': [<TaskRequest ....>]}]
 
 * New remote control command: ``dump_schedule``
 
@@ -717,19 +717,19 @@ Remote control commands
         >>> broadcast("dump_schedule", reply=True)
         [{'w1': []},
          {'w3': []},
-         {'w2': ['0. 2010-05-12 11:06:00 pri0 <TaskWrapper:
+         {'w2': ['0. 2010-05-12 11:06:00 pri0 <TaskRequest
                     {name:"opalfeeds.tasks.refresh_feed_slice",
                      id:"95b45760-4e73-4ce8-8eac-f100aa80273a",
                      args:"(<Feeds freq_max:3600 freq_min:60
                                    start:2184.0 stop:3276.0>,)",
                      kwargs:"{'page': 2}"}>']},
-         {'w4': ['0. 2010-05-12 11:00:00 pri0 <TaskWrapper:
+         {'w4': ['0. 2010-05-12 11:00:00 pri0 <TaskRequest
                     {name:"opalfeeds.tasks.refresh_feed_slice",
                      id:"c053480b-58fb-422f-ae68-8d30a464edfe",
                      args:"(<Feeds freq_max:3600 freq_min:60
                                    start:1092.0 stop:2184.0>,)",
                      kwargs:"{\'page\': 1}"}>',
-                '1. 2010-05-12 11:12:00 pri0 <TaskWrapper:
+                '1. 2010-05-12 11:12:00 pri0 <TaskRequest
                     {name:"opalfeeds.tasks.refresh_feed_slice",
                      id:"ab8bc59e-6cf8-44b8-88d0-f1af57789758",
                      args:"(<Feeds freq_max:3600 freq_min:60
@@ -749,7 +749,7 @@ Fixes
   (http://github.com/ask/celery/issues/issue/98)
 
 * Now handles exceptions with unicode messages correctly in
-  ``TaskWrapper.on_failure``.
+  ``TaskRequest.on_failure``.
 
 * Database backend: ``TaskMeta.result``: default value should be ``None``
   not empty string.

+ 1 - 1
celery/task/base.py

@@ -484,7 +484,7 @@ class Task(object):
     def execute(self, wrapper, pool, loglevel, logfile):
         """The method the worker calls to execute the task.
 
-        :param wrapper: A :class:`celery.worker.job.TaskWrapper`.
+        :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
         :param pool: A task pool.
         :param loglevel: Current loglevel.
         :param logfile: Name of the currently used logfile.

+ 9 - 5
celery/task/sets.py

@@ -1,10 +1,10 @@
+from UserList import UserList
+
 from celery import conf
-from celery.execute import apply_async
 from celery.messaging import establish_connection, with_connection
 from celery.messaging import TaskPublisher
-from celery.registry import tasks
 from celery.result import TaskSetResult
-from celery.utils import gen_unique_id, padlist
+from celery.utils import gen_unique_id
 
 
 class subtask(object):
@@ -36,7 +36,7 @@ class subtask(object):
                                      publisher=publisher, **self.options)
 
 
-class TaskSet(object):
+class TaskSet(UserList):
     """A task containing several subtasks, making it possible
     to track how many, or when all of the tasks has been completed.
 
@@ -72,7 +72,7 @@ class TaskSet(object):
             self.task = task
             self.task_name = task.name
 
-        self.tasks = tasks
+        self.data = list(tasks)
         self.total = len(self.tasks)
 
     @with_connection
@@ -129,3 +129,7 @@ class TaskSet(object):
         # This will be filled with EagerResults.
         return TaskSetResult(taskset_id, [task.apply(taskset_id)
                                             for task in self.tasks])
+
+    @property
+    def tasks(self):
+        return self.data

+ 6 - 6
celery/tests/test_worker.py

@@ -9,7 +9,7 @@ from carrot.backends.base import BaseMessage
 from celery import conf
 from celery.utils import gen_unique_id
 from celery.worker import WorkController
-from celery.worker.job import TaskWrapper
+from celery.worker.job import TaskRequest
 from celery.worker.buckets import FastQueue
 from celery.worker.listener import CarrotListener, QoS, RUN
 from celery.worker.scheduler import Scheduler
@@ -243,7 +243,7 @@ class TestCarrotListener(unittest.TestCase):
         l.receive_message(m.decode(), m)
 
         in_bucket = self.ready_queue.get_nowait()
-        self.assertIsInstance(in_bucket, TaskWrapper)
+        self.assertIsInstance(in_bucket, TaskRequest)
         self.assertEqual(in_bucket.task_name, foo_task.name)
         self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
         self.assertTrue(self.eta_schedule.empty())
@@ -325,7 +325,7 @@ class TestCarrotListener(unittest.TestCase):
         in_hold = self.eta_schedule.queue[0]
         self.assertEqual(len(in_hold), 4)
         eta, priority, task, on_accept = in_hold
-        self.assertIsInstance(task, TaskWrapper)
+        self.assertIsInstance(task, TaskRequest)
         self.assertTrue(callable(on_accept))
         self.assertEqual(task.task_name, foo_task.name)
         self.assertEqual(task.execute(), 2 * 4 * 8)
@@ -353,7 +353,7 @@ class TestWorkController(unittest.TestCase):
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
-        task = TaskWrapper.from_message(m, m.decode())
+        task = TaskRequest.from_message(m, m.decode())
         worker.process_task(task)
         worker.pool.stop()
 
@@ -363,7 +363,7 @@ class TestWorkController(unittest.TestCase):
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
-        task = TaskWrapper.from_message(m, m.decode())
+        task = TaskRequest.from_message(m, m.decode())
         worker.process_task(task)
         worker.pool.stop()
 
@@ -373,7 +373,7 @@ class TestWorkController(unittest.TestCase):
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
                            kwargs={})
-        task = TaskWrapper.from_message(m, m.decode())
+        task = TaskRequest.from_message(m, m.decode())
         worker.process_task(task)
         worker.pool.stop()
 

+ 18 - 18
celery/tests/test_worker_job.py

@@ -12,7 +12,7 @@ from celery.log import setup_logger
 from celery.task.base import Task
 from celery.utils import gen_unique_id
 from celery.result import AsyncResult
-from celery.worker.job import WorkerTaskTrace, TaskWrapper
+from celery.worker.job import WorkerTaskTrace, TaskRequest
 from celery.concurrency.processes import TaskPool
 from celery.backends import default_backend
 from celery.exceptions import RetryTaskError, NotRegistered
@@ -102,14 +102,14 @@ class MockEventDispatcher(object):
         self.sent.append(event)
 
 
-class TestTaskWrapper(unittest.TestCase):
+class TestTaskRequest(unittest.TestCase):
 
     def test_task_wrapper_repr(self):
-        tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
         self.assertTrue(repr(tw))
 
     def test_send_event(self):
-        tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
+        tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
         tw.eventer = MockEventDispatcher()
         tw.send_event("task-frobulated")
         self.assertIn("task-frobulated", tw.eventer.sent)
@@ -127,7 +127,7 @@ class TestTaskWrapper(unittest.TestCase):
         job.mail_admins = mock_mail_admins
         conf.CELERY_SEND_TASK_ERROR_EMAILS = True
         try:
-            tw = TaskWrapper(mytask.name, gen_unique_id(), [1], {"f": "x"})
+            tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
             try:
                 raise KeyError("foo")
             except KeyError:
@@ -206,14 +206,14 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_executed_bit(self):
         from celery.worker.job import AlreadyExecutedError
-        tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
+        tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
         self.assertFalse(tw.executed)
         tw._set_executed_bit()
         self.assertTrue(tw.executed)
         self.assertRaises(AlreadyExecutedError, tw._set_executed_bit)
 
     def test_task_wrapper_mail_attrs(self):
-        tw = TaskWrapper(mytask.name, gen_unique_id(), [], {})
+        tw = TaskRequest(mytask.name, gen_unique_id(), [], {})
         x = tw.success_msg % {"name": tw.task_name,
                               "id": tw.task_id,
                               "return_value": 10}
@@ -235,8 +235,8 @@ class TestTaskWrapper(unittest.TestCase):
         m = BaseMessage(body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
                         content_encoding="utf-8")
-        tw = TaskWrapper.from_message(m, m.decode())
-        self.assertIsInstance(tw, TaskWrapper)
+        tw = TaskRequest.from_message(m, m.decode())
+        self.assertIsInstance(tw, TaskRequest)
         self.assertEqual(tw.task_name, body["task"])
         self.assertEqual(tw.task_id, body["id"])
         self.assertEqual(tw.args, body["args"])
@@ -251,12 +251,12 @@ class TestTaskWrapper(unittest.TestCase):
         m = BaseMessage(body=simplejson.dumps(body), backend="foo",
                         content_type="application/json",
                         content_encoding="utf-8")
-        self.assertRaises(NotRegistered, TaskWrapper.from_message,
+        self.assertRaises(NotRegistered, TaskRequest.from_message,
                           m, m.decode())
 
     def test_execute(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
+        tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertEqual(tw.execute(), 256)
         meta = default_backend._get_task_meta_for(tid)
         self.assertEqual(meta["result"], 256)
@@ -264,7 +264,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_success_no_kwargs(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask_no_kwargs.name, tid, [4], {})
+        tw = TaskRequest(mytask_no_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(), 256)
         meta = default_backend._get_task_meta_for(tid)
         self.assertEqual(meta["result"], 256)
@@ -272,7 +272,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_success_some_kwargs(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask_some_kwargs.name, tid, [4], {})
+        tw = TaskRequest(mytask_some_kwargs.name, tid, [4], {})
         self.assertEqual(tw.execute(logfile="foobaz.log"), 256)
         meta = default_backend._get_task_meta_for(tid)
         self.assertEqual(some_kwargs_scratchpad.get("logfile"), "foobaz.log")
@@ -281,7 +281,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_ack(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"},
+        tw = TaskRequest(mytask.name, tid, [4], {"f": "x"},
                         on_ack=on_ack)
         self.assertEqual(tw.execute(), 256)
         meta = default_backend._get_task_meta_for(tid)
@@ -291,7 +291,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_fail(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask_raising.name, tid, [4], {"f": "x"})
+        tw = TaskRequest(mytask_raising.name, tid, [4], {"f": "x"})
         self.assertIsInstance(tw.execute(), ExceptionInfo)
         meta = default_backend._get_task_meta_for(tid)
         self.assertEqual(meta["status"], states.FAILURE)
@@ -299,7 +299,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_execute_using_pool(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
+        tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
 
         class MockPool(object):
             target = None
@@ -326,7 +326,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def test_default_kwargs(self):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
+        tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         self.assertDictEqual(
                 tw.extend_with_default_kwargs(10, "some_logfile"), {
                     "f": "x",
@@ -340,7 +340,7 @@ class TestTaskWrapper(unittest.TestCase):
 
     def _test_on_failure(self, exception):
         tid = gen_unique_id()
-        tw = TaskWrapper(mytask.name, tid, [4], {"f": "x"})
+        tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         try:
             raise exception
         except Exception:

+ 17 - 11
celery/worker/job.py

@@ -135,16 +135,12 @@ def execute_and_trace(task_name, *args, **kwargs):
         platform.set_mp_process_title("celeryd")
 
 
-class TaskWrapper(object):
-    """Class wrapping a task to be passed around and finally
-    executed inside of the worker.
+class TaskRequest(object):
+    """A request for task execution.
 
     :param task_name: see :attr:`task_name`.
-
     :param task_id: see :attr:`task_id`.
-
     :param args: see :attr:`args`
-
     :param kwargs: see :attr:`kwargs`.
 
     .. attribute:: task_name
@@ -163,16 +159,25 @@ class TaskWrapper(object):
 
         Mapping of keyword arguments to apply to the task.
 
+    .. attribute:: on_ack
+
+        Callback called when the task should be acknowledged.
+
     .. attribute:: message
 
         The original message sent. Used for acknowledging the message.
 
-    .. attribute executed
+    .. attribute:: executed
 
         Set to ``True`` if the task has been executed.
         A task should only be executed once.
 
-    .. attribute acknowledged
+    .. attribute:: delivery_info
+
+        Additional delivery info, e.g. the contains the path
+        from producer to consumer.
+
+    .. attribute:: acknowledged
 
         Set to ``True`` if the task has been acknowledged.
 
@@ -202,8 +207,9 @@ class TaskWrapper(object):
         self._already_revoked = False
 
         for opt in ("success_msg", "fail_msg", "fail_email_subject",
-                "fail_email_body", "logger", "eventer"):
+                    "fail_email_body", "logger", "eventer"):
             setattr(self, opt, opts.get(opt, getattr(self, opt, None)))
+
         if not self.logger:
             self.logger = get_default_logger()
 
@@ -227,13 +233,13 @@ class TaskWrapper(object):
 
     @classmethod
     def from_message(cls, message, message_data, logger=None, eventer=None):
-        """Create a :class:`TaskWrapper` from a task message sent by
+        """Create a :class:`TaskRequest` from a task message sent by
         :class:`celery.messaging.TaskPublisher`.
 
         :raises UnknownTaskError: if the message does not describe a task,
             the message is also rejected.
 
-        :returns: :class:`TaskWrapper` instance.
+        :returns: :class:`TaskRequest` instance.
 
         """
         task_name = message_data["task"]

+ 2 - 2
celery/worker/listener.py

@@ -9,7 +9,7 @@ from carrot.connection import AMQPConnectionException
 
 from celery import conf
 from celery.utils import noop, retry_over_time
-from celery.worker.job import TaskWrapper, InvalidTaskError
+from celery.worker.job import TaskRequest, InvalidTaskError
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
 from celery.events import EventDispatcher
@@ -161,7 +161,7 @@ class CarrotListener(object):
         # Handle task
         if message_data.get("task"):
             try:
-                task = TaskWrapper.from_message(message, message_data,
+                task = TaskRequest.from_message(message, message_data,
                                                 logger=self.logger,
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:

+ 2 - 2
docs/internals/worker.rst

@@ -36,7 +36,7 @@ CarrotListener
 Receives messages from the broker using ``carrot``.
 
 When a message is received it's converted into a
-:class:`celery.worker.job.TaskWrapper` object.
+:class:`celery.worker.job.TaskRequest` object.
 
 Tasks with an ETA are entered into the ``eta_schedule``, messages that can
 be immediately processed are moved directly to the ``ready_queue``.
@@ -53,7 +53,7 @@ Mediator
 --------
 The mediator simply moves tasks in the ``ready_queue`` over to the
 task pool for execution using
-:meth:`celery.worker.job.TaskWrapper.execute_using_pool`.
+:meth:`celery.worker.job.TaskRequest.execute_using_pool`.
 
 TaskPool
 --------