Browse Source

PEP8ify (using new version of pep8, 0.6.0)

Ask Solem 14 years ago
parent
commit
19559a53b5
68 changed files with 137 additions and 114 deletions
  1. 1 1
      celery/apps/worker.py
  2. 3 1
      celery/backends/amqp.py
  3. 1 0
      celery/backends/cache.py
  4. 4 3
      celery/backends/cassandra.py
  5. 0 1
      celery/backends/database.py
  6. 2 2
      celery/beat.py
  7. 1 0
      celery/bin/camqadm.py
  8. 1 0
      celery/bin/celeryctl.py
  9. 11 5
      celery/concurrency/processes/pool.py
  10. 1 1
      celery/concurrency/threads.py
  11. 23 20
      celery/conf.py
  12. 1 1
      celery/datastructures.py
  13. 1 1
      celery/decorators.py
  14. 1 2
      celery/events/cursesmon.py
  15. 1 1
      celery/events/state.py
  16. 1 0
      celery/exceptions.py
  17. 2 2
      celery/execute/__init__.py
  18. 3 3
      celery/log.py
  19. 1 1
      celery/management/commands/celeryd.py
  20. 1 1
      celery/messaging.py
  21. 1 0
      celery/models.py
  22. 1 1
      celery/schedules.py
  23. 0 1
      celery/serialization.py
  24. 3 3
      celery/task/base.py
  25. 1 1
      celery/task/control.py
  26. 1 1
      celery/task/http.py
  27. 3 2
      celery/task/sets.py
  28. 1 0
      celery/tests/__init__.py
  29. 2 0
      celery/tests/compat.py
  30. 2 2
      celery/tests/test_bin/test_celerybeat.py
  31. 4 2
      celery/tests/test_bin/test_celeryd.py
  32. 1 1
      celery/tests/test_buckets.py
  33. 2 2
      celery/tests/test_concurrency_processes.py
  34. 0 2
      celery/tests/test_events_state.py
  35. 2 2
      celery/tests/test_result.py
  36. 0 1
      celery/tests/test_states.py
  37. 7 9
      celery/tests/test_task.py
  38. 2 2
      celery/tests/test_task_sets.py
  39. 3 2
      celery/tests/test_worker.py
  40. 1 1
      celery/tests/test_worker_control.py
  41. 1 1
      celery/tests/test_worker_job.py
  42. 4 5
      celery/utils/__init__.py
  43. 3 2
      celery/utils/compat.py
  44. 1 1
      celery/utils/dispatch/signal.py
  45. 5 2
      celery/utils/functional.py
  46. 1 1
      celery/utils/info.py
  47. 1 0
      celery/utils/term.py
  48. 2 1
      celery/utils/timer2.py
  49. 1 1
      celery/worker/buckets.py
  50. 1 1
      celery/worker/control/__init__.py
  51. 1 1
      celery/worker/control/builtins.py
  52. 1 1
      celery/worker/control/registry.py
  53. 3 3
      celery/worker/controllers.py
  54. 1 1
      celery/worker/heartbeat.py
  55. 0 1
      celery/worker/job.py
  56. 3 3
      celery/worker/listener.py
  57. 1 1
      celery/worker/state.py
  58. 1 1
      contrib/release/sphinx-to-rst.py
  59. 2 2
      docs/_ext/literals_to_xrefs.py
  60. 0 0
      docs/homepage/examplerun.rst
  61. 0 0
      docs/homepage/exampletask.rst
  62. 1 1
      docs/slidesource/slide-example1-result.py
  63. 1 1
      examples/celery_http_gateway/manage.py
  64. 1 1
      examples/httpexample/manage.py
  65. 1 0
      funtests/config.py
  66. 1 0
      funtests/test_basic.py
  67. 1 0
      pavement.py
  68. 1 1
      setup.py

+ 1 - 1
celery/apps/worker.py

@@ -12,7 +12,7 @@ from celery import signals
 from celery.exceptions import ImproperlyConfigured
 from celery.routes import Router
 from celery.task import discard_all
-from celery.utils import info,get_full_cls_name, LOG_LEVELS
+from celery.utils import info, get_full_cls_name, LOG_LEVELS
 from celery.worker import WorkController
 
 

+ 3 - 1
celery/backends/amqp.py

@@ -76,7 +76,9 @@ class AMQPBackend(BaseDictBackend):
         if self.expires is not None:
             self.expires = int(self.expires)
             # WARNING: Requries RabbitMQ 2.1.0 or higher.
-            self.queue_arguments["x-expires"] = self.expires
+            # x-expires must be a signed-int, or long describing the
+            # expiry time in milliseconds.
+            self.queue_arguments["x-expires"] = int(self.expires * 1000.0)
         super(AMQPBackend, self).__init__(**kwargs)
 
     def _create_publisher(self, task_id, connection):

+ 1 - 0
celery/backends/cache.py

@@ -44,6 +44,7 @@ backends = {"memcache": get_best_memcache,
             "pylibmc": get_best_memcache,
             "memory": DummyClient}
 
+
 class CacheBackend(KeyValueStoreBackend):
     _client = None
 

+ 4 - 3
celery/backends/cassandra.py

@@ -2,7 +2,7 @@
 try:
     import pycassa
     from thrift import Thrift
-    C = __import__('cassandra').ttypes # FIXME: Namespace kludge
+    C = __import__('cassandra').ttypes          # FIXME Namespace kludge
 except ImportError:
     pycassa = None
 
@@ -156,8 +156,9 @@ class CassandraBackend(BaseDictBackend):
 
         cf = self._get_column_family()
         column_parent = C.ColumnParent(cf.column_family)
-        slice_pred = C.SlicePredicate(slice_range=C.SliceRange('', end_column,
-                                                               count=2**30))
+        slice_pred = C.SlicePredicate(
+                        slice_range=C.SliceRange('', end_column,
+                                                 count=2 ** 30))
         columns = cf.client.multiget_slice(cf.keyspace, self._index_keys,
                                            column_parent, slice_pred,
                                            pycassa.ConsistencyLevel.DCQUORUM)

+ 0 - 1
celery/backends/database.py

@@ -14,7 +14,6 @@ except ImportError:
         "See http://pypi.python.org/pypi/SQLAlchemy")
 
 
-
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
 

+ 2 - 2
celery/beat.py

@@ -194,7 +194,7 @@ class Scheduler(UserDict):
                     entry.name, exc))
         return result
 
-    def send_task(self, *args, **kwargs): # pragma: no cover
+    def send_task(self, *args, **kwargs):               # pragma: no cover
         return send_task(*args, **kwargs)
 
     def setup_schedule(self):
@@ -313,7 +313,7 @@ class Service(object):
     def stop(self, wait=False):
         self.logger.info("Celerybeat: Shutting down...")
         self._shutdown.set()
-        wait and self._stopped.wait() # block until shutdown done.
+        wait and self._stopped.wait()           # block until shutdown done.
 
     @property
     def scheduler(self):

+ 1 - 0
celery/bin/camqadm.py

@@ -39,6 +39,7 @@ Example:
     -> queue.delete myqueue yes no
 """
 
+
 def say(m):
     sys.stderr.write("%s\n" % (m, ))
 

+ 1 - 0
celery/bin/celeryctl.py

@@ -175,6 +175,7 @@ class result(Command):
         self.out(self.prettify(result.get())[1])
 result = command(result)
 
+
 class inspect(Command):
     choices = {"active": 10,
                "scheduled": 1.0,

+ 11 - 5
celery/concurrency/processes/pool.py

@@ -48,6 +48,7 @@ SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)
 # Exceptions
 #
 
+
 class WorkerLostError(Exception):
     """The worker processing a job has exited prematurely."""
     pass
@@ -58,6 +59,7 @@ class WorkerLostError(Exception):
 
 job_counter = itertools.count()
 
+
 def mapstar(args):
     return map(*args)
 
@@ -191,7 +193,7 @@ class TaskHandler(PoolThread):
             else:
                 if set_length:
                     debug('doing set_length()')
-                    set_length(i+1)
+                    set_length(i + 1)
                 continue
             break
         else:
@@ -287,7 +289,7 @@ class TimeoutHandler(PoolThread):
                 elif i not in dirty and _timed_out(ack_time, t_soft):
                     _on_soft_timeout(job, i)
 
-            time.sleep(0.5) # Don't waste CPU cycles.
+            time.sleep(0.5)                     # Don't waste CPU cycles.
 
         debug('timeout handler exiting')
 
@@ -360,7 +362,6 @@ class ResultHandler(PoolThread):
 
                 on_state_change(task)
 
-
         if putlock is not None:
             try:
                 putlock.release()
@@ -757,6 +758,7 @@ DynamicPool = Pool
 # Class whose instances are returned by `Pool.apply_async()`
 #
 
+
 class ApplyResult(object):
 
     def __init__(self, cache, callback, accept_callback=None,
@@ -833,6 +835,7 @@ class ApplyResult(object):
 # Class whose instances are returned by `Pool.map_async()`
 #
 
+
 class MapResult(ApplyResult):
 
     def __init__(self, cache, chunksize, length, callback):
@@ -848,12 +851,12 @@ class MapResult(ApplyResult):
             self._number_left = 0
             self._ready = True
         else:
-            self._number_left = length//chunksize + bool(length % chunksize)
+            self._number_left = length // chunksize + bool(length % chunksize)
 
     def _set(self, i, success_result):
         success, result = success_result
         if success:
-            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
+            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
             self._number_left -= 1
             if self._number_left == 0:
                 if self._callback:
@@ -899,6 +902,7 @@ class MapResult(ApplyResult):
 # Class whose instances are returned by `Pool.imap()`
 #
 
+
 class IMapIterator(object):
 
     def __init__(self, cache):
@@ -972,6 +976,7 @@ class IMapIterator(object):
 # Class whose instances are returned by `Pool.imap_unordered()`
 #
 
+
 class IMapUnorderedIterator(IMapIterator):
 
     def _set(self, i, obj):
@@ -989,6 +994,7 @@ class IMapUnorderedIterator(IMapIterator):
 #
 #
 
+
 class ThreadPool(Pool):
 
     from multiprocessing.dummy import Process as DummyProcess

+ 1 - 1
celery/concurrency/threads.py

@@ -61,7 +61,7 @@ class TaskPool(object):
 
         if isinstance(ret_value, ExceptionInfo):
             if isinstance(ret_value.exception, (
-                    SystemExit, KeyboardInterrupt)): # pragma: no cover
+                    SystemExit, KeyboardInterrupt)):        # pragma: no cover
                 raise ret_value.exception
             [errback(ret_value) for errback in errbacks]
         else:

+ 23 - 20
celery/conf.py

@@ -46,29 +46,29 @@ _DEFAULTS = {
     "CELERY_DEFAULT_QUEUE": "celery",
     "CELERY_DEFAULT_EXCHANGE": "celery",
     "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
-    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
+    "CELERY_DEFAULT_DELIVERY_MODE": 2,              # persistent
     "CELERY_ACKS_LATE": False,
     "CELERYD_POOL_PUTLOCKS": True,
     "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
     "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
     "CELERYD_ETA_SCHEDULER": "celery.utils.timer2.Timer",
     "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
-    "CELERYD_CONCURRENCY": 0, # defaults to cpu count
+    "CELERYD_CONCURRENCY": 0,                       # defaults to cpu count
     "CELERYD_PREFETCH_MULTIPLIER": 4,
     "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
     "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
     "CELERYD_LOG_COLOR": False,
     "CELERYD_LOG_LEVEL": "WARN",
-    "CELERYD_LOG_FILE": None, # stderr
+    "CELERYD_LOG_FILE": None,                       # stderr
     "CELERYBEAT_SCHEDULE": {},
     "CELERYD_STATE_DB": None,
     "CELERYD_ETA_SCHEDULER_PRECISION": 1,
     "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
-    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
+    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60,         # five minutes.
     "CELERYBEAT_LOG_LEVEL": "INFO",
-    "CELERYBEAT_LOG_FILE": None, # stderr
+    "CELERYBEAT_LOG_FILE": None,                    # stderr
     "CELERYMON_LOG_LEVEL": "INFO",
-    "CELERYMON_LOG_FILE": None, # stderr
+    "CELERYMON_LOG_FILE": None,                     # stderr
     "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
     "CELERY_BROADCAST_QUEUE": "celeryctl",
     "CELERY_BROADCAST_EXCHANGE": "celeryctl",
@@ -103,6 +103,7 @@ _DEPRECATION_FMT = """
 %s is deprecated in favor of %s and is scheduled for removal in celery v1.4.
 """.strip()
 
+
 def prepare(m, source=settings, defaults=_DEFAULTS):
 
     def _get(name, default=None, compat=None):
@@ -120,11 +121,11 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
                 pass
         return default
 
-    # <--- Task                                        <-   --   --- - ----- -- #
+    # <--- Task                                    <-   --   --- - ----- -- #
     m.ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
     m.EAGER_PROPAGATES_EXCEPTIONS = _get("CELERY_EAGER_PROPAGATES_EXCEPTIONS")
     m.RESULT_BACKEND = _get("CELERY_RESULT_BACKEND", compat=["CELERY_BACKEND"])
-    m.CELERY_BACKEND = RESULT_BACKEND # FIXME Remove in 1.4
+    m.CELERY_BACKEND = RESULT_BACKEND                   # FIXME Remove in 1.4
     m.CACHE_BACKEND = _get("CELERY_CACHE_BACKEND") or _get("CACHE_BACKEND")
     m.CACHE_BACKEND_OPTIONS = _get("CELERY_CACHE_BACKEND_OPTIONS") or {}
     m.TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
@@ -138,15 +139,15 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     if isinstance(m.TASK_RESULT_EXPIRES, int):
         m.TASK_RESULT_EXPIRES = timedelta(seconds=m.TASK_RESULT_EXPIRES)
 
-    # <--- SQLAlchemy                                  <-   --   --- - ----- -- #
+    # <--- SQLAlchemy                              <-   --   --- - ----- -- #
     m.RESULT_DBURI = _get("CELERY_RESULT_DBURI")
     m.RESULT_ENGINE_OPTIONS = _get("CELERY_RESULT_ENGINE_OPTIONS")
 
-    # <--- Client                                      <-   --   --- - ----- -- #
+    # <--- Client                                  <-   --   --- - ----- -- #
 
     m.MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
 
-    # <--- Worker                                      <-   --   --- - ----- -- #
+    # <--- Worker                                  <-   --   --- - ----- -- #
 
     m.SEND_EVENTS = _get("CELERY_SEND_EVENTS")
     m.DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
@@ -154,7 +155,8 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     m.CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
     m.CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
     m.CELERYD_MAX_TASKS_PER_CHILD = _get("CELERYD_MAX_TASKS_PER_CHILD")
-    m.STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
+    m.STORE_ERRORS_EVEN_IF_IGNORED = \
+            _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
     m.CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
                                            False,
                                     compat=["SEND_CELERY_TASK_ERROR_EMAILS"])
@@ -180,7 +182,7 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     m.CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
     m.CELERYD_ETA_SCHEDULER_PRECISION = _get("CELERYD_ETA_SCHEDULER_PRECISION")
 
-    # :--- Email settings                               <-   --   --- - ----- -- #
+    # :--- Email settings                           <-   --   --- - ----- -- #
     m.ADMINS = _get("ADMINS")
     m.SERVER_EMAIL = _get("SERVER_EMAIL")
     m.EMAIL_HOST = _get("EMAIL_HOST")
@@ -188,7 +190,7 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     m.EMAIL_HOST_PASSWORD = _get("EMAIL_HOST_PASSWORD")
     m.EMAIL_PORT = _get("EMAIL_PORT")
 
-    # :--- Broker connections                           <-   --   --- - ----- -- #
+    # :--- Broker connections                       <-   --   --- - ----- -- #
     m.BROKER_HOST = _get("BROKER_HOST")
     m.BROKER_PORT = _get("BROKER_PORT")
     m.BROKER_USER = _get("BROKER_USER")
@@ -206,7 +208,7 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
                             _get("BROKER_BACKEND") or \
                                 _get("CARROT_BACKEND")
 
-    # <--- Message routing                             <-   --   --- - ----- -- #
+    # <--- Message routing                         <-   --   --- - ----- -- #
     m.DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
     m.DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
     m.DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
@@ -218,13 +220,13 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
                                        "binding_key": DEFAULT_ROUTING_KEY}}
     m.CREATE_MISSING_QUEUES = _get("CELERY_CREATE_MISSING_QUEUES")
     m.ROUTES = routes.prepare(_get("CELERY_ROUTES") or [])
-    # :--- Broadcast queue settings                     <-   --   --- - ----- -- #
+    # :--- Broadcast queue settings                 <-   --   --- - ----- -- #
 
     m.BROADCAST_QUEUE = _get("CELERY_BROADCAST_QUEUE")
     m.BROADCAST_EXCHANGE = _get("CELERY_BROADCAST_EXCHANGE")
     m.BROADCAST_EXCHANGE_TYPE = _get("CELERY_BROADCAST_EXCHANGE_TYPE")
 
-# :--- Event queue settings                         <-   --   --- - ----- -- #
+    # :--- Event queue settings                     <-   --   --- - ----- -- #
 
     m.EVENT_QUEUE = _get("CELERY_EVENT_QUEUE")
     m.EVENT_EXCHANGE = _get("CELERY_EVENT_EXCHANGE")
@@ -232,26 +234,27 @@ def prepare(m, source=settings, defaults=_DEFAULTS):
     m.EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
     m.EVENT_SERIALIZER = _get("CELERY_EVENT_SERIALIZER")
 
-# :--- AMQP Backend settings                        <-   --   --- - ----- -- #
+    # :--- AMQP Backend settings                    <-   --   --- - ----- -- #
 
     m.RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")
     m.RESULT_EXCHANGE_TYPE = _get("CELERY_RESULT_EXCHANGE_TYPE")
     m.RESULT_SERIALIZER = _get("CELERY_RESULT_SERIALIZER")
     m.RESULT_PERSISTENT = _get("CELERY_RESULT_PERSISTENT")
 
-# :--- Celery Beat                                  <-   --   --- - ----- -- #
+    # :--- Celery Beat                              <-   --   --- - ----- -- #
     m.CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
     m.CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
     m.CELERYBEAT_SCHEDULE = _get("CELERYBEAT_SCHEDULE")
     m.CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
     m.CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
 
-# :--- Celery Monitor                               <-   --   --- - ----- -- #
+    # :--- Celery Monitor                           <-   --   --- - ----- -- #
     m.CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
     m.CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
 
 prepare(sys.modules[__name__])
 
+
 def _init_queues(queues):
     """Convert configuration mapping to a table of queues digestible
     by a :class:`carrot.messaging.ConsumerSet`."""

+ 1 - 1
celery/datastructures.py

@@ -215,7 +215,7 @@ class LimitedSet(object):
                 if not self.expires or time.time() > when + self.expires:
                     try:
                         self.pop_value(value)
-                    except TypeError: # pragma: no cover
+                    except TypeError:                   # pragma: no cover
                         continue
             break
 

+ 1 - 1
celery/decorators.py

@@ -60,7 +60,7 @@ def task(*args, **options):
                             __module__=fun.__module__,
                             __doc__=fun.__doc__)
             T = type(fun.__name__, (base, ), cls_dict)()
-            return registry.tasks[T.name] # global instance.
+            return registry.tasks[T.name]               # global instance.
 
         return _create_task_cls
 

+ 1 - 2
celery/events/cursesmon.py

@@ -167,7 +167,7 @@ class CursesMonitor(object):
             while True:
                 ch = self.win.getch(x, y + i)
                 if ch != -1:
-                    if ch in (10, curses.KEY_ENTER): # enter
+                    if ch in (10, curses.KEY_ENTER):                # enter
                         break
                     if ch in (27, ):
                         buffer = str()
@@ -302,7 +302,6 @@ class CursesMonitor(object):
         else:
             win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
 
-
         # Workers
         if self.workers:
             win.addstr(my - 4, x, self.online_str, curses.A_BOLD)

+ 1 - 1
celery/events/state.py

@@ -10,7 +10,7 @@ from celery import states
 from celery.datastructures import AttributeDict, LocalCache
 from celery.utils import kwdict
 
-HEARTBEAT_EXPIRE = 150 # 2 minutes, 30 seconds
+HEARTBEAT_EXPIRE = 150                  # 2 minutes, 30 seconds
 
 
 class Element(AttributeDict):

+ 1 - 0
celery/exceptions.py

@@ -8,6 +8,7 @@ UNREGISTERED_FMT = """
 Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 
+
 class QueueNotFound(KeyError):
     """Task routed to a queue not in CELERY_QUEUES."""
     pass

+ 2 - 2
celery/execute/__init__.py

@@ -90,7 +90,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
     if conf.ALWAYS_EAGER:
         return apply(task, args, kwargs, task_id=task_id)
 
-    task = tasks[task.name] # get instance from registry
+    task = tasks[task.name]                 # get instance from registry
 
     options = dict(extract_exec_options(task), **options)
     options = router.route(options, task.name, args, kwargs)
@@ -178,7 +178,7 @@ def apply(task, args, kwargs, **options):
     retries = options.get("retries", 0)
     throw = options.pop("throw", conf.EAGER_PROPAGATES_EXCEPTIONS)
 
-    task = tasks[task.name] # Make sure we get the instance, not class.
+    task = tasks[task.name]     # make sure we get the instance, not class.
 
     default_kwargs = {"task_name": task.name,
                       "task_id": task_id,

+ 3 - 3
celery/log.py

@@ -138,7 +138,7 @@ def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
 def _setup_logger(logger, logfile, format, colorize,
         formatter=ColorFormatter, **kwargs):
 
-    if logger.handlers: # Logger already configured
+    if logger.handlers:                 # already configured
         return logger
 
     handler = _detect_handler(logfile)
@@ -203,7 +203,7 @@ class LoggingProxy(object):
         ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
         infinite loops."""
 
-        def wrap_handler(handler): # pragma: no cover
+        def wrap_handler(handler):                  # pragma: no cover
 
             class WithSafeHandleError(logging.Handler):
 
@@ -226,7 +226,7 @@ class LoggingProxy(object):
 
     def write(self, data):
         if getattr(self._thread, "recurse_protection", False):
-            # Logger is logging back to this file, so stop recursing.
+            # logger is logging back to this file, so stop recursing.
             return
         """Write message to logging object."""
         data = data.strip()

+ 1 - 1
celery/management/commands/celeryd.py

@@ -5,7 +5,7 @@ Start the celery daemon from the Django management command.
 """
 from django.core.management.base import BaseCommand
 
-import celery.models # <-- shows upgrade instructions at exit.
+import celery.models            # <-- shows upgrade instructions at exit.
 
 
 class Command(BaseCommand):

+ 1 - 1
celery/messaging.py

@@ -62,7 +62,7 @@ class TaskPublisher(Publisher):
         task_args = task_args or []
         task_kwargs = task_kwargs or {}
         now = None
-        if countdown: # Convert countdown to ETA.
+        if countdown:                       # convert countdown to ETA.
             now = datetime.now()
             eta = now + timedelta(seconds=countdown)
 

+ 1 - 0
celery/models.py

@@ -9,6 +9,7 @@ import atexit
 
 from django.core.exceptions import ImproperlyConfigured
 
+
 @atexit.register
 def _display_help():
     import sys

+ 1 - 1
celery/schedules.py

@@ -39,7 +39,7 @@ class schedule(object):
 
 class crontab_parser(object):
     """Parser for crontab expressions. Any expression of the form 'groups'
-    (see BNF grammar below) is accepted and expanded to a set of numbers. 
+    (see BNF grammar below) is accepted and expanded to a set of numbers.
     These numbers represent the units of time that the crontab needs to
     run on::
 

+ 0 - 1
celery/serialization.py

@@ -71,7 +71,6 @@ def find_nearest_pickleable_exception(exc):
         # Use inspect.getmro() to traverse bases instead.
         getmro_ = lambda: inspect.getmro(cls)
 
-
     for supercls in getmro_():
         if supercls in unwanted_base_classes:
             # only BaseException and object, from here on down,

+ 3 - 3
celery/task/base.py

@@ -232,8 +232,8 @@ class Task(object):
     store_errors_even_if_ignored = conf.STORE_ERRORS_EVEN_IF_IGNORED
     send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
     error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
-    disable_error_emails = False # FIXME
-    max_retries = 3
+    disable_error_emails = False                                    # FIXME
+    max_retries = 5
     default_retry_delay = 3 * 60
     serializer = conf.TASK_SERIALIZER
     rate_limit = conf.DEFAULT_RATE_LIMIT
@@ -426,7 +426,7 @@ class Task(object):
         if kwargs.get("task_is_eager", False):
             result = self.apply(args=args, kwargs=kwargs, **options)
             if isinstance(result, EagerResult):
-                return result.get() # propogates exceptions.
+                return result.get()                 # propogates exceptions.
             return result
 
         self.apply_async(args=args, kwargs=kwargs, **options)

+ 1 - 1
celery/task/control.py

@@ -142,7 +142,7 @@ class inspect(object):
         return self._request("disable_events")
 
     def diagnose(self):
-        diagnose_timeout = self.timeout * 0.85 # 15% of timeout
+        diagnose_timeout = self.timeout * 0.85              # 15% of timeout
         return self._request("diagnose", timeout=diagnose_timeout)
 
     def ping(self):

+ 1 - 1
celery/task/http.py

@@ -125,7 +125,7 @@ class HttpDispatch(object):
         """Makes an HTTP request and returns the response."""
         request = urllib2.Request(url, params, headers=self.http_headers)
         request.headers.update(self.http_headers)
-        response = urllib2.urlopen(request) # user catches errors.
+        response = urllib2.urlopen(request)         # user catches errors.
         return response.read()
 
     def dispatch(self):

+ 3 - 2
celery/task/sets.py

@@ -33,6 +33,7 @@ Thank you for your patience!
 
 """
 
+
 class subtask(AttributeDict):
     """Class that wraps the arguments and execution options
     for a single task invocation.
@@ -124,8 +125,8 @@ class TaskSet(UserList):
         >>> list_of_return_values = taskset_result.join()
 
     """
-    _task = None # compat
-    _task_name = None # compat
+    _task = None                                                # compat
+    _task_name = None                                           # compat
 
     def __init__(self, task=None, tasks=None):
         if task is not None:

+ 1 - 0
celery/tests/__init__.py

@@ -6,6 +6,7 @@ config = os.environ.setdefault("CELERY_TEST_CONFIG_MODULE",
 os.environ["CELERY_CONFIG_MODULE"] = config
 os.environ["CELERY_LOADER"] = "default"
 
+
 def teardown():
     import os
     if os.path.exists("test.db"):

+ 2 - 0
celery/tests/compat.py

@@ -69,8 +69,10 @@ class catch_warnings(object):
         self._showwarning = self._module.showwarning
         if self._record:
             log = []
+
             def showwarning(*args, **kwargs):
                 log.append(WarningMessage(*args, **kwargs))
+
             self._module.showwarning = showwarning
             return log
         else:

+ 2 - 2
celery/tests/test_bin/test_celerybeat.py

@@ -108,6 +108,6 @@ class test_div(unittest.TestCase):
             MockBeat.running = False
 
     def test_parse_options(self):
-        options,args = celerybeat.BeatCommand().parse_options(
-                "celerybeat", ["-s", "foo"])
+        options, args = celerybeat.BeatCommand().parse_options(
+                            "celerybeat", ["-s", "foo"])
         self.assertEqual(options.schedule, "foo")

+ 4 - 2
celery/tests/test_bin/test_celeryd.py

@@ -21,6 +21,7 @@ from celery.tests.utils import execute_context
 
 patch.ensure_process_aware_logger()
 
+
 def disable_stdouts(fun):
 
     @wraps(fun)
@@ -140,8 +141,8 @@ class test_Worker(unittest.TestCase):
 
     @disable_stdouts
     def test_on_listener_ready(self):
-
         worker_ready_sent = [False]
+
         def on_worker_ready(**kwargs):
             worker_ready_sent[0] = True
 
@@ -226,10 +227,11 @@ class test_signal_handlers(unittest.TestCase):
     def test_worker_int_handler(self):
         worker = self._Worker()
         handlers = self.psig(cd.install_worker_int_handler, worker)
-
         next_handlers = {}
+
         def i(sig, handler):
             next_handlers[sig] = handler
+
         p = platform.install_signal_handler
         platform.install_signal_handler = i
         try:

+ 1 - 1
celery/tests/test_buckets.py

@@ -261,7 +261,7 @@ class test_TaskBucket(unittest.TestCase):
             for i, job in enumerate(jobs):
                 sys.stderr.write("0")
                 self.assertTrue(b.get(), job)
-            self.assertEqual(i+1, len(jobs))
+            self.assertEqual(i + 1, len(jobs))
         finally:
             self.registry.unregister(TaskD)
 

+ 2 - 2
celery/tests/test_concurrency_processes.py

@@ -61,8 +61,8 @@ class test_TaskPool(unittest.TestCase):
         self.assertTrue(_pool.terminated)
 
     def test_on_ready_exception(self):
-
         scratch = [None]
+
         def errback(retval):
             scratch[0] = retval
 
@@ -72,8 +72,8 @@ class test_TaskPool(unittest.TestCase):
         self.assertEqual(exc, scratch[0])
 
     def test_on_ready_value(self):
-
         scratch = [None]
+
         def callback(retval):
             scratch[0] = retval
 

+ 0 - 2
celery/tests/test_events_state.py

@@ -245,7 +245,6 @@ class test_State(unittest.TestCase):
         s.freeze_while(work)
         self.assertState(s)
 
-
     def test_freeze_thaw__not_buffering(self):
         s = State()
         r = ev_snapshot(s)
@@ -322,4 +321,3 @@ class test_State(unittest.TestCase):
         s = State(callback=callback)
         s.event({"type": "worker-online"})
         self.assertTrue(scratch.get("recv"))
-

+ 2 - 2
celery/tests/test_result.py

@@ -109,7 +109,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertIsInstance(nok2_res.result, KeyError)
 
     def test_get_timeout(self):
-        res = AsyncResult(self.task4["id"]) # has RETRY status
+        res = AsyncResult(self.task4["id"])             # has RETRY status
         self.assertRaises(TimeoutError, res.get, timeout=0.1)
 
         pending_res = AsyncResult(gen_unique_id())
@@ -117,7 +117,7 @@ class TestAsyncResult(unittest.TestCase):
 
     @skip_if_quick
     def test_get_timeout_longer(self):
-        res = AsyncResult(self.task4["id"]) # has RETRY status
+        res = AsyncResult(self.task4["id"])             # has RETRY status
         self.assertRaises(TimeoutError, res.get, timeout=1)
 
     def test_ready(self):

+ 0 - 1
celery/tests/test_states.py

@@ -26,4 +26,3 @@ class test_state_precedence(unittest.TestCase):
         self.assertLess(state("CRASHED"), state(states.SUCCESS))
         self.assertLess(state("CRASHED"), state(states.FAILURE))
         self.assertTrue(state(states.REVOKED) < state("CRASHED"))
-

+ 7 - 9
celery/tests/test_task.py

@@ -376,7 +376,6 @@ class TestTaskSet(unittest.TestCase):
         ])
         self.assertEqual(ts.total, 9)
 
-
         consumer = IncrementCounterTask().get_consumer()
         consumer.discard_all()
         taskset_res = ts.apply_async()
@@ -725,7 +724,7 @@ class test_crontab_is_due(unittest.TestCase):
     def test_every_hour_execution_is_due(self):
         due, remaining = HourlyPeriodic().is_due(datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 60*60)
+        self.assertEquals(remaining, 60 * 60)
 
     @patch_crontab_nowfun(HourlyPeriodic, datetime(2010, 5, 10, 10, 29))
     def test_every_hour_execution_is_not_due(self):
@@ -738,14 +737,14 @@ class test_crontab_is_due(unittest.TestCase):
         due, remaining = QuarterlyPeriodic().is_due(
                             datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 15*60)
+        self.assertEquals(remaining, 15 * 60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_second_quarter_execution_is_due(self):
         due, remaining = QuarterlyPeriodic().is_due(
                             datetime(2010, 5, 10, 6, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 15*60)
+        self.assertEquals(remaining, 15 * 60)
 
     @patch_crontab_nowfun(QuarterlyPeriodic, datetime(2010, 5, 10, 10, 14))
     def test_first_quarter_execution_is_not_due(self):
@@ -765,23 +764,22 @@ class test_crontab_is_due(unittest.TestCase):
     def test_daily_execution_is_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 9, 7, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 24*60*60)
+        self.assertEquals(remaining, 24 * 60 * 60)
 
     @patch_crontab_nowfun(DailyPeriodic, datetime(2010, 5, 10, 10, 30))
     def test_daily_execution_is_not_due(self):
         due, remaining = DailyPeriodic().is_due(datetime(2010, 5, 10, 7, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 21*60*60)
+        self.assertEquals(remaining, 21 * 60 * 60)
 
     @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 6, 7, 30))
     def test_weekly_execution_is_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 4, 30, 7, 30))
         self.assertTrue(due)
-        self.assertEquals(remaining, 7*24*60*60)
+        self.assertEquals(remaining, 7 * 24 * 60 * 60)
 
     @patch_crontab_nowfun(WeeklyPeriodic, datetime(2010, 5, 7, 10, 30))
     def test_weekly_execution_is_not_due(self):
         due, remaining = WeeklyPeriodic().is_due(datetime(2010, 5, 6, 7, 30))
         self.assertFalse(due)
-        self.assertEquals(remaining, 6*24*60*60 - 3*60*60)
-
+        self.assertEquals(remaining, 6 * 24 * 60 * 60 - 3 * 60 * 60)

+ 2 - 2
celery/tests/test_task_sets.py

@@ -81,8 +81,8 @@ class test_subtask(unittest.TestCase):
     def test_is_JSON_serializable(self):
         s = MockTask.subtask((2, ), {"cache": True},
                 {"routing_key": "CPU-bound"})
-        s.args = list(s.args) # tuples are not preserved
-                              # but this doesn't matter.
+        s.args = list(s.args)               # tuples are not preserved
+                                            # but this doesn't matter.
         self.assertEqual(s,
                          subtask(simplejson.loads(simplejson.dumps(s))))
 

+ 3 - 2
celery/tests/test_worker.py

@@ -178,8 +178,8 @@ class test_CarrotListener(unittest.TestCase):
 
         it = l._mainloop()
         self.assertTrue(it.next(), "draining")
-
         records = {}
+
         def create_recorder(key):
             def _recorder(*args, **kwargs):
                 records[key] = True
@@ -417,10 +417,10 @@ class test_CarrotListener(unittest.TestCase):
                 return self.wait_method
 
         called_back = [False]
+
         def init_callback(listener):
             called_back[0] = True
 
-
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
@@ -439,6 +439,7 @@ class test_CarrotListener(unittest.TestCase):
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
+
         def raises_socket_error(limit=None):
             yield True
             l.iterations = 1

+ 1 - 1
celery/tests/test_worker_control.py

@@ -16,7 +16,7 @@ from celery.worker.state import revoked
 hostname = socket.gethostname()
 
 
-@task(rate_limit=200) # for extra info in dump_tasks
+@task(rate_limit=200)                   # for extra info in dump_tasks
 def mytask():
     pass
 

+ 1 - 1
celery/tests/test_worker_job.py

@@ -42,7 +42,7 @@ def mytask(i, **kwargs):
     return i ** i
 
 
-@task_dec # traverses coverage for decorator without parens
+@task_dec               # traverses coverage for decorator without parens
 def mytask_no_kwargs(i):
     return i ** i
 

+ 4 - 5
celery/utils/__init__.py

@@ -18,7 +18,7 @@ from carrot.utils import rpartition
 from dateutil.parser import parse as parse_iso8601
 
 from celery.utils.compat import all, any, defaultdict
-from celery.utils.timeutils import timedelta_seconds # was here before
+from celery.utils.timeutils import timedelta_seconds        # was here before
 from celery.utils.functional import curry
 
 
@@ -27,7 +27,6 @@ LOG_LEVELS["FATAL"] = logging.FATAL
 LOG_LEVELS[logging.FATAL] = "FATAL"
 
 
-
 class promise(object):
     """A promise.
 
@@ -238,7 +237,7 @@ def repeatlast(it):
     yield the last value infinitely."""
     for item in it:
         yield item
-    while 1: # pragma: no cover
+    while 1:                                            # pragma: no cover
         yield item
 
 
@@ -346,7 +345,7 @@ def get_cls_by_name(name, aliases={}):
     """
 
     if not isinstance(name, basestring):
-        return name # already a class
+        return name                                     # already a class
 
     name = aliases.get(name) or name
     module_name, _, cls_name = rpartition(name, ".")
@@ -374,7 +373,7 @@ def abbr(S, max, ellipsis="..."):
     if S is None:
         return "???"
     if len(S) > max:
-        return ellipsis and (S[:max-len(ellipsis)] + ellipsis) or S[:max]
+        return ellipsis and (S[:max - len(ellipsis)] + ellipsis) or S[:max]
     return S
 
 

+ 3 - 2
celery/utils/compat.py

@@ -287,7 +287,7 @@ except ImportError:
             return "defaultdict(%s, %s)" % (self.default_factory,
                                             dict.__repr__(self))
     import collections
-    collections.defaultdict = defaultdict # Pickle needs this.
+    collections.defaultdict = defaultdict               # Pickle needs this.
 
 ############## logging.LoggerAdapter ########################################
 import inspect
@@ -410,11 +410,12 @@ try:
     from itertools import izip_longest
 except ImportError:
     import itertools
+
     def izip_longest(*args, **kwds):
         fillvalue = kwds.get("fillvalue")
 
         def sentinel(counter=([fillvalue] * (len(args) - 1)).pop):
-            yield counter() # yields the fillvalue, or raises IndexError
+            yield counter()     # yields the fillvalue, or raises IndexError
 
         fillers = itertools.repeat(fillvalue)
         iters = [itertools.chain(it, sentinel(), fillers)

+ 1 - 1
celery/utils/dispatch/signal.py

@@ -4,7 +4,7 @@ import weakref
 try:
     set
 except NameError:
-    from sets import Set as set # Python 2.3 fallback
+    from sets import Set as set                     # Python 2.3 fallback
 
 from celery.utils.dispatch import saferef
 

+ 5 - 2
celery/utils/functional.py

@@ -67,12 +67,13 @@
 # update_wrapper() and wraps() are tools to help write
 # wrapper functions that can handle naive introspection
 
+
 def _compat_curry(fun, *args, **kwargs):
     """New function with partial application of the given arguments
     and keywords."""
 
     def _curried(*addargs, **addkwargs):
-        return fun(*(args+addargs), **dict(kwargs, **addkwargs))
+        return fun(*(args + addargs), **dict(kwargs, **addkwargs))
     return _curried
 
 
@@ -83,6 +84,8 @@ except ImportError:
 
 WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
 WRAPPER_UPDATES = ('__dict__',)
+
+
 def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
         updated=WRAPPER_UPDATES):
     """Update a wrapper function to look like the wrapped function
@@ -100,7 +103,7 @@ def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
     for attr in assigned:
         try:
             setattr(wrapper, attr, getattr(wrapped, attr))
-        except TypeError: # Python 2.3 doesn't allow assigning to __name__.
+        except TypeError:   # Python 2.3 doesn't allow assigning to __name__.
             pass
     for attr in updated:
         getattr(wrapper, attr).update(getattr(wrapped, attr))

+ 1 - 1
celery/utils/info.py

@@ -20,7 +20,7 @@ def humanize_seconds(secs, prefix=""):
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
             w = secs / divider
-            punit = w > 1 and unit+"s" or unit
+            punit = w > 1 and (unit + "s") or unit
             return "%s%s %s" % (prefix, formatter(w), punit)
     return "now"
 

+ 1 - 0
celery/utils/term.py

@@ -16,6 +16,7 @@ RESET_SEQ = "\033[0m"
 COLOR_SEQ = "\033[1;%dm"
 fg = lambda s: COLOR_SEQ % s
 
+
 class colored(object):
 
     def __init__(self, *s, **kwargs):

+ 2 - 1
celery/utils/timer2.py

@@ -22,6 +22,7 @@ __docformat__ = "restructuredtext"
 
 DEFAULT_MAX_INTERVAL = 2
 
+
 class TimedFunctionFailed(UserWarning):
     pass
 
@@ -119,7 +120,7 @@ class Schedule(object):
     @property
     def queue(self):
         events = list(self._queue)
-        return map(heapq.heappop, [events]*len(events))
+        return map(heapq.heappop, [events] * len(events))
 
 
 class Timer(Thread):

+ 1 - 1
celery/worker/buckets.py

@@ -66,7 +66,7 @@ class TaskBucket(object):
     def _get_immediate(self):
         try:
             return self.immediate.popleft()
-        except IndexError: # Empty
+        except IndexError:                                      # Empty
             raise QueueEmpty()
 
     def _get(self):

+ 1 - 1
celery/worker/control/__init__.py

@@ -36,7 +36,7 @@ class ControlDispatch(object):
             ...         ControlDispatch().dispatch_from_message(control)
 
         """
-        message = dict(message) # don't modify callers message.
+        message = dict(message)             # don't modify callers message.
         command = message.pop("command")
         destination = message.pop("destination", None)
         reply_to = message.pop("reply_to", None)

+ 1 - 1
celery/worker/control/builtins.py

@@ -29,7 +29,7 @@ def revoke(panel, task_id, task_name=None, **kwargs):
     """Revoke task by task id."""
     revoked.add(task_id)
     backend = default_backend
-    if task_name: # Use custom task backend (if any)
+    if task_name:                       # use custom task backend (if any)
         try:
             backend = tasks[task_name].backend
         except KeyError:

+ 1 - 1
celery/worker/control/registry.py

@@ -2,7 +2,7 @@ from UserDict import UserDict
 
 
 class Panel(UserDict):
-    data = dict() # Global registry.
+    data = dict()                       # global registry.
 
     def __init__(self, logger, listener, hostname=None):
         self.logger = logger

+ 3 - 3
celery/worker/controllers.py

@@ -46,15 +46,15 @@ class Mediator(threading.Thread):
         self.logger.debug(
             "Mediator: Running callback for task: %s[%s]" % (
                 task.task_name, task.task_id))
-        self.callback(task) # execute
+        self.callback(task)                 # execute
 
     def run(self):
         while not self._shutdown.isSet():
             self.move()
-        self._stopped.set() # indicate that we are stopped
+        self._stopped.set()                 # indicate that we are stopped
 
     def stop(self):
         """Gracefully shutdown the thread."""
         self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
+        self._stopped.wait()                # block until this thread is done
         self.join(1e100)

+ 1 - 1
celery/worker/heartbeat.py

@@ -63,6 +63,6 @@ class Heart(threading.Thread):
             return
         self._state = "CLOSE"
         self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
+        self._stopped.wait()            # block until this thread is done
         if self.isAlive():
             self.join(1e100)

+ 0 - 1
celery/worker/job.py

@@ -274,7 +274,6 @@ class TaskRequest(object):
         delivery_info = dict((key, _delivery_info.get(key))
                                 for key in WANTED_DELIVERY_INFO)
 
-
         if not hasattr(kwargs, "items"):
             raise InvalidTaskError("Task kwargs must be a dictionary.")
 

+ 3 - 3
celery/worker/listener.py

@@ -315,7 +315,7 @@ class CarrotListener(object):
     def maybe_conn_error(self, fun):
         try:
             fun()
-        except Exception: # TODO kombu.connection_errors
+        except Exception:                   # TODO kombu.connection_errors
             pass
 
     def close_connection(self):
@@ -380,7 +380,7 @@ class CarrotListener(object):
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,
                        self.initial_prefetch_count, self.logger)
-        self.qos.update() # enable prefetch_count QoS.
+        self.qos.update()                       # enable prefetch_count QoS.
 
         self.task_consumer.on_decode_error = self.on_decode_error
         self.broadcast_consumer = BroadcastConsumer(self.connection,
@@ -427,7 +427,7 @@ class CarrotListener(object):
         def _establish_connection():
             """Establish a connection to the broker."""
             conn = establish_connection()
-            conn.connect() # Connection is established lazily, so connect.
+            conn.connect()                              # evaluate connection
             return conn
 
         if not conf.BROKER_CONNECTION_RETRY:

+ 1 - 1
celery/worker/state.py

@@ -8,7 +8,7 @@ REVOKES_MAX = 10000
 
 # How many seconds a revoke will be active before
 # being expired when the max limit has been exceeded.
-REVOKE_EXPIRES = 3600 # One hour.
+REVOKE_EXPIRES = 3600  # (one hour)
 
 """
 .. data:: active_requests

+ 1 - 1
contrib/release/sphinx-to-rst.py

@@ -48,7 +48,7 @@ TO_RST_MAP = {RE_CODE_BLOCK: replace_code_block,
 
 
 def _process(lines):
-    lines = list(lines) # non-destructive
+    lines = list(lines)                         # non-destructive
     for i, line in enumerate(lines):
         for regex, alt in TO_RST_MAP.items():
             if callable(alt):

+ 2 - 2
docs/_ext/literals_to_xrefs.py

@@ -68,8 +68,8 @@ def fixliterals(fname):
             new.append(m.group(0))
             continue
 
-        sys.stdout.write("\n"+"-"*80+"\n")
-        sys.stdout.write(data[prev_start+1:m.start()])
+        sys.stdout.write("\n" + "-" * 80 + "\n")
+        sys.stdout.write(data[prev_start + 1:m.start()])
         sys.stdout.write(colorize(m.group(0), fg="red"))
         sys.stdout.write(data[m.end():next_end])
         sys.stdout.write("\n\n")

+ 0 - 0
docs/homepage/examplerun.py → docs/homepage/examplerun.rst


+ 0 - 0
docs/homepage/exampletask.py → docs/homepage/exampletask.rst


+ 1 - 1
docs/slidesource/slide-example1-result.py

@@ -6,5 +6,5 @@ res.get()
 
 res = MyTask.apply_async(args=[8, 4],
                          countdown=5)
-res.get() # Is executed after 5 seconds.
+res.get()   # Is executed after 5 seconds.
 #32

+ 1 - 1
examples/celery_http_gateway/manage.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 from django.core.management import execute_manager
 try:
-    import settings # Assumed to be in the same directory.
+    import settings     # Assumed to be in the same directory.
 except ImportError:
     import sys
     sys.stderr.write(

+ 1 - 1
examples/httpexample/manage.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 from django.core.management import execute_manager
 try:
-    import settings # Assumed to be in the same directory.
+    import settings     # Assumed to be in the same directory.
 except ImportError:
     import sys
     sys.stderr.write(

+ 1 - 0
funtests/config.py

@@ -21,6 +21,7 @@ CELERYD_LOG_COLOR = False
 
 CELERY_IMPORTS = ("celery.tests.functional.tasks", )
 
+
 @atexit.register
 def teardown_testdb():
     import os

+ 1 - 0
funtests/test_basic.py

@@ -6,6 +6,7 @@ from celery.tests.functional.case import WorkerCase
 
 from celery.task.control import broadcast
 
+
 class test_basic(WorkerCase):
 
     def test_started(self):

+ 1 - 0
pavement.py

@@ -6,6 +6,7 @@ options(
         sphinx=Bunch(builddir=".build"),
 )
 
+
 def sphinx_builddir(options):
     return path("docs") / options.sphinx.builddir / "html"
 

+ 1 - 1
setup.py

@@ -68,7 +68,7 @@ setup(
     zip_safe=False,
     install_requires=install_requires,
     tests_require=['nose', 'nose-cover3', 'unittest2', 'simplejson'],
-    cmdclass = {"quicktest": QuickRunTests},
+    cmdclass={"quicktest": QuickRunTests},
     test_suite="nose.collector",
     classifiers=[
         "Development Status :: 5 - Production/Stable",