Browse Source

Remove the unneeded meta argument from the celery.pool handlers, and make on_ready take over on_return's duties as well, since on_return is no longer needed.

Ask Solem 15 years ago
parent
commit
b3338bffe8
3 changed files with 18 additions and 36 deletions
  1. celery/pool.py (+9 −21)
  2. celery/tests/test_worker_job.py (+1 −1)
  3. celery/worker/job.py (+8 −14)

+ 9 - 21
celery/pool.py

@@ -10,7 +10,7 @@ import multiprocessing
 
 from multiprocessing.pool import Pool, worker
 from celery.datastructures import ExceptionInfo
-from celery.utils import gen_unique_id
+from celery.utils import noop
 from functools import partial as curry
 from operator import isNumberType
 
@@ -222,7 +222,7 @@ class TaskPool(object):
                     dead_count))
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
-            errbacks=None, on_ack=None, meta=None):
+            errbacks=None, on_ack=noop):
         """Equivalent of the :func:``apply`` built-in function.
 
         All ``callbacks`` and ``errbacks`` should complete immediately since
@@ -233,11 +233,8 @@ class TaskPool(object):
         kwargs = kwargs or {}
         callbacks = callbacks or []
         errbacks = errbacks or []
-        meta = meta or {}
-
-        on_return = curry(self.on_return, callbacks, errbacks,
-                          on_ack, meta)
 
+        on_ready = curry(self.on_ready, callbacks, errbacks, on_ack)
 
         self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
             target, args, kwargs))
@@ -245,27 +242,18 @@ class TaskPool(object):
         self.replace_dead_workers()
 
         return self._pool.apply_async(target, args, kwargs,
-                                        callback=on_return)
-
-    def on_return(self, callbacks, errbacks, on_ack, meta, ret_value):
-        """What to do when the process returns."""
-
-        # Acknowledge the task as being processed.
-        if on_ack:
-            on_ack()
-
-        self.on_ready(callbacks, errbacks, meta, ret_value)
+                                        callback=on_ready)
 
-    def on_ready(self, callbacks, errbacks, meta, ret_value):
+    def on_ready(self, callbacks, errbacks, on_ack, ret_value):
         """What to do when a worker task is ready and its return value has
         been collected."""
+        # Acknowledge the task as being processed.
+        on_ack()
 
         if isinstance(ret_value, ExceptionInfo):
             if isinstance(ret_value.exception, (
                     SystemExit, KeyboardInterrupt)):
                 raise ret_value.exception
-            for errback in errbacks:
-                errback(ret_value, meta)
+            [errback(ret_value) for errback in errbacks]
         else:
-            for callback in callbacks:
-                callback(ret_value, meta)
+            [callback(ret_value) for callback in callbacks]

+ 1 - 1
celery/tests/test_worker_job.py

@@ -232,7 +232,7 @@ class TestTaskWrapper(unittest.TestCase):
         from celery import conf
         conf.SEND_CELERY_TASK_ERROR_EMAILS = True
 
-        tw.on_failure(exc_info, {"task_id": tid, "task_name": "cu.mytask"})
+        tw.on_failure(exc_info)
         logvalue = logfh.getvalue()
         self.assertTrue("cu.mytask" in logvalue)
         self.assertTrue(tid in logvalue)

+ 8 - 14
celery/worker/job.py

@@ -175,30 +175,25 @@ class TaskWrapper(object):
         wrapper = self._executeable(loglevel, logfile)
         return pool.apply_async(wrapper,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
-                on_ack=self.on_ack,
-                meta={"task_id": self.task_id, "task_name": self.task_name})
+                on_ack=self.on_ack)
     
-    def on_success(self, ret_value, meta):
+    def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""
-        task_id = meta.get("task_id")
-        task_name = meta.get("task_name")
         msg = self.success_msg.strip() % {
-                "id": task_id,
-                "name": task_name,
+                "id": self.task_id,
+                "name": self.task_name,
                 "return_value": ret_value}
         self.logger.info(msg)
 
-    def on_failure(self, exc_info, meta):
+    def on_failure(self, exc_info):
         """The handler used if the task raised an exception."""
         from celery.conf import SEND_CELERY_TASK_ERROR_EMAILS
 
-        task_id = meta.get("task_id")
-        task_name = meta.get("task_name")
         context = {
             "hostname": socket.gethostname(),
-            "id": task_id,
-            "name": task_name,
+            "id": self.task_id,
+            "name": self.task_name,
             "exc": exc_info.exception,
             "traceback": exc_info.traceback,
             "args": self.args,
@@ -206,11 +201,10 @@ class TaskWrapper(object):
         }
         self.logger.error(self.fail_msg.strip() % context)
 
-        task_obj = tasks.get(task_name, object)
+        task_obj = tasks.get(self.task_name, object)
         send_error_email = SEND_CELERY_TASK_ERROR_EMAILS and not \
                 getattr(task_obj, "disable_error_emails", False)
         if send_error_email:
             subject = self.fail_email_subject.strip() % context
             body = self.fail_email_body.strip() % context
             mail_admins(subject, body, fail_silently=True)
-