Browse Source

Adds Task.execution_strategy

Ask Solem 13 years ago
parent
commit
3f6c390310
3 changed files with 30 additions and 10 deletions
  1. 12 0
      celery/app/task/__init__.py
  2. 15 7
      celery/worker/consumer.py
  3. 3 3
      funtests/benchmarks/bench_worker.py

+ 12 - 0
celery/app/task/__init__.py

@@ -256,6 +256,18 @@ class BaseTask(object):
         """The body of the task executed by workers."""
         raise NotImplementedError("Tasks must define the run method.")
 
+    def execution_strategy(self, app, logger, hostname, eventer):
+        from celery.worker.job import TaskRequest
+        create = TaskRequest.from_message
+
+        def handle_message(message, body, ack):
+            return create(message, body, ack,
+                          app=app, logger=logger,
+                          hostname=hostname, eventer=eventer)
+
+        return handle_message
+
+
     @classmethod
     def get_logger(self, loglevel=None, logfile=None, propagate=False,
             **kwargs):

+ 15 - 7
celery/worker/consumer.py

@@ -86,6 +86,7 @@ import warnings
 from ..app import app_or_default
 from ..datastructures import AttributeDict
 from ..exceptions import NotRegistered
+from ..registry import tasks
 from ..utils import noop
 from ..utils import timer2
 from ..utils.encoding import safe_repr
@@ -295,6 +296,15 @@ class Consumer(object):
         self.channel_errors = conninfo.channel_errors
 
         self._does_info = self.logger.isEnabledFor(logging.INFO)
+        self.strategies = {}
+
+    def update_strategies(self, eventer):
+        S = self.strategies
+        for task in tasks.itervalues():
+            S[task.name] = task.execution_strategy(self.app,
+                                                   self.logger,
+                                                   self.hostname,
+                                                   eventer)
 
     def start(self):
         """Start the consumer.
@@ -414,7 +424,7 @@ class Consumer(object):
                         self._message_report(body, message), exc)
 
         try:
-            body["task"]
+            name = body["task"]
         except (KeyError, TypeError):
             warnings.warn(RuntimeWarning(
                 "Received and deleted unknown message. Wrong destination?!? \
@@ -424,12 +434,7 @@ class Consumer(object):
             return
 
         try:
-            task = TaskRequest.from_message(message, body, ack,
-                                            app=self.app,
-                                            logger=self.logger,
-                                            hostname=self.hostname,
-                                            eventer=self.event_dispatcher)
-
+            task = self.strategies[name](message, body, ack)
         except NotRegistered, exc:
             self.logger.error(UNKNOWN_TASK_ERROR, exc, safe_repr(body),
                               exc_info=sys.exc_info())
@@ -604,6 +609,9 @@ class Consumer(object):
             self.event_dispatcher.copy_buffer(prev_event_dispatcher)
             self.event_dispatcher.flush()
 
+        # reload all task's execution strategies.
+        self.update_strategies(self.event_dispatcher)
+
         # Restart heartbeat thread.
         self.restart_heartbeat()
 

+ 3 - 3
funtests/benchmarks/bench_worker.py

@@ -10,16 +10,16 @@ from celery import Celery
 DEFAULT_ITS = 20000
 
 celery = Celery(__name__)
-celery.conf.update(#BROKER_TRANSPORT="librabbitmq",
+celery.conf.update(BROKER_TRANSPORT="librabbitmq",
                    BROKER_POOL_LIMIT=10,
                    CELERY_PREFETCH_MULTIPLIER=0,
                    CELERY_DISABLE_RATE_LIMITS=True,
-                   #CELERY_DEFAULT_DELIVERY_MODE="transient",
+                   CELERY_DEFAULT_DELIVERY_MODE="transient",
                    CELERY_QUEUES = {
                        "bench.worker": {
                            "exchange": "bench.worker",
                            "routing_key": "bench.worker",
-                           #"no_ack": True,
+                           "no_ack": True,
                         }
                    },
                    CELERY_TASK_SERIALIZER="json",