Browse Source

Everything now conforms to pep8.py

Ask Solem 16 years ago
parent
commit
6b81d06f56

+ 2 - 2
celery/backends/base.py

@@ -13,7 +13,7 @@ class BaseBackend(object):
     def mark_as_done(self, task_id, result):
         """Mark task as successfully executed."""
         return self.store_result(task_id, result, status="DONE")
-    
+
     def mark_as_failure(self, task_id, exc):
         """Mark task as executed with failure. Stores the execption."""
         return self.store_result(task_id, exc, status="FAILURE")
@@ -32,7 +32,7 @@ class BaseBackend(object):
         if result is None:
             return True
         return result
-        
+
     def get_result(self, task_id):
         """Get the result of a task."""
         raise NotImplementedError(

+ 1 - 1
celery/backends/database.py

@@ -9,7 +9,7 @@ class Backend(BaseBackend):
     def __init__(self, *args, **kwargs):
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
-   
+
     def store_result(self, task_id, result, status):
         """Mark task as done (executed)."""
         result = self.prepare_result(result)

+ 2 - 3
celery/backends/tyrant.py

@@ -15,7 +15,6 @@ try:
 except ImportError:
     import pickle
 
-    
 
 class Backend(BaseBackend):
     """Tokyo Cabinet based task backend store.
@@ -39,7 +38,7 @@ class Backend(BaseBackend):
         :setting:`TT_HOST` or :setting:`TT_PORT` is not set.
 
         """
-        self.tyrant_host = kwargs.get("tyrant_host", 
+        self.tyrant_host = kwargs.get("tyrant_host",
                             getattr(settings, "TT_HOST", self.tyrant_host))
         self.tyrant_port = kwargs.get("tyrant_port",
                             getattr(settings, "TT_PORT", self.tyrant_port))
@@ -49,7 +48,7 @@ class Backend(BaseBackend):
                 "set the TT_HOST and TT_PORT settings in your settings.py")
         super(Backend, self).__init__(*args, **kwargs)
         self._cache = {}
-    
+
     def get_server(self):
         """Get :class:`pytyrant.PyTyrant`` instance with the current
         server configuration."""

+ 1 - 1
celery/bin/celeryd.py

@@ -4,7 +4,7 @@
 .. program:: celeryd
 
 .. cmdoption:: -c, --concurrency
-            
+
     Number of child processes processing the queue.
 
 .. cmdoption:: -f, --logfile

+ 2 - 2
celery/conf.py

@@ -16,11 +16,11 @@ DEFAULT_DAEMON_CONCURRENCY = 10
 
 # If the queue is empty, this is the time *in seconds* the daemon sleeps
 # until it wakes up to check if there's any new messages on the queue.
-DEFAULT_QUEUE_WAKEUP_AFTER = 0.3 
+DEFAULT_QUEUE_WAKEUP_AFTER = 0.3
 
 # As long as the queue is empty, the daemon logs a "Queue is empty" message
 # every ``EMPTY_MSG_EMIT_EVERY`` *seconds*.
-DEFAULT_EMPTY_MSG_EMIT_EVERY = 5 
+DEFAULT_EMPTY_MSG_EMIT_EVERY = 5
 
 DEFAULT_DAEMON_PID_FILE = "celeryd.pid"
 

+ 6 - 5
celery/datastructures.py

@@ -11,13 +11,14 @@ class PositionQueue(UserList):
     """A positional queue of a specific length, with slots that are either
     filled or unfilled. When all of the positions are filled, the queue
     is considered :meth:`full`.
-   
+
     :param length: The number of items required for the queue to be filled.
 
     """
 
     class UnfilledPosition(object):
         """Describes an unfilled slot."""
+
         def __init__(self, position):
             self.position = position
 
@@ -39,7 +40,7 @@ class PositionQueue(UserList):
         return filter(lambda v: not isinstance(v, self.UnfilledPosition),
                       self.data)
 
-        
+
 class TaskProcessQueue(UserList):
     """Queue of running child processes, which starts waiting for the
     processes to finish when the queue limit is reached.
@@ -72,10 +73,10 @@ class TaskProcessQueue(UserList):
         self.logger = logger
         self.done_msg = done_msg
         self.data = []
-        
+
     def add(self, result, task_name, task_id):
         """Add a process to the queue.
-        
+
         If the queue is full, it will start to collect return values from
         the tasks executed. When all return values has been collected,
         it deletes the current queue and is ready to accept new processes.
@@ -86,7 +87,7 @@ class TaskProcessQueue(UserList):
         :param task_name: Name of the task executed.
 
         :param task_id: Id of the task executed.
-        
+
         """
         self.data.append([result, task_name, task_id])
 

+ 1 - 1
celery/discovery.py

@@ -22,5 +22,5 @@ def find_related_module(app, related_name):
         related_module = getattr(module, related_name)
     except AttributeError:
         return None
-   
+
     return related_module

+ 1 - 1
celery/log.py

@@ -11,7 +11,7 @@ def setup_logger(loglevel=DAEMON_LOG_LEVEL, logfile=None, format=LOG_FORMAT,
         **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
     ``stderr`` is used.
-    
+
     Returns logger object.
     """
     logger = multiprocessing.get_logger()

+ 5 - 5
celery/managers.py

@@ -6,7 +6,7 @@ from datetime import datetime, timedelta
 
 class TaskManager(models.Manager):
     """Manager for :class:`celery.models.Task` models."""
-    
+
     def get_task(self, task_id):
         """Get task meta for task by ``task_id``."""
         task, created = self.get_or_create(task_id=task_id)
@@ -30,9 +30,9 @@ class TaskManager(models.Manager):
 
         :param task_id: task id
 
-        :param result: The return value of the task, or an exception 
+        :param result: The return value of the task, or an exception
             instance raised by the task.
-       
+
         :param status: Task status. See
             :meth:`celery.result.AsyncResult.get_status` for a list of
             possible status values.
@@ -52,8 +52,8 @@ class PeriodicTaskManager(models.Manager):
 
     def get_waiting_tasks(self):
         """Get all waiting periodic tasks.
-        
-        :returns: list of :class:`celery.models.PeriodicTaskMeta` objects. 
+
+        :returns: list of :class:`celery.models.PeriodicTaskMeta` objects.
         """
         periodic_tasks = tasks.get_all_periodic()
         waiting = []

+ 2 - 3
celery/messaging.py

@@ -11,7 +11,7 @@ import uuid
 class NoProcessConsumer(Consumer):
     """A consumer that raises an error if used with wait callbacks (i.e.
     it doesn't support :meth:`carrot.messaging.Consumer.wait``)."""
-    
+
     def receive(self, message_data, message):
         raise NotImplementedError(
                 "This consumer doesn't support process_next() or wait()")
@@ -27,13 +27,12 @@ class TaskPublisher(Publisher):
         return self._delay_task(task_name=task_name, args=task_args,
                                 kwargs=task_kwargs)
 
-
     def delay_task_in_set(self, task_name, taskset_id, task_args,
             task_kwargs):
         """Delay a task which part of a task set."""
         return self._delay_task(task_name=task_name, part_of_set=taskset_id,
                                 args=task_args, kwargs=task_kwargs)
-    
+
     def requeue_task(self, task_name, task_id, task_args, task_kwargs,
             part_of_set=None):
         """Requeue a failed task."""

+ 2 - 1
celery/models.py

@@ -10,6 +10,7 @@ class RetryQueue(object):
     queue = Queue()
 
     class Item(object):
+
         def __init__(self, task_name, task_id, args, kwargs):
             self.task_name = task_name
             self.task_id = task_id
@@ -44,7 +45,7 @@ TASK_STATUS_DONE = "DONE"
 TASK_STATUSES = (TASK_STATUS_PENDING, TASK_STATUS_RETRY,
                  TASK_STATUS_FAILURE, TASK_STATUS_DONE)
 TASK_STATUSES_CHOICES = zip(TASK_STATUSES, TASK_STATUSES)
-                
+
 
 class TaskMeta(models.Model):
     task_id = models.CharField(_(u"task id"), max_length=255, unique=True)

+ 12 - 11
celery/platform.py

@@ -17,13 +17,14 @@ DAEMON_MAXFD = 1024
 
 # The standard I/O file descriptors are redirected to /dev/null by default.
 if (hasattr(os, "devnull")):
-   REDIRECT_TO = os.devnull
+    REDIRECT_TO = os.devnull
 else:
-   REDIRECT_TO = "/dev/null"
+    REDIRECT_TO = "/dev/null"
 
 
 class PIDFile(object):
     """Manages a pid file."""
+
     def __init__(self, pidfile):
         self.pidfile = pidfile
 
@@ -36,7 +37,7 @@ class PIDFile(object):
 
     def check(self):
         """Check the status of the pidfile.
-        
+
         If the pidfile exists, and the process is not running, it will
         remove the stale pidfile and continue as normal. If the process
         *is* running, it will exit the program with an error message.
@@ -48,8 +49,8 @@ class PIDFile(object):
                 os.kill(pid, 0)
             except os.error, e:
                 if e.errno == errno.ESRCH:
-                   sys.stderr.write("Stale pidfile exists. removing it.\n")
-                   self.remove()
+                    sys.stderr.write("Stale pidfile exists. removing it.\n")
+                    self.remove()
             else:
                 raise SystemExit("celeryd is already running.")
 
@@ -59,10 +60,10 @@ class PIDFile(object):
 
     def write(self, pid=None):
         """Write a pidfile.
-        
+
         If ``pid`` is not specified the pid of the current process
         will be used.
-        
+
         """
         if not pid:
             pid = os.getpid()
@@ -83,7 +84,7 @@ def daemonize(pidfile):
     try:
         pid = os.fork()
     except OSError, e:
-        raise Exception, "%s [%d]" % (e.strerror, e.errno)
+        raise Exception("%s [%d]" % (e.strerror, e.errno))
 
     if pid == 0: # child
         os.setsid()
@@ -91,7 +92,7 @@ def daemonize(pidfile):
         try:
             pid = os.fork() # second child
         except OSError, e:
-            raise Exception, "%s [%d]" % (e.strerror, e.errno)
+            raise Exception("%s [%d]" % (e.strerror, e.errno))
 
         if pid == 0: # second child
             #os.chdir(DAEMON_WORKDIR)
@@ -105,7 +106,7 @@ def daemonize(pidfile):
     maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
     if (maxfd == resource.RLIM_INFINITY):
         maxfd = DAEMON_MAXFD
-  
+
     # Iterate through and close all file descriptors.
     for fd in range(0, maxfd):
         try:
@@ -115,7 +116,7 @@ def daemonize(pidfile):
 
     os.open(REDIRECT_TO, os.O_RDWR)
     # Duplicate standard input to standard output and standard error.
-    os.dup2(0, 1) 
+    os.dup2(0, 1)
     os.dup2(0, 2)
 
     return 0

+ 4 - 4
celery/registry.py

@@ -25,14 +25,14 @@ class TaskRegistry(UserDict):
 
     def register(self, task, task_name=None):
         """Register a task in the task registry.
-        
+
         Task can either be a regular function, or a class inheriting
         from :class:`celery.task.Task`.
 
         :keyword task_name: Required if the task is a regular function.
 
         :raises AlreadyRegistered: if the task is already registered.
-        
+
         """
         is_class = False
         if hasattr(task, "run"):
@@ -41,7 +41,7 @@ class TaskRegistry(UserDict):
         if task_name in self.data:
             raise self.AlreadyRegistered(
                     "Task with name %s is already registered." % task_name)
-       
+
         if is_class:
             self.data[task_name] = task() # instantiate Task class
         else:
@@ -50,7 +50,7 @@ class TaskRegistry(UserDict):
 
     def unregister(self, task_name):
         """Unregister task by name.
-       
+
         :param task_name: name of the task to unregister.
 
         :raises NotRegistered: if the task has not been registered.

+ 16 - 15
celery/result.py

@@ -8,15 +8,15 @@ from celery.backends import default_backend
 
 class BaseAsyncResult(object):
     """Base class for pending result, takes ``backend`` argument.
-    
+
     .. attribute:: task_id
 
         The unique identifier for this task.
 
     .. attribute:: backend
-       
+
         The task result backend used.
-    
+
     """
 
     def __init__(self, task_id, backend):
@@ -32,9 +32,9 @@ class BaseAsyncResult(object):
 
     def is_done(self):
         """Returns ``True`` if the task executed successfully.
-        
+
         :rtype: bool
-        
+
         """
         return self.backend.is_done(self.task_id)
 
@@ -44,16 +44,16 @@ class BaseAsyncResult(object):
 
     def wait(self, timeout=None):
         """Wait for task, and return the result when it arrives.
-       
+
         :keyword timeout: How long to wait in seconds, before the
             operation times out.
-        
+
         :raises celery.timer.TimeoutError: if ``timeout`` is not ``None`` and
             the result does not arrive within ``timeout`` seconds.
-        
+
         If the remote call raised an exception then that
         exception will be re-raised.
-        
+
         """
         return self.backend.wait_for(self.task_id, timeout=timeout)
 
@@ -61,8 +61,8 @@ class BaseAsyncResult(object):
         """Returns ``True`` if the task executed successfully, or raised
         an exception. If the task is still pending, or is waiting for retry
         then ``False`` is returned.
-        
-        :rtype: bool 
+
+        :rtype: bool
 
         """
         status = self.backend.get_status(self.task_id)
@@ -82,7 +82,7 @@ class BaseAsyncResult(object):
     @property
     def result(self):
         """When the task is executed, this contains the return value.
-       
+
         If the task resulted in failure, this will be the exception instance
         raised.
         """
@@ -93,7 +93,7 @@ class BaseAsyncResult(object):
     @property
     def status(self):
         """The current status of the task.
-       
+
         Can be one of the following:
 
             *PENDING*
@@ -123,13 +123,14 @@ class AsyncResult(BaseAsyncResult):
     """Pending task result using the default backend.
 
     .. attribute:: task_id
-    
+
         The unique identifier for this task.
 
     .. attribute:: backend
-    
+
         Instance of :class:`celery.backends.DefaultBackend`.
 
     """
+
     def __init__(self, task_id):
         super(AsyncResult, self).__init__(task_id, backend=default_backend)

+ 27 - 29
celery/task.py

@@ -24,13 +24,13 @@ def delay_task(task_name, *args, **kwargs):
     :param task_name: the name of a task registered in the task registry.
 
     :param \*args: positional arguments to pass on to the task.
-    
+
     :param \*\*kwargs: keyword arguments to pass on to the task.
-    
-    :raises celery.registry.NotRegistered: exception if no such task 
+
+    :raises celery.registry.NotRegistered: exception if no such task
         has been registered in the task registry.
 
-    :rtype: :class:`celery.result.AsyncResult`. 
+    :rtype: :class:`celery.result.AsyncResult`.
 
     Example
 
@@ -70,11 +70,9 @@ def discard_all():
     return discarded_count
 
 
-
-
 def is_done(task_id):
     """Returns ``True`` if task with ``task_id`` has been executed.
-   
+
     :rtype: bool
 
     """
@@ -88,26 +86,26 @@ class Task(object):
     which is the actual method the ``celery`` daemon executes.
 
     The :meth:`run` method supports both positional, and keyword arguments.
-    
+
     .. attribute:: name
 
         *REQUIRED* All subclasses of :class:`Task` has to define the
         :attr:`name` attribute. This is the name of the task, registered
         in the task registry, and passed to :func:`delay_task`.
-        
+
     .. attribute:: type
 
         The type of task, currently this can be ``regular``, or ``periodic``,
         however if you want a periodic task, you should subclass
         :class:`PeriodicTask` instead.
-        
+
     :raises NotImplementedError: if the :attr:`name` attribute is not set.
 
     The resulting class is callable, which if called will apply the
     :meth:`run` method.
 
     Examples
-    
+
     This is a simple task just logging a message,
 
         >>> from celery.task import tasks, Task
@@ -152,9 +150,9 @@ class Task(object):
 
     def run(self, *args, **kwargs):
         """*REQUIRED* The actual task.
-        
+
         All subclasses of :class:`Task` must define the run method.
-        
+
         :raises NotImplementedError: by default, so you have to override
             this method in your subclass.
 
@@ -165,13 +163,13 @@ class Task(object):
         """Get process-aware logger object.
 
         See :func:`celery.log.setup_logger`.
-        
+
         """
         return setup_logger(**kwargs)
 
     def get_publisher(self):
         """Get a celery task message publisher.
-        
+
         :rtype: :class:`celery.messaging.TaskPublisher`.
 
         Please be sure to close the AMQP connection when you're done
@@ -186,7 +184,7 @@ class Task(object):
 
     def get_consumer(self):
         """Get a celery task message consumer.
-       
+
         :rtype: :class:`celery.messaging.TaskConsumer`.
 
         Please be sure to close the AMQP connection when you're done
@@ -214,7 +212,7 @@ class Task(object):
         :param \*args: positional arguments passed on to the task.
 
         :param \*\*kwargs: keyword arguments passed on to the task.
-        
+
         :rtype: :class:`celery.result.AsyncResult`
 
         See :func:`delay_task`.
@@ -245,7 +243,7 @@ class TaskSet(object):
     .. attribute:: total
 
         Total number of tasks in this task set.
-   
+
     Example
 
         >>> from djangofeeds.tasks import RefreshFeedTask
@@ -256,7 +254,7 @@ class TaskSet(object):
 
         >>> taskset_id, subtask_ids = taskset.run()
         >>> list_of_return_values = taskset.join()
-        
+
 
     """
 
@@ -303,13 +301,13 @@ class TaskSet(object):
                                                      taskset_id=taskset_id,
                                                      task_args=arg,
                                                      task_kwargs=kwarg)
-            subtask_ids.append(subtask_id) 
+            subtask_ids.append(subtask_id)
         amqp_connection.close()
         return taskset_id, subtask_ids
 
     def iterate(self):
         """Iterate over the results returned after calling :meth:`run`.
-        
+
         If any of the tasks raises an exception, the exception will
         be re-raised.
 
@@ -379,9 +377,9 @@ class TaskSet(object):
     def map_async(cls, func, args, timeout=None):
         """Distribute processing of the arguments and collect the results
         asynchronously.
-        
+
         :returns: :class:`celery.result.AsyncResult` instance.
-        
+
         """
         serfunc = pickle.dumps(func)
         return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
@@ -415,7 +413,7 @@ tasks.register(AsynchronousMapTask)
 def dmap_async(func, args, timeout=None):
     """Distribute processing of the arguments and collect the results
     asynchronously.
-    
+
     :returns: :class:`celery.result.AsyncResult` object.
 
     Example
@@ -438,7 +436,7 @@ class PeriodicTask(Task):
     """A periodic task is a task that behaves like a :manpage:`cron` job.
 
     .. attribute:: run_every
-    
+
         *REQUIRED* Defines how often the task is run (its interval),
         it can be either a :class:`datetime.timedelta` object or an
         integer specifying the time in seconds.
@@ -485,13 +483,13 @@ class ExecuteRemoteTask(Task):
 
     The object must be pickleable, so you can't use lambdas or functions
     defined in the REPL (that is the python shell, or ``ipython``).
-    
+
     """
     name = "celery.execute_remote"
 
     def run(self, ser_callable, fargs, fkwargs, **kwargs):
         """
-        :param ser_callable: A pickled function or callable object. 
+        :param ser_callable: A pickled function or callable object.
 
         :param fargs: Positional arguments to apply to the function.
 
@@ -505,7 +503,7 @@ tasks.register(ExecuteRemoteTask)
 
 def execute_remote(func, *args, **kwargs):
     """Execute arbitrary function/object remotely.
-        
+
     :param func: A callable function or object.
 
     :param \*args: Positional arguments to apply to the function.
@@ -516,7 +514,7 @@ def execute_remote(func, *args, **kwargs):
     defined in the REPL (the objects must have an associated module).
 
     :returns: class:`celery.result.AsyncResult`.
-    
+
     """
     return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
 

+ 1 - 0
celery/tests/__init__.py

@@ -1,4 +1,5 @@
 from djangox.test.depth import alltests
 
+
 def suite():
     return alltests(__file__, __name__)

+ 1 - 2
celery/tests/test_backends/test_database.py

@@ -18,7 +18,7 @@ class TestDatabaseBackend(unittest.TestCase):
         self.assertFalse(b.is_done(tid))
         self.assertEquals(b.get_status(tid), "PENDING")
         self.assertEquals(b.get_result(tid), '')
-        
+
         b.mark_as_done(tid, 42)
         self.assertTrue(b.is_done(tid))
         self.assertEquals(b.get_status(tid), "DONE")
@@ -43,4 +43,3 @@ class TestDatabaseBackend(unittest.TestCase):
         self.assertFalse(b.is_done(tid3))
         self.assertEquals(b.get_status(tid3), "FAILURE")
         self.assertTrue(isinstance(b.get_result(tid3), KeyError))
-        

+ 1 - 1
celery/tests/test_conf.py

@@ -38,7 +38,7 @@ class TestConf(unittest.TestCase):
                              getattr(conf, result_var),
                              "Default setting %s is written to %s" % (
                                  default_var, result_var))
-            
+
     def test_configuration_cls(self):
         for setting_name, result_var, default_var in SETTING_VARS:
             self.assertDefaultSetting(setting_name, result_var, default_var)

+ 0 - 1
celery/tests/test_discovery.py

@@ -16,4 +16,3 @@ class TestDiscovery(unittest.TestCase):
     def test_discovery(self):
         if "someapp" in settings.INSTALLED_APPS:
             self.assertDiscovery()
-

+ 0 - 6
celery/tests/test_log.py

@@ -48,9 +48,3 @@ class TestLog(unittest.TestCase):
         emergency_error(sio, "Testing emergency error facility")
         self.assertEquals(sio.getvalue().rpartition(":")[2].strip(),
                              "Testing emergency error facility")
-
-
-
-
-
-

+ 2 - 3
celery/tests/test_models.py

@@ -41,7 +41,7 @@ class TestModels(unittest.TestCase):
         # Have to avoid save() because it applies the auto_now=True.
         TaskMeta.objects.filter(task_id=m1.task_id).update(
                 date_done=datetime.now() - timedelta(days=10))
-        
+
         expired = TaskMeta.objects.get_all_expired()
         self.assertTrue(m1 in expired)
         self.assertFalse(m2 in expired)
@@ -49,7 +49,7 @@ class TestModels(unittest.TestCase):
 
         TaskMeta.objects.delete_expired()
         self.assertFalse(m1 in TaskMeta.objects.all())
-       
+
     def test_periodic_taskmeta(self):
         tasks.register(TestPeriodicTask)
         p = self.createPeriodicTaskMeta(TestPeriodicTask.name)
@@ -63,4 +63,3 @@ class TestModels(unittest.TestCase):
         self.assertTrue(isinstance(p.task, TestPeriodicTask))
 
         p.delay()
-

+ 2 - 1
celery/tests/test_pickle.py

@@ -1,11 +1,13 @@
 import unittest
 import pickle
 
+
 class RegularException(Exception):
     pass
 
 
 class ArgOverrideException(Exception):
+
     def __init__(self, message, status_code=10):
         self.status_code = status_code
         super(ArgOverrideException, self).__init__(message, status_code)
@@ -30,7 +32,6 @@ class TestPickle(unittest.TestCase):
         self.assertTrue(isinstance(exception, RegularException))
         self.assertEquals(exception.args, ("RegularException raised", ))
 
-
     def test_pickle_arg_override_exception(self):
 
         e = None

+ 1 - 1
celery/tests/test_registry.py

@@ -31,7 +31,7 @@ class TestTaskRegistry(unittest.TestCase):
         r.register(task)
         self.assertTrue(task.name in r)
         self.assertRaises(r.AlreadyRegistered, r.register, task)
-    
+
     def assertRegisterUnregisterFunc(self, r, task, task_name):
         self.assertRaises(r.NotRegistered, r.unregister, task_name)
         r.register(task, task_name)

+ 3 - 2
celery/tests/test_task.py

@@ -10,8 +10,8 @@ from celery import messaging
 from celery.backends import default_backend
 
 
-# Task run functions can't be closures/lambdas, as they're pickled.
 def return_True(self, **kwargs):
+    # Task run functions can't be closures/lambdas, as they're pickled.
     return True
 
 
@@ -49,6 +49,7 @@ class TestCeleryTasks(unittest.TestCase):
             self.assertEquals(task_kwargs.get(arg_name), arg_value)
 
     def test_incomplete_task_cls(self):
+
         class IncompleteTask(task.Task):
             name = "c.unittest.t.itask"
 
@@ -62,7 +63,7 @@ class TestCeleryTasks(unittest.TestCase):
                 "Task class is callable()")
         self.assertTrue(T1()(),
                 "Task class runs run() when called")
-        
+
         # task without name raises NotImplementedError
         T2 = self.createTaskCls("T2")
         self.assertRaises(NotImplementedError, T2)

+ 6 - 6
celery/timer.py

@@ -14,9 +14,9 @@ class EventTimer(object):
     """Do something at an interval.
 
     .. attribute:: interval
-    
+
         How often we call the event (in seconds).
-  
+
     .. attribute:: event
 
         The event callable to run every ``interval`` seconds.
@@ -24,7 +24,7 @@ class EventTimer(object):
     .. attribute:: last_triggered
 
         The last time, in unix timestamp format, the event was executed.
-    
+
     """
 
     def __init__(self, event, interval=None):
@@ -34,7 +34,7 @@ class EventTimer(object):
 
     def tick(self):
         """Run a event timer clock tick.
-       
+
         When the interval has run, the event will be triggered.
         If interval is not set, the event will never be triggered.
 
@@ -50,7 +50,7 @@ class EventTimer(object):
 class TimeoutTimer(object):
     """A timer that raises :exc:`TimeoutError` exception when the
     time has run out.
-   
+
     .. attribute:: timeout
 
         The timeout in seconds.
@@ -58,7 +58,7 @@ class TimeoutTimer(object):
     .. attribute:: time_start
 
         The time when the timeout timer instance was constructed.
-    
+
     """
 
     def __init__(self, timeout):

+ 0 - 2
celery/views.py

@@ -20,5 +20,3 @@ def task_status(request, task_id):
                         "result": async_result.get_result(),
     }}
     return HttpResponse(simplejson.dumps(response_data))
-
-

+ 14 - 14
celery/worker.py

@@ -28,7 +28,7 @@ class UnknownTask(Exception):
 def jail(task_id, callable_, args, kwargs):
     """Wraps the task in a jail which saves the status and result
     of the task execution to the task meta backend.
-   
+
     If the call results in an exception, it saves the exception as the task
     result, and sets the task status to ``FAILURE``.
 
@@ -62,9 +62,9 @@ class TaskWrapper(object):
 
         If the message is not a proper task it raises
         :class:`UnknownTask` exception.
-        
+
         :rtype: :class:`TaskWrapper` instance.
-        
+
         """
         message_data = simplejson.loads(message.body)
         task_name = message_data["task"]
@@ -99,7 +99,7 @@ class TaskWrapper(object):
 
     def execute_using_pool(self, pool, loglevel, logfile):
         """Like ``execute``, but using the ``multiprocessing`` pool.
-       
+
         :rtype: :class:`multiprocessing.AsyncResult` instance.
         """
         task_func_kwargs = self.extend_kwargs_with_logging(loglevel, logfile)
@@ -141,7 +141,7 @@ class TaskDaemon(object):
         The :class:`logging.Logger` instance used for logging.
 
     .. attribute:: pool
-        
+
         The :class:`multiprocessing.Pool` instance used.
 
     .. attribute:: task_consumer
@@ -154,7 +154,7 @@ class TaskDaemon(object):
     logfile = DAEMON_LOG_FILE
     queue_wakeup_after = QUEUE_WAKEUP_AFTER
     empty_msg_emit_every = EMPTY_MSG_EMIT_EVERY
-    
+
     def __init__(self, concurrency=None, logfile=None, loglevel=None,
             queue_wakeup_after=None):
         self.loglevel = loglevel or self.loglevel
@@ -184,12 +184,12 @@ class TaskDaemon(object):
 
     def receive_message(self):
         """Receive the next message from the Task consumer queue.
-       
+
         Tries to reset the AMQP connection if not available.
         Returns ``None`` if no message is waiting on the queue.
 
         :rtype: :class:`carrot.messaging.Message` instance.
-        
+
         """
         self.connection_diagnostics()
         message = self.task_consumer.fetch()
@@ -209,7 +209,7 @@ class TaskDaemon(object):
 
     def fetch_next_task(self):
         """Fetch the next task from the AMQP broker.
-       
+
         Raises :class:`EmptyQueue` exception if there is no messages
         waiting on the queue.
 
@@ -228,8 +228,8 @@ class TaskDaemon(object):
     def execute_next_task(self):
         """Execute the next task on the queue using the multiprocessing
         pool.
-        
-        Catches all exceptions and logs them with level ``logging.CRITICAL``. 
+
+        Catches all exceptions and logs them with level ``logging.CRITICAL``.
 
         """
         task, message = self.fetch_next_task()
@@ -240,13 +240,13 @@ class TaskDaemon(object):
         except Exception, error:
             self.logger.critical("Worker got exception %s: %s\n%s" % (
                 error.__class__, error, traceback.format_exc()))
-            return 
+            return
 
         return result, task.task_name, task.task_id
 
     def run_periodic_tasks(self):
         """Schedule all waiting periodic tasks for execution.
-       
+
         :rtype: list of :class:`celery.models.PeriodicTaskMeta` objects.
         """
         waiting_tasks = PeriodicTaskMeta.objects.get_waiting_tasks()
@@ -288,5 +288,5 @@ class TaskDaemon(object):
                 self.logger.critical("Message queue raised %s: %s\n%s" % (
                              e.__class__, e, traceback.format_exc()))
                 continue
-           
+
             results.add(result, task_name, task_id)