Ask Solem 8 years ago
parent
commit
f1cb3b9bb4
9 changed files with 84 additions and 73 deletions
  1. 4 4
      celery/app/log.py
  2. 24 24
      celery/app/task.py
  3. 2 3
      celery/bin/base.py
  4. 13 13
      celery/bin/celery.py
  5. 7 7
      celery/canvas.py
  6. 2 2
      celery/result.py
  7. 16 16
      celery/task/base.py
  8. 11 4
      celery/worker/consumer/mingle.py
  9. 5 0
      setup.cfg

+ 4 - 4
celery/app/log.py

@@ -248,9 +248,9 @@ class Logging(object):
         return get_logger(name)
 
     @class_property
-    def already_setup(cls):
-        return cls._setup
+    def already_setup(self):
+        return self._setup
 
     @already_setup.setter  # noqa
-    def already_setup(cls, was_setup):
-        cls._setup = was_setup
+    def already_setup(self, was_setup):
+        self._setup = was_setup

+ 24 - 24
celery/app/task.py

@@ -300,30 +300,30 @@ class Task(object):
     # - until the task is actually used
 
     @classmethod
-    def bind(self, app):
-        was_bound, self.__bound__ = self.__bound__, True
-        self._app = app
+    def bind(cls, app):
+        was_bound, cls.__bound__ = cls.__bound__, True
+        cls._app = app
         conf = app.conf
-        self._exec_options = None  # clear option cache
+        cls._exec_options = None  # clear option cache
 
-        for attr_name, config_name in self.from_config:
-            if getattr(self, attr_name, None) is None:
-                setattr(self, attr_name, conf[config_name])
+        for attr_name, config_name in cls.from_config:
+            if getattr(cls, attr_name, None) is None:
+                setattr(cls, attr_name, conf[config_name])
 
         # decorate with annotations from config.
         if not was_bound:
-            self.annotate()
+            cls.annotate()
 
             from celery.utils.threads import LocalStack
-            self.request_stack = LocalStack()
+            cls.request_stack = LocalStack()
 
         # PeriodicTask uses this to add itself to the PeriodicTask schedule.
-        self.on_bound(app)
+        cls.on_bound(app)
 
         return app
 
     @classmethod
-    def on_bound(self, app):
+    def on_bound(cls, app):
         """Called when the task is bound to an app.
 
         Note:
@@ -333,33 +333,33 @@ class Task(object):
         pass
 
     @classmethod
-    def _get_app(self):
-        if self._app is None:
-            self._app = current_app
-        if not self.__bound__:
+    def _get_app(cls):
+        if cls._app is None:
+            cls._app = current_app
+        if not cls.__bound__:
             # The app property's __set__  method is not called
             # if Task.app is set (on the class), so must bind on use.
-            self.bind(self._app)
-        return self._app
+            cls.bind(cls._app)
+        return cls._app
     app = class_property(_get_app, bind)
 
     @classmethod
-    def annotate(self):
-        for d in resolve_all_annotations(self.app.annotations, self):
+    def annotate(cls):
+        for d in resolve_all_annotations(cls.app.annotations, cls):
             for key, value in items(d):
                 if key.startswith('@'):
-                    self.add_around(key[1:], value)
+                    cls.add_around(key[1:], value)
                 else:
-                    setattr(self, key, value)
+                    setattr(cls, key, value)
 
     @classmethod
-    def add_around(self, attr, around):
-        orig = getattr(self, attr)
+    def add_around(cls, attr, around):
+        orig = getattr(cls, attr)
         if getattr(orig, '__wrapped__', None):
             orig = orig.__wrapped__
         meth = around(orig)
         meth.__wrapped__ = orig
-        setattr(self, attr, meth)
+        setattr(cls, attr, meth)
 
     def __call__(self, *args, **kwargs):
         _task_stack.push(self)

+ 2 - 3
celery/bin/base.py

@@ -11,9 +11,8 @@ import json
 
 from collections import defaultdict
 from heapq import heappush
-from optparse import (
-    OptionParser, OptionGroup, IndentedHelpFormatter, make_option as Option,
-)
+from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
+from optparse import make_option as Option  # noqa
 from pprint import pformat
 
 from celery import VERSION_BANNER, Celery, maybe_patch_concurrency

+ 13 - 13
celery/bin/celery.py

@@ -564,11 +564,11 @@ class _RemoteControl(Command):
         super(_RemoteControl, self).__init__(*args, **kwargs)
 
     @classmethod
-    def get_command_info(self, command,
+    def get_command_info(cls, command,
                          indent=0, prefix='', color=None,
                          help=False, app=None, choices=None):
         if choices is None:
-            choices = self._choices_by_group(app)
+            choices = cls._choices_by_group(app)
         meta = choices[command]
         if help:
             help = '|' + text.indent(meta.help, indent + 4)
@@ -581,14 +581,14 @@ class _RemoteControl(Command):
         ])
 
     @classmethod
-    def list_commands(self, indent=0, prefix='',
+    def list_commands(cls, indent=0, prefix='',
                       color=None, help=False, app=None):
-        choices = self._choices_by_group(app)
+        choices = cls._choices_by_group(app)
         color = color if color else lambda x: x
         prefix = prefix + ' ' if prefix else ''
         return '\n'.join(
-            self.get_command_info(c, indent, prefix, color, help,
-                                  app=app, choices=choices)
+            cls.get_command_info(c, indent, prefix, color, help,
+                                 app=app, choices=choices)
             for c in sorted(choices))
 
     def usage(self, command):
@@ -676,14 +676,14 @@ class _RemoteControl(Command):
             args[:] = args[i:]
 
     @classmethod
-    def _choices_by_group(self, app):
+    def _choices_by_group(cls, app):
         from celery.worker.control import Panel
         # need to import task modules for custom user-remote control commands.
         app.loader.import_default_modules()
 
         return {
             name: info for name, info in items(Panel.meta)
-            if info.type == self.control_group and info.visible
+            if info.type == cls.control_group and info.visible
         }
 
     @cached_property
@@ -1157,11 +1157,11 @@ class CeleryCommand(Command):
             sys.exit(EX_FAILURE)
 
     @classmethod
-    def get_command_info(self, command, indent=0,
+    def get_command_info(cls, command, indent=0,
                          color=None, colored=None, app=None):
         colored = term.colored() if colored is None else colored
         colored = colored.names[color] if color else lambda x: x
-        obj = self.commands[command]
+        obj = cls.commands[command]
         cmd = 'celery {0}'.format(colored(command))
         if obj.leaf:
             return '|' + text.indent(cmd, indent)
@@ -1173,7 +1173,7 @@ class CeleryCommand(Command):
         ])
 
     @classmethod
-    def list_commands(self, indent=0, colored=None, app=None):
+    def list_commands(cls, indent=0, colored=None, app=None):
         colored = term.colored() if colored is None else colored
         white = colored.white
         ret = []
@@ -1181,8 +1181,8 @@ class CeleryCommand(Command):
             ret.extend([
                 text.indent('+ {0}: '.format(white(cls)), indent),
                 '\n'.join(
-                    self.get_command_info(command, indent + 4, color, colored,
-                                          app=app)
+                    cls.get_command_info(
+                        command, indent + 4, color, colored, app=app)
                     for command in commands),
                 ''
             ])

+ 7 - 7
celery/canvas.py

@@ -731,7 +731,7 @@ class chain(Signature):
         return last
 
     @classmethod
-    def from_dict(self, d, app=None):
+    def from_dict(cls, d, app=None):
         tasks = d['kwargs']['tasks']
         if tasks:
             if isinstance(tasks, tuple):  # aaaargh
@@ -824,9 +824,9 @@ class chunks(Signature):
         )
 
     @classmethod
-    def from_dict(self, d, app=None):
+    def from_dict(cls, d, app=None):
         return _upgrade(
-            d, chunks(*self._unpack_args(
+            d, chunks(*cls._unpack_args(
                 d['kwargs']), app=app, **d['options']),
         )
 
@@ -911,7 +911,7 @@ class group(Signature):
         self.subtask_type = 'group'
 
     @classmethod
-    def from_dict(self, d, app=None):
+    def from_dict(cls, d, app=None):
         return _upgrade(
             d, group(d['kwargs']['tasks'], app=app, **d['options']),
         )
@@ -1149,9 +1149,9 @@ class chord(Signature):
         self.parent_id = parent_id
 
     @classmethod
-    def from_dict(self, d, app=None):
-        args, d['kwargs'] = self._unpack_args(**d['kwargs'])
-        return _upgrade(d, self(*args, app=app, **d))
+    def from_dict(cls, d, app=None):
+        args, d['kwargs'] = cls._unpack_args(**d['kwargs'])
+        return _upgrade(d, cls(*args, app=app, **d))
 
     @staticmethod
     def _unpack_args(header=None, body=None, **kwargs):

+ 2 - 2
celery/result.py

@@ -865,10 +865,10 @@ class GroupResult(ResultSet):
         return self.results
 
     @classmethod
-    def restore(self, id, backend=None):
+    def restore(cls, id, backend=None):
         """Restore previously saved group result."""
         return (
-            backend or (self.app.backend if self.app else current_app.backend)
+            backend or (cls.app.backend if cls.app else current_app.backend)
         ).restore_group(id)
 
 

+ 16 - 16
celery/task/base.py

@@ -117,8 +117,8 @@ class TaskType(type):
         instance.bind(app)
         return instance.__class__
 
-    def __repr__(cls):
-        return _reprtask(cls)
+    def __repr__(self):
+        return _reprtask(self)
 
 
 @with_metaclass(TaskType)
@@ -159,25 +159,25 @@ class Task(BaseTask):
         locals()[name] = reclassmethod(getattr(BaseTask, name))
 
     @class_property
-    def request(cls):
-        return cls._get_request()
+    def request(self):
+        return self._get_request()
 
     @class_property
-    def backend(cls):
-        if cls._backend is None:
-            return cls.app.backend
-        return cls._backend
+    def backend(self):
+        if self._backend is None:
+            return self.app.backend
+        return self._backend
 
     @backend.setter
     def backend(cls, value):  # noqa
         cls._backend = value
 
     @classmethod
-    def get_logger(self, **kwargs):
-        return get_task_logger(self.name)
+    def get_logger(cls, **kwargs):
+        return get_task_logger(cls.name)
 
     @classmethod
-    def establish_connection(self):
+    def establish_connection(cls):
         """Deprecated method used to get a broker connection.
 
         Should be replaced with :meth:`@Celery.connection`
@@ -192,7 +192,7 @@ class Task(BaseTask):
             >>> with celery.connection_for_write() as conn:
             ...     pass
         """
-        return self._get_app().connection_for_write()
+        return cls._get_app().connection_for_write()
 
     def get_publisher(self, connection=None, exchange=None,
                       exchange_type=None, **options):
@@ -224,17 +224,17 @@ class Task(BaseTask):
         )
 
     @classmethod
-    def get_consumer(self, connection=None, queues=None, **kwargs):
+    def get_consumer(cls, connection=None, queues=None, **kwargs):
         """Get consumer for the queue this task is sent to.
 
         Deprecated!
 
         Should be replaced by :class:`@amqp.TaskConsumer`.
         """
-        Q = self._get_app().amqp
-        connection = connection or self.establish_connection()
+        Q = cls._get_app().amqp
+        connection = connection or cls.establish_connection()
         if queues is None:
-            queues = Q.queues[self.queue] if self.queue else Q.default_queue
+            queues = Q.queues[cls.queue] if cls.queue else Q.default_queue
         return Q.TaskConsumer(connection, queues, **kwargs)
 
 

+ 11 - 4
celery/worker/consumer/mingle.py

@@ -35,11 +35,11 @@ class Mingle(bootsteps.StartStopStep):
             return conn.transport.driver_type in self.compatible_transports
 
     def start(self, c):
+        self.sync()
+
+    def sync(self, c):
         info('mingle: searching for neighbors')
-        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
-        our_revoked = c.controller.state.revoked
-        replies = I.hello(c.hostname, our_revoked._data) or {}
-        replies.pop(c.hostname, None)  # delete my own response
+        replies = self.send_hello(c)
         if replies:
             info('mingle: sync with %s nodes',
                  len([reply for reply, value in items(replies) if value]))
@@ -49,6 +49,13 @@ class Mingle(bootsteps.StartStopStep):
         else:
             info('mingle: all alone')
 
+    def send_hello(self, c):
+        inspect = c.app.control.inspect(timeout=1.0, connection=c.connection)
+        our_revoked = c.controller.state.revoked
+        replies = inspect.hello(c.hostname, our_revoked._data) or {}
+        replies.pop(c.hostname, None)  # delete my own response
+        return replies
+
     def on_node_reply(self, c, nodename, reply):
         debug('mingle: processing reply from %s', nodename)
         try:

+ 5 - 0
setup.cfg

@@ -7,6 +7,11 @@ source-dir = docs/
 build-dir = docs/_build
 all_files = 1
 
+[flake8]
+# classes can be lowercase, arguments and variables can be uppercase
+# whenever it makes the code more readable.
+ignore = N806, N802, N801, N803
+
 [bdist_rpm]
 requires = pytz >= 2011b
            billiard >= 3.3.0.17