Ver Fonte

PEP8ify (using new version of pep8, 0.6.0)

Ask Solem há 14 anos atrás
pai
commit
c408d07b47
74 ficheiros alterados com 186 adições e 160 exclusões
  1. 1 1
      celery/app/__init__.py
  2. 1 4
      celery/app/amqp.py
  3. 2 2
      celery/backends/__init__.py
  4. 1 0
      celery/backends/cache.py
  5. 4 3
      celery/backends/cassandra.py
  6. 0 2
      celery/backends/database.py
  7. 2 2
      celery/beat.py
  8. 2 0
      celery/bin/camqadm.py
  9. 1 0
      celery/bin/celeryctl.py
  10. 11 5
      celery/concurrency/processes/pool.py
  11. 1 1
      celery/concurrency/threads.py
  12. 2 2
      celery/conf.py
  13. 1 1
      celery/datastructures.py
  14. 1 2
      celery/events/cursesmon.py
  15. 1 1
      celery/events/state.py
  16. 1 0
      celery/exceptions.py
  17. 2 2
      celery/execute/__init__.py
  18. 0 1
      celery/loaders/default.py
  19. 4 5
      celery/log.py
  20. 1 1
      celery/management/commands/celeryd.py
  21. 3 1
      celery/messaging.py
  22. 1 0
      celery/models.py
  23. 0 1
      celery/serialization.py
  24. 5 3
      celery/task/base.py
  25. 3 6
      celery/task/control.py
  26. 1 1
      celery/task/http.py
  27. 3 2
      celery/task/sets.py
  28. 1 0
      celery/tests/__init__.py
  29. 2 0
      celery/tests/compat.py
  30. 2 2
      celery/tests/test_backends/test_database.py
  31. 2 2
      celery/tests/test_bin/test_celerybeat.py
  32. 11 8
      celery/tests/test_bin/test_celeryd.py
  33. 1 1
      celery/tests/test_buckets.py
  34. 2 2
      celery/tests/test_concurrency_processes.py
  35. 0 2
      celery/tests/test_events_state.py
  36. 4 3
      celery/tests/test_loaders.py
  37. 7 6
      celery/tests/test_result.py
  38. 10 9
      celery/tests/test_routes.py
  39. 0 1
      celery/tests/test_states.py
  40. 4 5
      celery/tests/test_task.py
  41. 8 6
      celery/tests/test_task_http.py
  42. 6 5
      celery/tests/test_task_sets.py
  43. 3 3
      celery/tests/test_utils_info.py
  44. 3 2
      celery/tests/test_worker.py
  45. 6 5
      celery/tests/test_worker_control.py
  46. 9 7
      celery/tests/test_worker_job.py
  47. 9 7
      celery/tests/utils.py
  48. 4 4
      celery/utils/__init__.py
  49. 3 2
      celery/utils/compat.py
  50. 1 1
      celery/utils/dispatch/signal.py
  51. 5 2
      celery/utils/functional.py
  52. 1 0
      celery/utils/term.py
  53. 2 1
      celery/utils/timer2.py
  54. 1 3
      celery/utils/timeutils.py
  55. 1 1
      celery/worker/buckets.py
  56. 1 1
      celery/worker/control/__init__.py
  57. 1 1
      celery/worker/control/builtins.py
  58. 1 1
      celery/worker/control/registry.py
  59. 3 3
      celery/worker/controllers.py
  60. 1 1
      celery/worker/heartbeat.py
  61. 2 3
      celery/worker/job.py
  62. 3 3
      celery/worker/listener.py
  63. 1 1
      celery/worker/state.py
  64. 1 1
      contrib/release/sphinx-to-rst.py
  65. 2 2
      docs/_ext/literals_to_xrefs.py
  66. 0 0
      docs/homepage/examplerun.rst
  67. 0 0
      docs/homepage/exampletask.rst
  68. 1 1
      docs/slidesource/slide-example1-result.py
  69. 1 1
      examples/celery_http_gateway/manage.py
  70. 1 1
      examples/httpexample/manage.py
  71. 1 0
      funtests/config.py
  72. 1 0
      funtests/test_basic.py
  73. 1 0
      pavement.py
  74. 1 1
      setup.py

+ 1 - 1
celery/app/__init__.py

@@ -123,7 +123,7 @@ class App(base.BaseApp):
                                 __module__=fun.__module__,
                                 __doc__=fun.__doc__)
                 T = type(fun.__name__, (base, ), cls_dict)()
-                return registry.tasks[T.name] # global instance.
+                return registry.tasks[T.name]             # global instance.
 
             return _create_task_cls
 

+ 1 - 4
celery/app/amqp.py

@@ -9,8 +9,6 @@ from celery import routes
 from celery import signals
 from celery.utils import gen_unique_id, mitemgetter, textindent
 
-
-
 MSG_OPTIONS = ("mandatory", "priority", "immediate",
                "routing_key", "serializer", "delivery_mode")
 QUEUE_FORMAT = """
@@ -81,7 +79,6 @@ class Queues(UserDict):
         return cls(queues)
 
 
-
 class TaskPublisher(messaging.Publisher):
     auto_declare = False
 
@@ -99,7 +96,7 @@ class TaskPublisher(messaging.Publisher):
         task_args = task_args or []
         task_kwargs = task_kwargs or {}
         now = None
-        if countdown: # Convert countdown to ETA.
+        if countdown:                           # Convert countdown to ETA.
             now = datetime.now()
             eta = now + timedelta(seconds=countdown)
 

+ 2 - 2
celery/backends/__init__.py

@@ -1,4 +1,4 @@
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.utils import get_cls_by_name
 from celery.utils.functional import curry
 
@@ -17,7 +17,7 @@ _backend_cache = {}
 
 def get_backend_cls(backend, loader=None):
     """Get backend class by name/alias"""
-    loader = loader or default_app.loader
+    loader = loader or app_or_default().loader
     if backend not in _backend_cache:
         aliases = dict(BACKEND_ALIASES, **loader.override_backends)
         _backend_cache[backend] = get_cls_by_name(backend, aliases)

+ 1 - 0
celery/backends/cache.py

@@ -43,6 +43,7 @@ backends = {"memcache": get_best_memcache,
             "pylibmc": get_best_memcache,
             "memory": DummyClient}
 
+
 class CacheBackend(KeyValueStoreBackend):
     _client = None
 

+ 4 - 3
celery/backends/cassandra.py

@@ -2,7 +2,7 @@
 try:
     import pycassa
     from thrift import Thrift
-    C = __import__('cassandra').ttypes # FIXME: Namespace kludge
+    C = __import__('cassandra').ttypes          # FIXME Namespace kludge
 except ImportError:
     pycassa = None
 
@@ -152,8 +152,9 @@ class CassandraBackend(BaseDictBackend):
 
         cf = self._get_column_family()
         column_parent = C.ColumnParent(cf.column_family)
-        slice_pred = C.SlicePredicate(slice_range=C.SliceRange('', end_column,
-                                                               count=2**30))
+        slice_pred = C.SlicePredicate(
+                            slice_range=C.SliceRange('', end_column,
+                                                     count=2 ** 30))
         columns = cf.client.multiget_slice(cf.keyspace, self._index_keys,
                                            column_parent, slice_pred,
                                            pycassa.ConsistencyLevel.DCQUORUM)

+ 0 - 2
celery/backends/database.py

@@ -13,7 +13,6 @@ except ImportError:
         "See http://pypi.python.org/pypi/SQLAlchemy")
 
 
-
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
 
@@ -30,7 +29,6 @@ class DatabaseBackend(BaseDictBackend):
                     "Missing connection string! Do you have "
                     "CELERY_RESULT_DBURI set to a real value?")
 
-
     def ResultSession(self):
         return ResultSession(dburi=self.dburi, **self.engine_options)
 

+ 2 - 2
celery/beat.py

@@ -195,7 +195,7 @@ class Scheduler(UserDict):
                     entry.name, exc))
         return result
 
-    def send_task(self, *args, **kwargs): # pragma: no cover
+    def send_task(self, *args, **kwargs):               # pragma: no cover
         return self.app.send_task(*args, **kwargs)
 
     def setup_schedule(self):
@@ -316,7 +316,7 @@ class Service(object):
     def stop(self, wait=False):
         self.logger.info("Celerybeat: Shutting down...")
         self._shutdown.set()
-        wait and self._stopped.wait() # block until shutdown done.
+        wait and self._stopped.wait()           # block until shutdown done.
 
     @property
     def scheduler(self):

+ 2 - 0
celery/bin/camqadm.py

@@ -37,6 +37,7 @@ Example:
     -> queue.delete myqueue yes no
 """
 
+
 def say(m):
     sys.stderr.write("%s\n" % (m, ))
 
@@ -362,6 +363,7 @@ class AMQPAdminCommand(Command):
         options["app"] = self.app
         return AMQPAdmin(*args, **options).run()
 
+
 def camqadm(*args, **options):
     AMQPAdmin(*args, **options).run()
 

+ 1 - 0
celery/bin/celeryctl.py

@@ -167,6 +167,7 @@ class result(Command):
         self.out(self.prettify(result.get())[1])
 result = command(result)
 
+
 class inspect(Command):
     choices = {"active": 10,
                "scheduled": 1.0,

+ 11 - 5
celery/concurrency/processes/pool.py

@@ -48,6 +48,7 @@ SIG_SOFT_TIMEOUT = getattr(signal, "SIGUSR1", None)
 # Exceptions
 #
 
+
 class WorkerLostError(Exception):
     """The worker processing a job has exited prematurely."""
     pass
@@ -58,6 +59,7 @@ class WorkerLostError(Exception):
 
 job_counter = itertools.count()
 
+
 def mapstar(args):
     return map(*args)
 
@@ -191,7 +193,7 @@ class TaskHandler(PoolThread):
             else:
                 if set_length:
                     debug('doing set_length()')
-                    set_length(i+1)
+                    set_length(i + 1)
                 continue
             break
         else:
@@ -295,7 +297,7 @@ class TimeoutHandler(PoolThread):
                 elif i not in dirty and _timed_out(ack_time, t_soft):
                     _on_soft_timeout(job, i)
 
-            time.sleep(0.5) # Don't waste CPU cycles.
+            time.sleep(0.5)                     # Don't waste CPU cycles.
 
         debug('timeout handler exiting')
 
@@ -368,7 +370,6 @@ class ResultHandler(PoolThread):
 
                 on_state_change(task)
 
-
         if putlock is not None:
             try:
                 putlock.release()
@@ -765,6 +766,7 @@ DynamicPool = Pool
 # Class whose instances are returned by `Pool.apply_async()`
 #
 
+
 class ApplyResult(object):
 
     def __init__(self, cache, callback, accept_callback=None,
@@ -841,6 +843,7 @@ class ApplyResult(object):
 # Class whose instances are returned by `Pool.map_async()`
 #
 
+
 class MapResult(ApplyResult):
 
     def __init__(self, cache, chunksize, length, callback):
@@ -856,12 +859,12 @@ class MapResult(ApplyResult):
             self._number_left = 0
             self._ready = True
         else:
-            self._number_left = length//chunksize + bool(length % chunksize)
+            self._number_left = length // chunksize + bool(length % chunksize)
 
     def _set(self, i, success_result):
         success, result = success_result
         if success:
-            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
+            self._value[i * self._chunksize:(i + 1) * self._chunksize] = result
             self._number_left -= 1
             if self._number_left == 0:
                 if self._callback:
@@ -907,6 +910,7 @@ class MapResult(ApplyResult):
 # Class whose instances are returned by `Pool.imap()`
 #
 
+
 class IMapIterator(object):
 
     def __init__(self, cache):
@@ -980,6 +984,7 @@ class IMapIterator(object):
 # Class whose instances are returned by `Pool.imap_unordered()`
 #
 
+
 class IMapUnorderedIterator(IMapIterator):
 
     def _set(self, i, obj):
@@ -997,6 +1002,7 @@ class IMapUnorderedIterator(IMapIterator):
 #
 #
 
+
 class ThreadPool(Pool):
 
     from multiprocessing.dummy import Process as DummyProcess

+ 1 - 1
celery/concurrency/threads.py

@@ -61,7 +61,7 @@ class TaskPool(object):
 
         if isinstance(ret_value, ExceptionInfo):
             if isinstance(ret_value.exception, (
-                    SystemExit, KeyboardInterrupt)): # pragma: no cover
+                    SystemExit, KeyboardInterrupt)):    # pragma: no cover
                 raise ret_value.exception
             [errback(ret_value) for errback in errbacks]
         else:

+ 2 - 2
celery/conf.py

@@ -6,10 +6,10 @@ Use :mod:`celery.defaults` instead.
 
 
 """
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.app.defaults import DEFAULTS as _DEFAULTS
 
-conf = default_app.conf
+conf = app_or_default().conf
 
 ALWAYS_EAGER = conf.CELERY_ALWAYS_EAGER
 EAGER_PROPAGATES_EXCEPTIONS = conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS

+ 1 - 1
celery/datastructures.py

@@ -253,7 +253,7 @@ class LimitedSet(object):
                 if not self.expires or time.time() > when + self.expires:
                     try:
                         self.pop_value(value)
-                    except TypeError: # pragma: no cover
+                    except TypeError:                   # pragma: no cover
                         continue
             break
 

+ 1 - 2
celery/events/cursesmon.py

@@ -168,7 +168,7 @@ class CursesMonitor(object):
             while True:
                 ch = self.win.getch(x, y + i)
                 if ch != -1:
-                    if ch in (10, curses.KEY_ENTER): # enter
+                    if ch in (10, curses.KEY_ENTER):            # enter
                         break
                     if ch in (27, ):
                         buffer = str()
@@ -303,7 +303,6 @@ class CursesMonitor(object):
         else:
             win.addstr(my - 5, x, "No task selected", curses.A_NORMAL)
 
-
         # Workers
         if self.workers:
             win.addstr(my - 4, x, self.online_str, curses.A_BOLD)

+ 1 - 1
celery/events/state.py

@@ -10,7 +10,7 @@ from celery import states
 from celery.datastructures import AttributeDict, LocalCache
 from celery.utils import kwdict
 
-HEARTBEAT_EXPIRE = 150 # 2 minutes, 30 seconds
+HEARTBEAT_EXPIRE = 150                      # 2 minutes, 30 seconds
 
 
 class Element(AttributeDict):

+ 1 - 0
celery/exceptions.py

@@ -8,6 +8,7 @@ UNREGISTERED_FMT = """
 Task of kind %s is not registered, please make sure it's imported.
 """.strip()
 
+
 class QueueNotFound(KeyError):
     """Task routed to a queue not in CELERY_QUEUES."""
     pass

+ 2 - 2
celery/execute/__init__.py

@@ -1,4 +1,4 @@
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.registry import tasks
 
 
@@ -17,7 +17,7 @@ def apply(task, *args, **kwargs):
 def send_task(*args, **kwargs):
     """Deprecated. See :meth:`celery.app.App.send_task`."""
     # FIXME Deprecate!
-    return default_app.send_task(*args, **kwargs)
+    return app_or_default().send_task(*args, **kwargs)
 
 
 def delay_task(task_name, *args, **kwargs):

+ 0 - 1
celery/loaders/default.py

@@ -45,7 +45,6 @@ class Loader(BaseLoader):
 
         return settings
 
-
     def read_configuration(self):
         """Read configuration from ``celeryconfig.py`` and configure
         celery and Django so it can be used by regular Python."""

+ 4 - 5
celery/log.py

@@ -10,7 +10,7 @@ from multiprocessing import current_process
 from multiprocessing import util as mputil
 
 from celery import signals
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.utils import noop
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger
@@ -167,7 +167,7 @@ class Logging(object):
     def _setup_logger(self, logger, logfile, format, colorize,
             formatter=ColorFormatter, **kwargs):
 
-        if logger.handlers: # Logger already configured
+        if logger.handlers:                 # Logger already configured
             return logger
 
         handler = self._detect_handler(logfile)
@@ -176,7 +176,7 @@ class Logging(object):
         return logger
 
 
-_default_logging = Logging(default_app)
+_default_logging = Logging(app_or_default())
 setup_logging_subsystem = _default_logging.setup_logging_subsystem
 get_default_logger = _default_logging.get_default_logger
 setup_logger = _default_logging.setup_logger
@@ -185,7 +185,6 @@ get_task_logger = _default_logging.get_task_logger
 redirect_stdouts_to_logger = _default_logging.redirect_stdouts_to_logger
 
 
-
 class LoggingProxy(object):
     """Forward file object to :class:`logging.Logger` instance.
 
@@ -209,7 +208,7 @@ class LoggingProxy(object):
         ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
         infinite loops."""
 
-        def wrap_handler(handler): # pragma: no cover
+        def wrap_handler(handler):                  # pragma: no cover
 
             class WithSafeHandleError(logging.Handler):
 

+ 1 - 1
celery/management/commands/celeryd.py

@@ -5,7 +5,7 @@ Start the celery daemon from the Django management command.
 """
 from django.core.management.base import BaseCommand
 
-import celery.models # <-- shows upgrade instructions at exit.
+import celery.models            # <-- shows upgrade instructions at exit.
 
 
 class Command(BaseCommand):

+ 3 - 1
celery/messaging.py

@@ -4,12 +4,14 @@ Sending and Receiving Messages
 
 """
 
-from celery.app import app_or_default, default_app
+from celery.app import app_or_default
 
+default_app = app_or_default()
 TaskPublisher = default_app.amqp.TaskPublisher
 ConsumerSet = default_app.amqp.ConsumerSet
 TaskConsumer = default_app.amqp.TaskConsumer
 
+
 def establish_connection(**kwargs):
     """Establish a connection to the message broker."""
     # FIXME: # Deprecate

+ 1 - 0
celery/models.py

@@ -9,6 +9,7 @@ import atexit
 
 from django.core.exceptions import ImproperlyConfigured
 
+
 @atexit.register
 def _display_help():
     import sys

+ 0 - 1
celery/serialization.py

@@ -71,7 +71,6 @@ def find_nearest_pickleable_exception(exc):
         # Use inspect.getmro() to traverse bases instead.
         getmro_ = lambda: inspect.getmro(cls)
 
-
     for supercls in getmro_():
         if supercls in unwanted_base_classes:
             # only BaseException and object, from here on down,

+ 5 - 3
celery/task/base.py

@@ -30,6 +30,7 @@ extract_exec_options = mattrgetter("queue", "routing_key",
                                    "mandatory", "priority",
                                    "serializer", "delivery_mode")
 
+
 def _unpickle_task(name):
     return tasks[name]
 
@@ -238,7 +239,7 @@ class BaseTask(object):
     store_errors_even_if_ignored = False
     send_error_emails = False
     error_whitelist = ()
-    disable_error_emails = False # FIXME
+    disable_error_emails = False                            # FIXME
     max_retries = 3
     default_retry_delay = 3 * 60
     serializer = "pickle"
@@ -518,7 +519,7 @@ class BaseTask(object):
         if kwargs.get("task_is_eager", False):
             result = self.apply(args=args, kwargs=kwargs, **options)
             if isinstance(result, EagerResult):
-                return result.get() # propogates exceptions.
+                return result.get()             # propogates exceptions.
             return result
 
         self.apply_async(args=args, kwargs=kwargs, **options)
@@ -549,7 +550,8 @@ class BaseTask(object):
         throw = self.app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
                                 options.pop("throw", None))
 
-        task = tasks[self.name] # Make sure we get the instance, not class.
+        # Make sure we get the task instance, not class.
+        task = tasks[self.name]
 
         default_kwargs = {"task_name": task.name,
                           "task_id": task_id,

+ 3 - 6
celery/task/control.py

@@ -1,4 +1,4 @@
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.pidbox import BroadcastPublisher, ControlReplyConsumer
 from celery.utils import gen_unique_id
 
@@ -59,7 +59,7 @@ class Inspect(object):
         return self._request("disable_events")
 
     def diagnose(self):
-        diagnose_timeout = self.timeout * 0.85 # 15% of timeout
+        diagnose_timeout = self.timeout * 0.85              # 15% of timeout
         return self._request("diagnose", timeout=diagnose_timeout)
 
     def ping(self):
@@ -95,7 +95,6 @@ class Control(object):
         return self.app.with_default_connection(_do_discard)(
                 connection=connection, connect_timeout=connect_timeout)
 
-
     def revoke(self, task_id, destination=None, **kwargs):
         """Revoke a task by id.
 
@@ -117,7 +116,6 @@ class Control(object):
         return self.broadcast("revoke", destination=destination,
                               arguments={"task_id": task_id}, **kwargs)
 
-
     def ping(self, destination=None, timeout=1, **kwargs):
         """Ping workers.
 
@@ -137,7 +135,6 @@ class Control(object):
         return self.broadcast("ping", reply=True, destination=destination,
                               timeout=timeout, **kwargs)
 
-
     def rate_limit(self, task_name, rate_limit, destination=None, **kwargs):
         """Set rate limit for task by type.
 
@@ -215,7 +212,7 @@ class Control(object):
                 connection=connection, connect_timeout=connect_timeout)
 
 
-_default_control = Control(default_app)
+_default_control = Control(app_or_default())
 broadcast = _default_control.broadcast
 rate_limit = _default_control.rate_limit
 ping = _default_control.ping

+ 1 - 1
celery/task/http.py

@@ -125,7 +125,7 @@ class HttpDispatch(object):
         """Makes an HTTP request and returns the response."""
         request = urllib2.Request(url, params, headers=self.http_headers)
         request.headers.update(self.http_headers)
-        response = urllib2.urlopen(request) # user catches errors.
+        response = urllib2.urlopen(request)         # user catches errors.
         return response.read()
 
     def dispatch(self):

+ 3 - 2
celery/task/sets.py

@@ -32,6 +32,7 @@ Thank you for your patience!
 
 """
 
+
 class subtask(AttributeDict):
     """Class that wraps the arguments and execution options
     for a single task invocation.
@@ -123,8 +124,8 @@ class TaskSet(UserList):
         >>> list_of_return_values = taskset_result.join()
 
     """
-    _task = None # compat
-    _task_name = None # compat
+    _task = None                # compat
+    _task_name = None           # compat
 
     def __init__(self, task=None, tasks=None, app=None):
         if task is not None:

+ 1 - 0
celery/tests/__init__.py

@@ -6,6 +6,7 @@ config = os.environ.setdefault("CELERY_TEST_CONFIG_MODULE",
 os.environ["CELERY_CONFIG_MODULE"] = config
 os.environ["CELERY_LOADER"] = "default"
 
+
 def teardown():
     import os
     if os.path.exists("test.db"):

+ 2 - 0
celery/tests/compat.py

@@ -69,8 +69,10 @@ class catch_warnings(object):
         self._showwarning = self._module.showwarning
         if self._record:
             log = []
+
             def showwarning(*args, **kwargs):
                 log.append(WarningMessage(*args, **kwargs))
+
             self._module.showwarning = showwarning
             return log
         else:

+ 2 - 2
celery/tests/test_backends/test_database.py

@@ -5,7 +5,7 @@ from datetime import datetime
 from celery.exceptions import ImproperlyConfigured
 
 from celery import states
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.db.models import Task, TaskSet
 from celery.utils import gen_unique_id
 from celery.backends.database import DatabaseBackend
@@ -20,7 +20,7 @@ class SomeClass(object):
 class test_DatabaseBackend(unittest.TestCase):
 
     def test_missing_dburi_raises_ImproperlyConfigured(self):
-        conf = default_app.conf
+        conf = app_or_default().conf
         prev, conf.CELERY_RESULT_DBURI = conf.CELERY_RESULT_DBURI, None
         try:
             self.assertRaises(ImproperlyConfigured, DatabaseBackend)

+ 2 - 2
celery/tests/test_bin/test_celerybeat.py

@@ -4,7 +4,7 @@ import unittest2 as unittest
 
 from celery import beat
 from celery import platform
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.bin import celerybeat as celerybeat_bin
 from celery.apps import beat as beatapp
 
@@ -110,6 +110,6 @@ class test_div(unittest.TestCase):
 
     def test_parse_options(self):
         cmd = celerybeat_bin.BeatCommand()
-        cmd.app = default_app
+        cmd.app = app_or_default()
         options, args = cmd.parse_options("celerybeat", ["-s", "foo"])
         self.assertEqual(options.schedule, "foo")

+ 11 - 8
celery/tests/test_bin/test_celeryd.py

@@ -9,7 +9,7 @@ from StringIO import StringIO
 from celery import Celery
 from celery import platform
 from celery import signals
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.apps import worker as cd
 from celery.bin.celeryd import WorkerCommand, main as celeryd_main
 from celery.exceptions import ImproperlyConfigured
@@ -22,6 +22,7 @@ from celery.tests.utils import execute_context
 
 patch.ensure_process_aware_logger()
 
+
 def disable_stdouts(fun):
 
     @wraps(fun)
@@ -54,7 +55,7 @@ class test_Worker(unittest.TestCase):
 
     @disable_stdouts
     def test_queues_string(self):
-        celery = Celery()
+        celery = Celery(set_as_current=False)
         worker = celery.Worker(queues="foo,bar,baz")
         worker.init_queues()
         self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
@@ -123,8 +124,9 @@ class test_Worker(unittest.TestCase):
 
     @disable_stdouts
     def test_init_queues(self):
-        c = default_app.conf
-        p, default_app.amqp.queues = default_app.amqp.queues, {
+        app = app_or_default()
+        c = app.conf
+        p, app.amqp.queues = app.amqp.queues, {
                 "celery": {"exchange": "celery",
                            "binding_key": "celery"},
                 "video": {"exchange": "video",
@@ -148,12 +150,12 @@ class test_Worker(unittest.TestCase):
                                            "exchange_type": "direct"},
                                             worker.queues["image"])
         finally:
-            default_app.amqp.queues = p
+            app.amqp.queues = p
 
     @disable_stdouts
     def test_on_listener_ready(self):
-
         worker_ready_sent = [False]
+
         def on_worker_ready(**kwargs):
             worker_ready_sent[0] = True
 
@@ -186,7 +188,7 @@ class test_funs(unittest.TestCase):
     @disable_stdouts
     def test_parse_options(self):
         cmd = WorkerCommand()
-        cmd.app = default_app
+        cmd.app = app_or_default()
         opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
         self.assertEqual(opts.concurrency, 512)
 
@@ -239,10 +241,11 @@ class test_signal_handlers(unittest.TestCase):
     def test_worker_int_handler(self):
         worker = self._Worker()
         handlers = self.psig(cd.install_worker_int_handler, worker)
-
         next_handlers = {}
+
         def i(sig, handler):
             next_handlers[sig] = handler
+
         p = platform.install_signal_handler
         platform.install_signal_handler = i
         try:

+ 1 - 1
celery/tests/test_buckets.py

@@ -261,7 +261,7 @@ class test_TaskBucket(unittest.TestCase):
             for i, job in enumerate(jobs):
                 sys.stderr.write("0")
                 self.assertTrue(b.get(), job)
-            self.assertEqual(i+1, len(jobs))
+            self.assertEqual(i + 1, len(jobs))
         finally:
             self.registry.unregister(TaskD)
 

+ 2 - 2
celery/tests/test_concurrency_processes.py

@@ -61,8 +61,8 @@ class test_TaskPool(unittest.TestCase):
         self.assertTrue(_pool.terminated)
 
     def test_on_ready_exception(self):
-
         scratch = [None]
+
         def errback(retval):
             scratch[0] = retval
 
@@ -72,8 +72,8 @@ class test_TaskPool(unittest.TestCase):
         self.assertEqual(exc, scratch[0])
 
     def test_on_ready_value(self):
-
         scratch = [None]
+
         def callback(retval):
             scratch[0] = retval
 

+ 0 - 2
celery/tests/test_events_state.py

@@ -245,7 +245,6 @@ class test_State(unittest.TestCase):
         s.freeze_while(work)
         self.assertState(s)
 
-
     def test_freeze_thaw__not_buffering(self):
         s = State()
         r = ev_snapshot(s)
@@ -322,4 +321,3 @@ class test_State(unittest.TestCase):
         s = State(callback=callback)
         s.event({"type": "worker-online"})
         self.assertTrue(scratch.get("recv"))
-

+ 4 - 3
celery/tests/test_loaders.py

@@ -54,9 +54,10 @@ class TestDefaultLoader(unittest.TestCase):
 
     def test_wanted_module_item(self):
         self.assertTrue(default.wanted_module_item("FOO"))
-        self.assertTrue(default.wanted_module_item("foo"))
-        self.assertFalse(default.wanted_module_item("_foo"))
-        self.assertFalse(default.wanted_module_item("__foo"))
+        self.assertTrue(default.wanted_module_item("Foo"))
+        self.assertFalse(default.wanted_module_item("_FOO"))
+        self.assertFalse(default.wanted_module_item("__FOO"))
+        self.assertFalse(default.wanted_module_item("foo"))
 
     def test_read_configuration(self):
         from types import ModuleType

+ 7 - 6
celery/tests/test_result.py

@@ -3,7 +3,7 @@ from __future__ import generators
 import unittest2 as unittest
 
 from celery import states
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.utils import gen_unique_id
 from celery.utils.compat import all
 from celery.result import AsyncResult, TaskSetResult
@@ -18,14 +18,15 @@ def mock_task(name, status, result):
 
 
 def save_result(task):
+    app = app_or_default()
     traceback = "Some traceback"
     if task["status"] == states.SUCCESS:
-        default_app.backend.mark_as_done(task["id"], task["result"])
+        app.backend.mark_as_done(task["id"], task["result"])
     elif task["status"] == states.RETRY:
-        default_app.backend.mark_as_retry(task["id"], task["result"],
+        app.backend.mark_as_retry(task["id"], task["result"],
                 traceback=traceback)
     else:
-        default_app.backend.mark_as_failure(task["id"], task["result"],
+        app.backend.mark_as_failure(task["id"], task["result"],
                 traceback=traceback)
 
 
@@ -109,7 +110,7 @@ class TestAsyncResult(unittest.TestCase):
         self.assertIsInstance(nok2_res.result, KeyError)
 
     def test_get_timeout(self):
-        res = AsyncResult(self.task4["id"]) # has RETRY status
+        res = AsyncResult(self.task4["id"])             # has RETRY status
         self.assertRaises(TimeoutError, res.get, timeout=0.1)
 
         pending_res = AsyncResult(gen_unique_id())
@@ -117,7 +118,7 @@ class TestAsyncResult(unittest.TestCase):
 
     @skip_if_quick
     def test_get_timeout_longer(self):
-        res = AsyncResult(self.task4["id"]) # has RETRY status
+        res = AsyncResult(self.task4["id"])             # has RETRY status
         self.assertRaises(TimeoutError, res.get, timeout=1)
 
     def test_ready(self):

+ 10 - 9
celery/tests/test_routes.py

@@ -1,7 +1,7 @@
 import unittest2 as unittest
 
 from celery import routes
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.utils import maybe_promise
 from celery.utils.functional import wraps
 from celery.exceptions import QueueNotFound
@@ -18,12 +18,13 @@ def with_queues(**queues):
     def patch_fun(fun):
         @wraps(fun)
         def __inner(*args, **kwargs):
-            prev_queues = default_app.conf.CELERY_QUEUES
-            default_app.conf.CELERY_QUEUES = queues
+            app = app_or_default()
+            prev_queues = app.conf.CELERY_QUEUES
+            app.conf.CELERY_QUEUES = queues
             try:
                 return fun(*args, **kwargs)
             finally:
-                default_app.conf.CELERY_QUEUES = prev_queues
+                app.conf.CELERY_QUEUES = prev_queues
         return __inner
     return patch_fun
 
@@ -40,7 +41,7 @@ class test_MapRoute(unittest.TestCase):
 
     @with_queues(foo=a_queue, bar=b_queue)
     def test_route_for_task_expanded_route(self):
-        expand = E(default_app.conf.CELERY_QUEUES)
+        expand = E(app_or_default().conf.CELERY_QUEUES)
         route = routes.MapRoute({"celery.ping": {"queue": "foo"}})
         self.assertDictContainsSubset(a_queue,
                              expand(route.route_for_task("celery.ping")))
@@ -48,14 +49,14 @@ class test_MapRoute(unittest.TestCase):
 
     @with_queues(foo=a_queue, bar=b_queue)
     def test_route_for_task(self):
-        expand = E(default_app.conf.CELERY_QUEUES)
+        expand = E(app_or_default().conf.CELERY_QUEUES)
         route = routes.MapRoute({"celery.ping": b_queue})
         self.assertDictContainsSubset(b_queue,
                              expand(route.route_for_task("celery.ping")))
         self.assertIsNone(route.route_for_task("celery.awesome"))
 
     def test_expand_route_not_found(self):
-        expand = E(default_app.conf.CELERY_QUEUES)
+        expand = E(app_or_default().conf.CELERY_QUEUES)
         route = routes.MapRoute({"a": {"queue": "x"}})
         self.assertRaises(QueueNotFound, expand, route.route_for_task("a"))
 
@@ -70,7 +71,7 @@ class test_lookup_route(unittest.TestCase):
     def test_lookup_takes_first(self):
         R = routes.prepare(({"celery.ping": {"queue": "bar"}},
                             {"celery.ping": {"queue": "foo"}}))
-        router = routes.Router(R, default_app.conf.CELERY_QUEUES)
+        router = routes.Router(R, app_or_default().conf.CELERY_QUEUES)
         self.assertDictContainsSubset(b_queue,
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))
@@ -79,7 +80,7 @@ class test_lookup_route(unittest.TestCase):
     def test_lookup_paths_traversed(self):
         R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
                             {"celery.ping": {"queue": "foo"}}))
-        router = routes.Router(R, default_app.conf.CELERY_QUEUES)
+        router = routes.Router(R, app_or_default().conf.CELERY_QUEUES)
         self.assertDictContainsSubset(a_queue,
                 router.route({}, "celery.ping",
                     args=[1, 2], kwargs={}))

+ 0 - 1
celery/tests/test_states.py

@@ -26,4 +26,3 @@ class test_state_precedence(unittest.TestCase):
         self.assertLess(state("CRASHED"), state(states.SUCCESS))
         self.assertLess(state("CRASHED"), state(states.FAILURE))
         self.assertTrue(state(states.REVOKED) < state("CRASHED"))
-

+ 4 - 5
celery/tests/test_task.py

@@ -6,7 +6,7 @@ from pyparsing import ParseException
 
 
 from celery import task
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.task.schedules import crontab, crontab_parser
 from celery.utils import timeutils
 from celery.utils import gen_unique_id
@@ -204,7 +204,7 @@ class MockPublisher(object):
 
     def __init__(self, *args, **kwargs):
         self.kwargs = kwargs
-        self.connection = default_app.broker_connection()
+        self.connection = app_or_default().broker_connection()
 
     def declare(self):
         self._declared = True
@@ -381,7 +381,6 @@ class TestTaskSet(unittest.TestCase):
         ])
         self.assertEqual(ts.total, 9)
 
-
         consumer = IncrementCounterTask().get_consumer()
         consumer.discard_all()
         taskset_res = ts.apply_async()
@@ -403,11 +402,11 @@ class TestTaskApply(unittest.TestCase):
         self.assertRaises(KeyError, RaisingTask.apply, throw=True)
 
     def test_apply_with_CELERY_EAGER_PROPAGATES_EXCEPTIONS(self):
-        default_app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
+        RaisingTask.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
         try:
             self.assertRaises(KeyError, RaisingTask.apply)
         finally:
-            default_app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
+            RaisingTask.app.conf.CELERY_EAGER_PROPAGATES_EXCEPTIONS = False
 
     def test_apply(self):
         IncrementCounterTask.count = 0

+ 8 - 6
celery/tests/test_task_http.py

@@ -167,7 +167,8 @@ class TestHttpDispatch(unittest.TestCase):
 class TestURL(unittest.TestCase):
 
     def test_URL_get_async(self):
-        def with_eager_tasks(_val):
+        http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = True
+        try:
 
             def with_mock_urlopen(_val):
                 d = http.URL("http://example.com/mul").get_async(x=10, y=10)
@@ -175,11 +176,12 @@ class TestURL(unittest.TestCase):
 
             context = mock_urlopen(success_response(100))
             execute_context(context, with_mock_urlopen)
-
-        execute_context(eager_tasks(), with_eager_tasks)
+        finally:
+            http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = False
 
     def test_URL_post_async(self):
-        def with_eager_tasks(_val):
+        http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = True
+        try:
 
             def with_mock_urlopen(_val):
                 d = http.URL("http://example.com/mul").post_async(x=10, y=10)
@@ -187,5 +189,5 @@ class TestURL(unittest.TestCase):
 
             context = mock_urlopen(success_response(100))
             execute_context(context, with_mock_urlopen)
-
-        execute_context(eager_tasks(), with_eager_tasks)
+        finally:
+            http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = False

+ 6 - 5
celery/tests/test_task_sets.py

@@ -2,7 +2,7 @@ import unittest2 as unittest
 
 import simplejson
 
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.task import Task
 from celery.task.sets import subtask, TaskSet
 
@@ -81,8 +81,8 @@ class test_subtask(unittest.TestCase):
     def test_is_JSON_serializable(self):
         s = MockTask.subtask((2, ), {"cache": True},
                 {"routing_key": "CPU-bound"})
-        s.args = list(s.args) # tuples are not preserved
-                              # but this doesn't matter.
+        s.args = list(s.args)                   # tuples are not preserved
+                                                # but this doesn't matter.
         self.assertEqual(s,
                          subtask(simplejson.loads(simplejson.dumps(s))))
 
@@ -130,6 +130,7 @@ class test_TaskSet(unittest.TestCase):
         self.assertEqual(len(ts), 3)
 
     def test_respects_ALWAYS_EAGER(self):
+        app = app_or_default()
 
         class MockTaskSet(TaskSet):
             applied = 0
@@ -139,11 +140,11 @@ class test_TaskSet(unittest.TestCase):
 
         ts = MockTaskSet([MockTask.subtask((i, i))
                         for i in (2, 4, 8)])
-        default_app.conf.CELERY_ALWAYS_EAGER = True
+        app.conf.CELERY_ALWAYS_EAGER = True
         try:
             ts.apply_async()
         finally:
-            default_app.conf.CELERY_ALWAYS_EAGER = False
+            app.conf.CELERY_ALWAYS_EAGER = False
         self.assertEqual(ts.applied, 1)
 
     def test_apply_async(self):

+ 3 - 3
celery/tests/test_utils_info.py

@@ -1,7 +1,7 @@
 import unittest2 as unittest
 
 from celery import Celery
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.utils import textindent
 from celery.utils.timeutils import humanize_seconds
 
@@ -59,9 +59,9 @@ class TestInfo(unittest.TestCase):
         self.assertEqual(textindent(RANDTEXT, 4), RANDTEXT_RES)
 
     def test_format_queues(self):
-        celery = Celery()
+        celery = Celery(set_as_current=False)
         celery.amqp.queues = QUEUES
         self.assertEqual(celery.amqp.queues.format(), QUEUE_FORMAT)
 
     def test_broker_info(self):
-        default_app.amqp.format_broker_info()
+        app_or_default().amqp.format_broker_info()

+ 3 - 2
celery/tests/test_worker.py

@@ -177,8 +177,8 @@ class test_CarrotListener(unittest.TestCase):
 
         it = l._mainloop()
         self.assertTrue(it.next(), "draining")
-
         records = {}
+
         def create_recorder(key):
             def _recorder(*args, **kwargs):
                 records[key] = True
@@ -417,10 +417,10 @@ class test_CarrotListener(unittest.TestCase):
                 return self.wait_method
 
         called_back = [False]
+
         def init_callback(listener):
             called_back[0] = True
 
-
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
@@ -439,6 +439,7 @@ class test_CarrotListener(unittest.TestCase):
         l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
+
         def raises_socket_error(limit=None):
             yield True
             l.iterations = 1

+ 6 - 5
celery/tests/test_worker_control.py

@@ -3,7 +3,7 @@ import unittest2 as unittest
 
 from celery.utils.timer2 import Timer
 
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.decorators import task
 from celery.registry import tasks
 from celery.task.builtins import PingTask
@@ -16,7 +16,7 @@ from celery.worker.state import revoked
 hostname = socket.gethostname()
 
 
-@task(rate_limit=200) # for extra info in dump_tasks
+@task(rate_limit=200)                   # for extra info in dump_tasks
 def mytask():
     pass
 
@@ -46,7 +46,7 @@ class Listener(object):
                                          args=(2, 2),
                                          kwargs={}))
         self.eta_schedule = Timer()
-        self.app = default_app
+        self.app = app_or_default()
         self.event_dispatcher = Dispatcher()
 
 
@@ -100,13 +100,14 @@ class test_ControlPanel(unittest.TestCase):
         self.assertFalse(panel.execute("dump_reserved"))
 
     def test_rate_limit_when_disabled(self):
-        default_app.conf.CELERY_DISABLE_RATE_LIMITS = True
+        app = app_or_default()
+        app.conf.CELERY_DISABLE_RATE_LIMITS = True
         try:
             e = self.panel.execute("rate_limit", kwargs=dict(
                  task_name=mytask.name, rate_limit="100/m"))
             self.assertIn("rate limits disabled", e.get("error"))
         finally:
-            default_app.conf.CELERY_DISABLE_RATE_LIMITS = False
+            app.conf.CELERY_DISABLE_RATE_LIMITS = False
 
     def test_rate_limit_invalid_rate_limit_string(self):
         e = self.panel.execute("rate_limit", kwargs=dict(

+ 9 - 7
celery/tests/test_worker_job.py

@@ -9,7 +9,7 @@ from StringIO import StringIO
 from carrot.backends.base import BaseMessage
 
 from celery import states
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.decorators import task as task_dec
 from celery.exceptions import RetryTaskError, NotRegistered
@@ -42,7 +42,7 @@ def mytask(i, **kwargs):
     return i ** i
 
 
-@task_dec # traverses coverage for decorator without parens
+@task_dec               # traverses coverage for decorator without parens
 def mytask_no_kwargs(i):
     return i ** i
 
@@ -126,14 +126,15 @@ class test_TaskRequest(unittest.TestCase):
 
     def test_send_email(self):
         from celery.worker import job
-        old_mail_admins = default_app.mail_admins
+        app = app_or_default()
+        old_mail_admins = app.mail_admins
         old_enable_mails = mytask.send_error_emails
         mail_sent = [False]
 
         def mock_mail_admins(*args, **kwargs):
             mail_sent[0] = True
 
-        default_app.mail_admins = mock_mail_admins
+        app.mail_admins = mock_mail_admins
         mytask.send_error_emails = True
         try:
             tw = TaskRequest(mytask.name, gen_unique_id(), [1], {"f": "x"})
@@ -151,7 +152,7 @@ class test_TaskRequest(unittest.TestCase):
             self.assertFalse(mail_sent[0])
 
         finally:
-            default_app.mail_admins = old_mail_admins
+            app.mail_admins = old_mail_admins
             mytask.send_error_emails = old_enable_mails
 
     def test_already_revoked(self):
@@ -445,6 +446,7 @@ class test_TaskRequest(unittest.TestCase):
                     "task_name": tw.task_name})
 
     def _test_on_failure(self, exception):
+        app = app_or_default()
         tid = gen_unique_id()
         tw = TaskRequest(mytask.name, tid, [4], {"f": "x"})
         try:
@@ -457,7 +459,7 @@ class test_TaskRequest(unittest.TestCase):
         tw.logger = setup_logger(logfile=logfh, loglevel=logging.INFO,
                                  root=False)
 
-        default_app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
+        app.conf.CELERY_SEND_TASK_ERROR_EMAILS = True
 
         tw.on_failure(exc_info)
         logvalue = logfh.getvalue()
@@ -465,7 +467,7 @@ class test_TaskRequest(unittest.TestCase):
         self.assertIn(tid, logvalue)
         self.assertIn("ERROR", logvalue)
 
-        default_app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
+        app.conf.CELERY_SEND_TASK_ERROR_EMAILS = False
 
     def test_on_failure(self):
         self._test_on_failure(Exception("Inside unit tests"))

+ 9 - 7
celery/tests/utils.py

@@ -7,7 +7,7 @@ from StringIO import StringIO
 
 from nose import SkipTest
 
-from celery.app import default_app
+from celery.app import app_or_default
 from celery.utils.functional import wraps
 
 
@@ -70,25 +70,27 @@ from celery.utils import noop
 
 @contextmanager
 def eager_tasks():
+    app = app_or_default()
 
-    prev = default_app.conf.CELERY_ALWAYS_EAGER
-    default_app.conf.CELERY_ALWAYS_EAGER = True
+    prev = app.conf.CELERY_ALWAYS_EAGER
+    app.conf.CELERY_ALWAYS_EAGER = True
 
     yield True
 
-    default_app.conf.CELERY_ALWAYS_EAGER = prev
+    app.conf.CELERY_ALWAYS_EAGER = prev
 
 
 def with_eager_tasks(fun):
 
     @wraps(fun)
     def _inner(*args, **kwargs):
-        prev = default_app.conf.CELERY_ALWAYS_EAGER
-        default_app.conf.CELERY_ALWAYS_EAGER = True
+        app = app_or_default()
+        prev = app.conf.CELERY_ALWAYS_EAGER
+        app.conf.CELERY_ALWAYS_EAGER = True
         try:
             return fun(*args, **kwargs)
         finally:
-            default_app.conf.CELERY_ALWAYS_EAGER = prev
+            app.conf.CELERY_ALWAYS_EAGER = prev
 
 
 def with_environ(env_name, env_value):

+ 4 - 4
celery/utils/__init__.py

@@ -17,7 +17,7 @@ from itertools import islice
 from carrot.utils import rpartition
 
 from celery.utils.compat import all, any, defaultdict
-from celery.utils.timeutils import timedelta_seconds # was here before
+from celery.utils.timeutils import timedelta_seconds        # was here before
 from celery.utils.functional import curry
 
 
@@ -227,7 +227,7 @@ def repeatlast(it):
     yield the last value infinitely."""
     for item in it:
         yield item
-    while 1: # pragma: no cover
+    while 1:            # pragma: no cover
         yield item
 
 
@@ -335,7 +335,7 @@ def get_cls_by_name(name, aliases={}):
     """
 
     if not isinstance(name, basestring):
-        return name # already a class
+        return name                                 # already a class
 
     name = aliases.get(name) or name
     module_name, _, cls_name = rpartition(name, ".")
@@ -365,7 +365,7 @@ def abbr(S, max, ellipsis="..."):
     if S is None:
         return "???"
     if len(S) > max:
-        return ellipsis and (S[:max-len(ellipsis)] + ellipsis) or S[:max]
+        return ellipsis and (S[:max - len(ellipsis)] + ellipsis) or S[:max]
     return S
 
 

+ 3 - 2
celery/utils/compat.py

@@ -287,7 +287,7 @@ except ImportError:
             return "defaultdict(%s, %s)" % (self.default_factory,
                                             dict.__repr__(self))
     import collections
-    collections.defaultdict = defaultdict # Pickle needs this.
+    collections.defaultdict = defaultdict           # Pickle needs this.
 
 ############## logging.LoggerAdapter ########################################
 import inspect
@@ -410,11 +410,12 @@ try:
     from itertools import izip_longest
 except ImportError:
     import itertools
+
     def izip_longest(*args, **kwds):
         fillvalue = kwds.get("fillvalue")
 
         def sentinel(counter=([fillvalue] * (len(args) - 1)).pop):
-            yield counter() # yields the fillvalue, or raises IndexError
+            yield counter()     # yields the fillvalue, or raises IndexError
 
         fillers = itertools.repeat(fillvalue)
         iters = [itertools.chain(it, sentinel(), fillers)

+ 1 - 1
celery/utils/dispatch/signal.py

@@ -4,7 +4,7 @@ import weakref
 try:
     set
 except NameError:
-    from sets import Set as set # Python 2.3 fallback
+    from sets import Set as set                 # Python 2.3 fallback
 
 from celery.utils.dispatch import saferef
 

+ 5 - 2
celery/utils/functional.py

@@ -67,12 +67,13 @@
 # update_wrapper() and wraps() are tools to help write
 # wrapper functions that can handle naive introspection
 
+
 def _compat_curry(fun, *args, **kwargs):
     """New function with partial application of the given arguments
     and keywords."""
 
     def _curried(*addargs, **addkwargs):
-        return fun(*(args+addargs), **dict(kwargs, **addkwargs))
+        return fun(*(args + addargs), **dict(kwargs, **addkwargs))
     return _curried
 
 
@@ -83,6 +84,8 @@ except ImportError:
 
 WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
 WRAPPER_UPDATES = ('__dict__',)
+
+
 def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
         updated=WRAPPER_UPDATES):
     """Update a wrapper function to look like the wrapped function
@@ -100,7 +103,7 @@ def _compat_update_wrapper(wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS,
     for attr in assigned:
         try:
             setattr(wrapper, attr, getattr(wrapped, attr))
-        except TypeError: # Python 2.3 doesn't allow assigning to __name__.
+        except TypeError:   # Python 2.3 doesn't allow assigning to __name__.
             pass
     for attr in updated:
         getattr(wrapper, attr).update(getattr(wrapped, attr))

+ 1 - 0
celery/utils/term.py

@@ -16,6 +16,7 @@ RESET_SEQ = "\033[0m"
 COLOR_SEQ = "\033[1;%dm"
 fg = lambda s: COLOR_SEQ % s
 
+
 class colored(object):
 
     def __init__(self, *s, **kwargs):

+ 2 - 1
celery/utils/timer2.py

@@ -22,6 +22,7 @@ __docformat__ = "restructuredtext"
 
 DEFAULT_MAX_INTERVAL = 2
 
+
 class TimedFunctionFailed(UserWarning):
     pass
 
@@ -119,7 +120,7 @@ class Schedule(object):
     @property
     def queue(self):
         events = list(self._queue)
-        return map(heapq.heappop, [events]*len(events))
+        return map(heapq.heappop, [events] * len(events))
 
 
 class Timer(Thread):

+ 1 - 3
celery/utils/timeutils.py

@@ -112,7 +112,7 @@ def humanize_seconds(secs, prefix=""):
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
             w = secs / divider
-            punit = w > 1 and unit+"s" or unit
+            punit = w > 1 and (unit + "s") or unit
             return "%s%s %s" % (prefix, formatter(w), punit)
     return "now"
 
@@ -124,5 +124,3 @@ def maybe_iso8601(dt):
     if isinstance(dt, datetime):
         return dt
     return parse_iso8601(dt)
-
-

+ 1 - 1
celery/worker/buckets.py

@@ -66,7 +66,7 @@ class TaskBucket(object):
     def _get_immediate(self):
         try:
             return self.immediate.popleft()
-        except IndexError: # Empty
+        except IndexError:                      # Empty
             raise QueueEmpty()
 
     def _get(self):

+ 1 - 1
celery/worker/control/__init__.py

@@ -40,7 +40,7 @@ class ControlDispatch(object):
             ...         ControlDispatch().dispatch_from_message(control)
 
         """
-        message = dict(message) # don't modify callers message.
+        message = dict(message)             # don't modify callers message.
         command = message.pop("command")
         destination = message.pop("destination", None)
         reply_to = message.pop("reply_to", None)

+ 1 - 1
celery/worker/control/builtins.py

@@ -27,7 +27,7 @@ def revoke(panel, task_id, task_name=None, **kwargs):
     app = panel.app
     revoked.add(task_id)
     backend = app.backend
-    if task_name: # Use custom task backend (if any)
+    if task_name:                           # use custom task backend (if any)
         try:
             backend = tasks[task_name].backend
         except KeyError:

+ 1 - 1
celery/worker/control/registry.py

@@ -4,7 +4,7 @@ from celery.app import app_or_default
 
 
 class Panel(UserDict):
-    data = dict() # Global registry.
+    data = dict()                               # Global registry.
 
     def __init__(self, logger, listener, hostname=None, app=None):
         self.app = app_or_default(app)

+ 3 - 3
celery/worker/controllers.py

@@ -48,15 +48,15 @@ class Mediator(threading.Thread):
         self.logger.debug(
             "Mediator: Running callback for task: %s[%s]" % (
                 task.task_name, task.task_id))
-        self.callback(task) # execute
+        self.callback(task)                 # execute
 
     def run(self):
         while not self._shutdown.isSet():
             self.move()
-        self._stopped.set() # indicate that we are stopped
+        self._stopped.set()                 # indicate that we are stopped
 
     def stop(self):
         """Gracefully shutdown the thread."""
         self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
+        self._stopped.wait()                # block until this thread is done
         self.join(1e100)

+ 1 - 1
celery/worker/heartbeat.py

@@ -63,6 +63,6 @@ class Heart(threading.Thread):
             return
         self._state = "CLOSE"
         self._shutdown.set()
-        self._stopped.wait() # block until this thread is done
+        self._stopped.wait()            # block until this thread is done
         if self.isAlive():
             self.join(1e100)

+ 2 - 3
celery/worker/job.py

@@ -7,7 +7,7 @@ import warnings
 from datetime import datetime
 
 from celery import platform
-from celery.app import app_or_default, default_app
+from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.execute.trace import TaskTrace
 from celery.registry import tasks
@@ -73,7 +73,7 @@ class WorkerTaskTrace(TaskTrace):
     """
 
     def __init__(self, *args, **kwargs):
-        self.loader = kwargs.get("loader") or default_app.loader
+        self.loader = kwargs.get("loader") or app_or_default().loader
         self.hostname = kwargs.get("hostname") or socket.gethostname()
         super(WorkerTaskTrace, self).__init__(*args, **kwargs)
 
@@ -274,7 +274,6 @@ class TaskRequest(object):
         delivery_info = dict((key, _delivery_info.get(key))
                                 for key in WANTED_DELIVERY_INFO)
 
-
         if not hasattr(kwargs, "items"):
             raise InvalidTaskError("Task kwargs must be a dictionary.")
 

+ 3 - 3
celery/worker/listener.py

@@ -320,7 +320,7 @@ class CarrotListener(object):
     def maybe_conn_error(self, fun):
         try:
             fun()
-        except Exception: # TODO kombu.connection_errors
+        except Exception:                   # TODO kombu.connection_errors
             pass
 
     def close_connection(self):
@@ -387,7 +387,7 @@ class CarrotListener(object):
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,
                        self.initial_prefetch_count, self.logger)
-        self.qos.update() # enable prefetch_count QoS.
+        self.qos.update()                   # enable prefetch_count
 
         self.task_consumer.on_decode_error = self.on_decode_error
         self.broadcast_consumer = BroadcastConsumer(self.connection,
@@ -436,7 +436,7 @@ class CarrotListener(object):
         def _establish_connection():
             """Establish a connection to the broker."""
             conn = self.app.broker_connection()
-            conn.connect() # Connection is established lazily, so connect.
+            conn.connect()                      # evaluate connection
             return conn
 
         if not self.app.conf.BROKER_CONNECTION_RETRY:

+ 1 - 1
celery/worker/state.py

@@ -8,7 +8,7 @@ REVOKES_MAX = 10000
 
 # How many seconds a revoke will be active before
 # being expired when the max limit has been exceeded.
-REVOKE_EXPIRES = 3600 # One hour.
+REVOKE_EXPIRES = 3600                       # One hour.
 
 """
 .. data:: active_requests

+ 1 - 1
contrib/release/sphinx-to-rst.py

@@ -48,7 +48,7 @@ TO_RST_MAP = {RE_CODE_BLOCK: replace_code_block,
 
 
 def _process(lines):
-    lines = list(lines) # non-destructive
+    lines = list(lines)                                 # non-destructive
     for i, line in enumerate(lines):
         for regex, alt in TO_RST_MAP.items():
             if callable(alt):

+ 2 - 2
docs/_ext/literals_to_xrefs.py

@@ -68,8 +68,8 @@ def fixliterals(fname):
             new.append(m.group(0))
             continue
 
-        sys.stdout.write("\n"+"-"*80+"\n")
-        sys.stdout.write(data[prev_start+1:m.start()])
+        sys.stdout.write("\n" + "-" * 80 + "\n")
+        sys.stdout.write(data[prev_start + 1:m.start()])
         sys.stdout.write(colorize(m.group(0), fg="red"))
         sys.stdout.write(data[m.end():next_end])
         sys.stdout.write("\n\n")

+ 0 - 0
docs/homepage/examplerun.py → docs/homepage/examplerun.rst


+ 0 - 0
docs/homepage/exampletask.py → docs/homepage/exampletask.rst


+ 1 - 1
docs/slidesource/slide-example1-result.py

@@ -6,5 +6,5 @@ res.get()
 
 res = MyTask.apply_async(args=[8, 4],
                          countdown=5)
-res.get() # Is executed after 5 seconds.
+res.get()                               # Is executed after 5 seconds.
 #32

+ 1 - 1
examples/celery_http_gateway/manage.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 from django.core.management import execute_manager
 try:
-    import settings # Assumed to be in the same directory.
+    import settings             # Assumed to be in the same directory.
 except ImportError:
     import sys
     sys.stderr.write(

+ 1 - 1
examples/httpexample/manage.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 from django.core.management import execute_manager
 try:
-    import settings # Assumed to be in the same directory.
+    import settings                 # Assumed to be in the same directory.
 except ImportError:
     import sys
     sys.stderr.write(

+ 1 - 0
funtests/config.py

@@ -21,6 +21,7 @@ CELERYD_LOG_COLOR = False
 
 CELERY_IMPORTS = ("celery.tests.functional.tasks", )
 
+
 @atexit.register
 def teardown_testdb():
     import os

+ 1 - 0
funtests/test_basic.py

@@ -6,6 +6,7 @@ from celery.tests.functional.case import WorkerCase
 
 from celery.task.control import broadcast
 
+
 class test_basic(WorkerCase):
 
     def test_started(self):

+ 1 - 0
pavement.py

@@ -6,6 +6,7 @@ options(
         sphinx=Bunch(builddir=".build"),
 )
 
+
 def sphinx_builddir(options):
     return path("docs") / options.sphinx.builddir / "html"
 

+ 1 - 1
setup.py

@@ -68,7 +68,7 @@ setup(
     zip_safe=False,
     install_requires=install_requires,
     tests_require=['nose', 'nose-cover3', 'unittest2', 'simplejson'],
-    cmdclass = {"quicktest": QuickRunTests},
+    cmdclass={"quicktest": QuickRunTests},
     test_suite="nose.collector",
     classifiers=[
         "Development Status :: 5 - Production/Stable",