Parcourir la source

celery worker command must patch eventlet/gevent as early as possible. Closes #843

Ask Solem il y a 12 ans
Parent
commit
97301fb7dc
3 fichiers modifiés avec 48 ajouts et 7 suppressions
  1. 37 0
      celery/bin/base.py
  2. 6 0
      celery/bin/celery.py
  3. 5 7
      celery/bin/celeryd.py

+ 37 - 0
celery/bin/base.py

@@ -161,11 +161,38 @@ class Command(object):
         """
         if argv is None:
             argv = list(sys.argv)
+        # Should we load any special concurrency environment?
+        pool_option = self.with_pool_option(argv)
+        if pool_option:
+            self.maybe_patch_concurrency(argv, *pool_option)
+
+        # Dump version and exit if '--version' arg set.
         self.early_version(argv)
         argv = self.setup_app_from_commandline(argv)
         prog_name = os.path.basename(argv[0])
         return self.handle_argv(prog_name, argv[1:])
 
+    def _find_option_with_arg(self, argv, short_opts=None, long_opts=None):
+        for i, arg in enumerate(argv):
+            if arg.startswith('-'):
+                if long_opts and arg.startswith('--'):
+                    name, val = arg.split('=', 1)
+                    if name in long_opts:
+                        return val
+                if short_opts and arg in short_opts:
+                    return argv[i + 1]
+        raise KeyError('|'.join(short_opts or [] + long_opts or []))
+
+    def maybe_patch_concurrency(self, argv, short_opts=None, long_opts=None):
+        try:
+            pool = self._find_option_with_arg(argv, short_opts, long_opts)
+        except KeyError:
+            pass
+        else:
+            from celery import concurrency
+            # set up eventlet/gevent environments ASAP.
+            concurrency.get_implementation(pool)
+
     def usage(self, command):
         """Returns the command-line usage string for this app."""
         return '%%prog [options] %s' % (self.args, )
@@ -330,6 +357,16 @@ class Command(object):
                     line.strip()).replace('`', ''))
         return options
 
+    def with_pool_option(self, argv):
+        """Returns tuple of ``(short_opts, long_opts)`` if the command
+        supports a pool argument, and used to monkey patch eventlet/gevent
+        environments as early as possible.
+
+        E.g::
+              has_pool_option = (['-P'], ['--pool'])
+        """
+        pass
+
     def _get_default_app(self, *args, **kwargs):
         from celery.app import default_app
         return default_app._get_current_object()  # omit proxy

+ 6 - 0
celery/bin/celery.py

@@ -885,6 +885,12 @@ class CeleryCommand(BaseCommand):
             ])
         return '\n'.join(ret).strip()
 
+    def with_pool_option(self, argv):
+        if len(argv) > 1 and argv[1] == 'worker':
+            # this command supports custom pools
+            # that may have to be loaded as early as possible.
+            return (['-P'], ['--pool'])
+
 
 def determine_exit_status(ret):
     if isinstance(ret, int):

+ 5 - 7
celery/bin/celeryd.py

@@ -133,13 +133,6 @@ class WorkerCommand(Command):
     def execute_from_commandline(self, argv=None):
         if argv is None:
             argv = list(sys.argv)
-        try:
-            pool = argv[argv.index('-P') + 1]
-        except ValueError:
-            pass
-        else:
-            # set up eventlet/gevent environments ASAP.
-            concurrency.get_implementation(pool)
         return super(WorkerCommand, self).execute_from_commandline(argv)
 
     def run(self, *args, **kwargs):
@@ -161,6 +154,11 @@ class WorkerCommand(Command):
                       if isinstance(l, basestring))))
         return self.app.Worker(**kwargs).run()
 
+    def with_pool_option(self, argv):
+        # this command support custom pools
+        # that may have to be loaded as early as possible.
+        return (['-P'], ['--pool'])
+
     def get_options(self):
         conf = self.app.conf
         return (