Ask Solem 15 years ago
parent
commit
fa338c65d8

+ 2 - 2
celery/backends/amqp.py

@@ -102,7 +102,7 @@ class Backend(BaseBackend):
         connection = self.connection
         consumer = self._consumer_for_task_id(task_id, connection)
         consumer.register_callback(callback)
-        
+
         try:
             consumer.iterconsume().next()
         finally:
@@ -114,7 +114,7 @@ class Backend(BaseBackend):
 
     def get_result(self, task_id):
         """Get the result for a task."""
-        result = self._get_task_meta_for(task_id) 
+        result = self._get_task_meta_for(task_id)
         if result["status"] == "FAILURE":
             return self.exception_to_python(result["result"])
         else:

+ 0 - 2
celery/bin/celeryd.py

@@ -255,13 +255,11 @@ def run_worker(concurrency=DAEMON_CONCURRENCY, detach=False,
         context.open()
         logger = setup_logger(loglevel, logfile)
         redirect_stdouts_to_logger(logger, loglevel)
-        
 
     # Run the worker init handler.
     # (Usually imports task modules and such.)
     current_loader.on_worker_init()
 
-
     def run_worker():
         worker = WorkController(concurrency=concurrency,
                                 loglevel=loglevel,

+ 1 - 3
celery/bin/celeryinit.py

@@ -1,10 +1,10 @@
 #!/usr/bin/env python
 
+
 def main():
     from celery.loaders.default import Loader
     loader = Loader()
     conf = loader.read_configuration()
-    #from django.core.management.commands.syncdb import Command as SyncDB
     from django.core.management import call_command, setup_environ
     print("Creating database tables...")
     setup_environ(conf)
@@ -12,5 +12,3 @@ def main():
 
 if __name__ == "__main__":
     main()
-
-

+ 3 - 2
celery/datastructures.py

@@ -113,6 +113,7 @@ class SharedCounter(object):
         >>> max_client = max_clients + 10 # NOT OK (unsafe)
 
     """
+
     def __init__(self, initial_value):
         self._value = initial_value
         self._modify_queue = Queue()
@@ -124,11 +125,11 @@ class SharedCounter(object):
     def decrement(self, n=1):
         """Decrement value."""
         self -= n
-        
+
     def _update_value(self):
         self._value += sum(consume_queue(self._modify_queue))
         return self._value
-    
+
     def __iadd__(self, y):
         """``self += y``"""
         self._modify_queue.put(y * +1)

+ 2 - 4
celery/execute.py

@@ -176,7 +176,7 @@ class ExecuteWrapper(object):
     """Wraps the task in a jail, which catches all exceptions, and
     saves the status and result of the task execution to the task
     meta backend.
-    
+
     If the call was successful, it saves the result to the task result
     backend, and sets the task status to ``"DONE"``.
 
@@ -187,7 +187,6 @@ class ExecuteWrapper(object):
     If the call results in an exception, it saves the exception as the task
     result, and sets the task status to ``"FAILURE"``.
 
-   
     :param fun: Callable object to execute.
     :param task_id: The unique id of the task.
     :param task_name: Name of the task.
@@ -197,7 +196,6 @@ class ExecuteWrapper(object):
     :returns: the function return value on success, or
         the exception instance on failure.
 
-    
     """
 
     def __init__(self, fun, task_id, task_name, args=None, kwargs=None):
@@ -223,7 +221,7 @@ class ExecuteWrapper(object):
 
         # Backend process cleanup
         default_backend.process_cleanup()
-      
+
         # Send pre-run signal.
         signals.task_prerun.send(sender=fun, task_id=task_id, task=fun,
                                  args=args, kwargs=kwargs)

+ 2 - 2
celery/loaders/djangoapp.py

@@ -11,10 +11,10 @@ class Loader(BaseLoader):
 
     def on_task_init(self, task_id, task):
         """This method is called before a task is executed.
-        
+
         Does everything necessary for Django to work in a long-living,
         multiprocessing environment.
-        
+
         """
         # See: http://groups.google.com/group/django-users/browse_thread/
         #       thread/78200863d0c07c6d/38402e76cf3233e8?hl=en&lnk=gst&

+ 3 - 3
celery/log.py

@@ -52,7 +52,7 @@ def emergency_error(logfile, message):
 
 
 def redirect_stdouts_to_logger(logger, loglevel=None):
-    """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a 
+    """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
     logging instance.
 
     :param logger: The :class:`logging.Logger` instance to redirect to.
@@ -87,9 +87,9 @@ class LoggingProxy(object):
 
     def writelines(self, sequence):
         """``writelines(sequence_of_strings) -> None``.
-        
+
         Write the strings to the file.
-    
+
         The sequence can be any iterable object producing strings.
         This is equivalent to calling :meth:`write` for each string.
 

+ 2 - 3
celery/task/base.py

@@ -299,7 +299,7 @@ class Task(object):
         Example
 
             >>> class TwitterPostStatusTask(Task):
-            ... 
+            ...
             ...     def run(self, username, password, message, **kwargs):
             ...         twitter = Twitter(username, password)
             ...         try:
@@ -335,7 +335,6 @@ class Task(object):
             message = "Retry in %d seconds." % options["countdown"]
             raise RetryTaskError(message, exc)
 
-
     def on_retry(self, exc):
         """Retry handler.
 
@@ -362,7 +361,7 @@ class Task(object):
         This is run by the worker when the task executed successfully.
 
         :param retval: The return value of the task.
-     
+
         """
         pass
 

+ 6 - 6
celery/tests/test_task.py

@@ -44,7 +44,7 @@ class RetryTask(task.Task):
     iterations = 0
 
     def run(self, arg1, arg2, kwarg=1, **kwargs):
-        self.__class__.iterations += 1 
+        self.__class__.iterations += 1
 
         retries = kwargs["task_retries"]
         if retries >= 3:
@@ -85,7 +85,7 @@ class TestTaskRetries(unittest.TestCase):
         result = RetryTask.apply([0xFF, 0xFFFF])
         self.assertEquals(result.get(), 0xFF)
         self.assertEquals(RetryTask.iterations, 4)
-    
+
     def test_retry_with_kwargs(self):
         RetryTaskCustomExc.max_retries = 3
         RetryTaskCustomExc.iterations = 0
@@ -97,7 +97,7 @@ class TestTaskRetries(unittest.TestCase):
         RetryTaskCustomExc.max_retries = 2
         RetryTaskCustomExc.iterations = 0
         result = RetryTaskCustomExc.apply([0xFF, 0xFFFF], {"kwarg": 0xF})
-        self.assertRaises(MyCustomException, 
+        self.assertRaises(MyCustomException,
                           result.get)
         self.assertEquals(RetryTaskCustomExc.iterations, 3)
 
@@ -105,17 +105,17 @@ class TestTaskRetries(unittest.TestCase):
         RetryTask.max_retries = 2
         RetryTask.iterations = 0
         result = RetryTask.apply([0xFF, 0xFFFF])
-        self.assertRaises(RetryTask.MaxRetriesExceededError, 
+        self.assertRaises(RetryTask.MaxRetriesExceededError,
                           result.get)
         self.assertEquals(RetryTask.iterations, 3)
 
         RetryTask.max_retries = 1
         RetryTask.iterations = 0
         result = RetryTask.apply([0xFF, 0xFFFF])
-        self.assertRaises(RetryTask.MaxRetriesExceededError, 
+        self.assertRaises(RetryTask.MaxRetriesExceededError,
                           result.get)
         self.assertEquals(RetryTask.iterations, 2)
-        
+
 
 class TestCeleryTasks(unittest.TestCase):
 

+ 1 - 0
celery/tests/test_worker_controllers.py

@@ -82,6 +82,7 @@ class TestPeriodicWorkController(unittest.TestCase):
         m.process_hold_queue()
 
         scratchpad = {}
+
         def on_accept():
             scratchpad["accepted"] = True
 

+ 3 - 5
celery/tests/test_worker_job.py

@@ -16,6 +16,7 @@ import simplejson
 import logging
 
 scratch = {"ACK": False}
+some_kwargs_scratchpad = {}
 
 
 def jail(task_id, task_name, fun, args, kwargs):
@@ -36,8 +37,6 @@ def mytask_no_kwargs(i):
 tasks.register(mytask_no_kwargs, name="mytask_no_kwargs")
 
 
-some_kwargs_scratchpad = {}
-
 def mytask_some_kwargs(i, logfile):
     some_kwargs_scratchpad["logfile"] = logfile
     return i ** i
@@ -61,7 +60,6 @@ class TestJail(unittest.TestCase):
         ret = jail(gen_unique_id(), gen_unique_id(), mytask, [2], {})
         self.assertEquals(ret, 4)
 
-
     def test_execute_jail_failure(self):
         ret = jail(gen_unique_id(), gen_unique_id(), mytask_raising, [4], {})
         self.assertTrue(isinstance(ret, ExceptionInfo))
@@ -191,7 +189,7 @@ class TestTaskWrapper(unittest.TestCase):
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertEquals(meta.result, 256)
         self.assertEquals(meta.status, "DONE")
-    
+
     def test_execute_success_no_kwargs(self):
         tid = gen_unique_id()
         tw = TaskWrapper("cu.mytask_no_kwargs", tid, mytask_no_kwargs,
@@ -200,7 +198,7 @@ class TestTaskWrapper(unittest.TestCase):
         meta = TaskMeta.objects.get(task_id=tid)
         self.assertEquals(meta.result, 256)
         self.assertEquals(meta.status, "DONE")
-    
+
     def test_execute_success_some_kwargs(self):
         tid = gen_unique_id()
         tw = TaskWrapper("cu.mytask_some_kwargs", tid, mytask_some_kwargs,

+ 1 - 1
celery/worker/job.py

@@ -183,7 +183,7 @@ class TaskWrapper(object):
         return pool.apply_async(wrapper,
                 callbacks=[self.on_success], errbacks=[self.on_failure],
                 on_ack=self.on_ack)
-    
+
     def on_success(self, ret_value):
         """The handler used if the task was successfully processed (
         without raising an exception)."""