Ask Solem committed 16 years ago
commit ea2f3a4a6e
9 changed files with 40 additions and 35 deletions
  1. celery/backends/__init__.py (+1 -1)
  2. celery/backends/base.py (+4 -4)
  3. celery/conf.py (+15 -13)
  4. celery/datastructures.py (+2 -2)
  5. celery/managers.py (+6 -4)
  6. celery/result.py (+1 -1)
  7. celery/task.py (+1 -1)
  8. celery/views.py (+1 -0)
  9. celery/worker.py (+9 -9)

+ 1 - 1
celery/backends/__init__.py

@@ -47,7 +47,7 @@ get_default_periodicstatus_backend_cls = partial(get_backend_cls,
 
 """
 .. class:: DefaultBackend
-    
+
     The default backend class used for storing task results and status,
     specified in :setting:`CELERY_BACKEND`.
 

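``get_default_periodicstatus_backend_cls`` above is built with ``partial(get_backend_cls, ...)``: the backend class is looked up once from a settings value. A minimal sketch of that resolution pattern (the module layout and the ``Backend`` attribute name are assumptions for illustration, not the project's exact code)::

    from functools import partial
    import importlib

    def get_backend_cls(backend_module_name):
        # Import e.g. "celery.backends.database" and return its
        # backend class (attribute name assumed here).
        module = importlib.import_module(backend_module_name)
        return module.Backend

    # Bind the lookup to a concrete setting, as the partials above do.
    get_default_backend_cls = partial(get_backend_cls,
                                      "celery.backends.database")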
+ 4 - 4
celery/backends/base.py

@@ -13,7 +13,7 @@ def find_nearest_pickleable_exception(exc):
     not go below :exc:`Exception` (i.e. it skips :exc:`Exception`,
     :class:`BaseException` and :class:`object`). If that happens
     you should use :exc:`UnpickleableException` instead.
-  
+
     :param exc: An exception instance.
 
     :returns: the nearest exception if it's not :exc:`Exception` or below,
@@ -39,11 +39,11 @@ def find_nearest_pickleable_exception(exc):
 
 class UnpickleableExceptionWrapper(Exception):
     """Wraps unpickleable exceptions.
-   
+
     :param exc_module: see :attr:`exc_module`.
 
     :param exc_cls_name: see :attr:`exc_cls_name`.
-    
+
     :param exc_args: see :attr:`exc_args`
 
     .. attribute:: exc_module
@@ -116,7 +116,7 @@ class BaseBackend(object):
             return excwrapper
         else:
             return exc
-    
+
     def exception_to_python(self, exc):
         if isinstance(exc, UnpickleableExceptionWrapper):
             exc_cls = self.create_exception_cls(exc.exc_cls_name,

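``find_nearest_pickleable_exception`` and the wrapper exist because some exception instances cannot survive a pickle round-trip between processes. A condensed sketch of the wrapping decision (simplified; the real code in the hunks above also walks the exception's class hierarchy)::

    import pickle

    class UnpickleableExceptionWrapper(Exception):
        """Carries module, class name and args so the original
        exception can be rebuilt on the consuming side."""

        def __init__(self, exc_module, exc_cls_name, exc_args):
            self.exc_module = exc_module
            self.exc_cls_name = exc_cls_name
            self.exc_args = exc_args
            Exception.__init__(self, exc_module, exc_cls_name, exc_args)

    def wrap_if_unpickleable(exc):
        # Keep the original exception when it pickles cleanly,
        # otherwise fall back to the wrapper.
        try:
            pickle.loads(pickle.dumps(exc))
            return exc
        except Exception:
            return UnpickleableExceptionWrapper(
                exc.__class__.__module__,
                exc.__class__.__name__,
                exc.args)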
+ 15 - 13
celery/conf.py

@@ -18,7 +18,7 @@ DEFAULT_REAP_TIMEOUT = 30
 
 """
 .. data:: LOG_LEVELS
-   
+
     Mapping of log level names to :mod:`logging` module constants.
 
 """
@@ -34,7 +34,7 @@ LOG_LEVELS = {
 
 """
 .. data:: LOG_FORMAT
-   
+
     The format to use for log messages.
     Default is ``[%(asctime)s: %(levelname)s/%(processName)s] %(message)s``
 
@@ -44,7 +44,7 @@ LOG_FORMAT = getattr(settings, "CELERYD_DAEMON_LOG_FORMAT",
 
 """
 .. data:: DAEMON_LOG_FILE
-   
+
    The path to the daemon log file (if not set, ``stderr`` is used).
 
 """
@@ -53,7 +53,7 @@ DAEMON_LOG_FILE = getattr(settings, "CELERYD_LOG_FILE",
 
 """
 .. data:: DAEMON_LOG_LEVEL
-   
+
    The Celery daemon log level. Can be any of ``DEBUG``, ``INFO``, ``WARNING``,
    ``ERROR``, ``CRITICAL``, or ``FATAL``. See the :mod:`logging` module
    for more information.
@@ -64,7 +64,7 @@ DAEMON_LOG_LEVEL = LOG_LEVELS[getattr(settings, "CELERYD_DAEMON_LOG_LEVEL",
 
 """
 .. data:: QUEUE_WAKEUP_AFTER
-   
+
    The time (in seconds) the celery worker should sleep when there are
    no messages left on the queue. After sleeping, the worker wakes up
    and checks the queue again.
@@ -75,7 +75,7 @@ QUEUE_WAKEUP_AFTER = getattr(settings, "CELERYD_QUEUE_WAKEUP_AFTER",
 
 """
 .. data:: EMPTY_MSG_EMIT_EVERY
-   
+
     How often the celery daemon should write a log message saying there are no
     messages in the queue. If this is ``None`` or ``0``, it will never print
     this message.
@@ -86,7 +86,7 @@ EMPTY_MSG_EMIT_EVERY = getattr(settings, "CELERYD_EMPTY_MSG_EMIT_EVERY",
 
 """
 .. data:: DAEMON_PID_FILE
-   
+
     Full path to the daemon pidfile.
 
 """
@@ -95,7 +95,7 @@ DAEMON_PID_FILE = getattr(settings, "CELERYD_PID_FILE",
 
 """
 .. data:: DAEMON_CONCURRENCY
-   
+
    The number of concurrent worker processes executing tasks simultaneously.
 
 """
@@ -116,11 +116,13 @@ AMQP_EXCHANGE = getattr(settings, "CELERY_AMQP_EXCHANGE",
 .. data:: AMQP_EXCHANGE_TYPE
 
 The type of exchange. If the exchange type is ``direct``, all consumers
-receive all tasks. However, if the exchange type is ``topic``, you can 
+receive all tasks. However, if the exchange type is ``topic``, you can
 route e.g. some tasks to one server, and others to the rest.
 See `Exchange types and the effect of bindings`_.
 
-.. _`Exchange types and the effect of bindings: http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol#Exchange_types_and_the_effect_of_bindings
+.. _`Exchange types and the effect of bindings`:
+    http://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
+    #Exchange_types_and_the_effect_of_bindings
 
 """
 AMQP_EXCHANGE_TYPE = getattr(settings, "CELERY_AMQP_EXCHANGE_TYPE",
@@ -128,7 +130,7 @@ AMQP_EXCHANGE_TYPE = getattr(settings, "CELERY_AMQP_EXCHANGE_TYPE",
 
 """
 .. data:: AMQP_PUBLISHER_ROUTING_KEY
-   
+
     The default AMQP routing key used when publishing tasks.
 
 """
@@ -138,7 +140,7 @@ AMQP_PUBLISHER_ROUTING_KEY = getattr(settings,
 
 """
 .. data:: AMQP_CONSUMER_ROUTING_KEY
-   
+
     The AMQP routing key used when consuming tasks.
 
 """
@@ -148,7 +150,7 @@ AMQP_CONSUMER_ROUTING_KEY = getattr(settings,
 
 """
 .. data:: AMQP_CONSUMER_QUEUE
-   
+
     The name of the AMQP queue.
 
 """

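Every attribute in this module follows the same ``getattr(settings, "SETTING_NAME", default)`` pattern, so any of them can be overridden from the Django settings module. A hypothetical override snippet using the setting names visible above (the values are illustrative, not recommended defaults)::

    # settings.py -- illustrative values only.
    CELERYD_DAEMON_LOG_LEVEL = "INFO"
    CELERYD_LOG_FILE = "/var/log/celeryd.log"
    CELERYD_PID_FILE = "/var/run/celeryd.pid"
    CELERYD_QUEUE_WAKEUP_AFTER = 0.3     # sleep (seconds) on empty queue
    CELERYD_EMPTY_MSG_EMIT_EVERY = 5     # "queue empty" log frequency
    CELERY_AMQP_EXCHANGE = "celery"
    CELERY_AMQP_EXCHANGE_TYPE = "topic"  # allows per-server task routing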
+ 2 - 2
celery/datastructures.py

@@ -120,7 +120,7 @@ class TaskProcessQueue(object):
             self._start()
 
         self._processed_total = self._process_counter.next()
-        
+
         on_return = lambda r: self.on_return(r, task_name, task_id)
 
         result = self._pool.apply_async(target, args, kwargs,
@@ -152,7 +152,7 @@ class TaskProcessQueue(object):
         :param task_id: Id of the task executed.
 
         """
-      
+
         self._processes[task_id] = [result, task_name]
 
         if self.full():

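The hunk above hands work to a process pool and binds the task's metadata into a completion callback. A self-contained sketch of that callback pattern with :mod:`multiprocessing` (the names and values are illustrative)::

    from multiprocessing import Pool

    def on_return(ret_value, task_name, task_id):
        # Runs in the parent process when the pool worker finishes.
        print("%s[%s] returned: %r" % (task_name, task_id, ret_value))

    if __name__ == "__main__":
        pool = Pool(processes=2)
        callback = lambda r: on_return(r, "tasks.sum", "id-1")
        pool.apply_async(sum, ([1, 2, 3],), callback=callback)
        pool.close()
        pool.join()   # prints: tasks.sum[id-1] returned: 6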
+ 6 - 4
celery/managers.py

@@ -4,6 +4,10 @@ from celery.registry import tasks
 from datetime import datetime, timedelta
 import random
 
+# SERVER_DRIFT can be negative, but timedelta supports addition of
+# negative seconds.
+SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
+
 
 class TaskManager(models.Manager):
     """Manager for :class:`celery.models.Task` models."""
@@ -47,9 +51,7 @@ class TaskManager(models.Manager):
             task.result = result
             task.save()
 
-# server_drift can be negative, but timedelta supports addition on
-#negative seconds.
-server_drift = timedelta(seconds=random.vonmisesvariate(1, 4))
+
 
 class PeriodicTaskManager(models.Manager):
     """Manager for :class:`celery.models.PeriodicTask` models."""
@@ -64,7 +66,7 @@ class PeriodicTaskManager(models.Manager):
         for task_name, task in periodic_tasks.items():
             task_meta, created = self.get_or_create(name=task_name)
             # task_run.every must be a timedelta object.
-            run_every_drifted = task.run_every + server_drift
+            run_every_drifted = task.run_every + SERVER_DRIFT
             run_at = task_meta.last_run_at + task.run_every
             if datetime.now() > run_at:
                 waiting.append(task_meta)

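``random.vonmisesvariate(1, 4)`` samples an angle on ``[0, 2*pi]``, so ``SERVER_DRIFT`` comes out to a few seconds at most; adding it to each interval keeps multiple servers from firing their periodic tasks at exactly the same instant. A quick self-contained illustration::

    from datetime import timedelta
    import random

    SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))

    run_every = timedelta(minutes=5)
    run_every_drifted = run_every + SERVER_DRIFT
    print(run_every_drifted)   # e.g. 0:05:01.37... -- varies per process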
+ 1 - 1
celery/result.py

@@ -123,7 +123,7 @@ class AsyncResult(BaseAsyncResult):
 
     :param task_id: see :attr:`task_id`.
 
-    
+
     .. attribute:: task_id
 
         The unique identifier for this task.

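A minimal usage sketch for the attribute documented above (the task id value here is a placeholder, not a real identifier)::

    from celery.result import AsyncResult

    task_id = "hypothetical-task-id"   # normally a UUID from a dispatched task
    res = AsyncResult(task_id)
    print(res.task_id)                 # the unique identifier documented above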
+ 1 - 1
celery/task.py

@@ -276,7 +276,7 @@ class Task(object):
         :rtype: :class:`celery.result.AsyncResult`
 
         See :func:`apply_async`.
-        
+
         """
         return apply_async(cls, args, kwargs, **options)
 

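The method above simply forwards to the module-level ``apply_async``. A hypothetical call site (the task class, its registered name, and the arguments are invented for illustration)::

    from celery.task import Task

    class AddTask(Task):
        name = "demo.add"              # hypothetical registered name

        def run(self, x, y, **kwargs):
            return x + y

    # Returns a celery.result.AsyncResult for the dispatched task.
    result = AddTask.apply_async(args=[2, 2])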
+ 1 - 0
celery/views.py

@@ -8,6 +8,7 @@ import simplejson
 def apply_async(request, task_name, *args, **kwargs):
     res = delay_task(task_name, args, kwargs)
 
+
 def is_task_done(request, task_id):
     """Returns task execute status in JSON format."""
     response_data = {"task": {"id": task_id, "executed": is_done(task_id)}}

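``is_task_done`` builds a small status document and serializes it with :mod:`simplejson`. A sketch of the payload it produces (the task id is a placeholder)::

    import simplejson

    response_data = {"task": {"id": "some-task-id", "executed": True}}
    print(simplejson.dumps(response_data))
    # {"task": {"id": "some-task-id", "executed": true}}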
+ 9 - 9
celery/worker.py

@@ -26,7 +26,7 @@ class UnknownTask(Exception):
 
 
 def jail(task_id, func, args, kwargs):
-    """Wraps the task in a jail, which catches all exceptions, and 
+    """Wraps the task in a jail, which catches all exceptions, and
     saves the status and result of the task execution to the task
     meta backend.
 
@@ -57,7 +57,7 @@ def jail(task_id, func, args, kwargs):
 
 class TaskWrapper(object):
     """Class wrapping a task to be run.
-    
+
     :param task_name: see :attr:`task_name`.
 
     :param task_id: see :attr:`task_id`.
@@ -71,7 +71,7 @@ class TaskWrapper(object):
     .. attribute:: task_name
 
         Kind of task. Must be a name registered in the task registry.
-    
+
     .. attribute:: task_id
 
         UUID of the task.
@@ -87,7 +87,7 @@ class TaskWrapper(object):
     .. attribute:: kwargs
 
         Mapping of keyword arguments to apply to the task.
-    
+
     """
 
     def __init__(self, task_name, task_id, task_func, args, kwargs):
@@ -141,7 +141,7 @@ class TaskWrapper(object):
     def execute(self, loglevel=None, logfile=None):
         """Execute the task in a :func:`jail` and store return value
         and status in the task meta backend.
-       
+
         :keyword loglevel: The loglevel used by the task.
 
         :keyword logfile: The logfile used by the task.
@@ -172,9 +172,9 @@ class TaskWrapper(object):
 
 class WorkController(object):
     """Executes tasks waiting in the task queue.
-    
+
     :param concurrency: see :attr:`concurrency`.
-    
+
     :param logfile: see :attr:`logfile`.
 
     :param loglevel: see :attr:`loglevel`.
@@ -244,7 +244,7 @@ class WorkController(object):
     def reset_connection(self):
         """Reset the AMQP connection, and reinitialize the
         :class:`celery.messaging.TaskConsumer` instance.
-       
+
         Resets the task consumer in :attr:`task_consumer`.
 
         """
@@ -313,7 +313,7 @@ class WorkController(object):
         self.logger.debug("Trying to fetch a task.")
         task, message = self.fetch_next_task()
         self.logger.debug("Got a task: %s. Trying to execute it..." % task)
-        
+
         result = task.execute_using_pool(self.pool, self.loglevel,
                                          self.logfile)
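``jail`` exists so that any outcome, return value or exception, ends up recorded in the task meta backend. A condensed sketch of that pattern (the two ``mark_as_*`` helpers here are simplified stand-ins for the real backend calls)::

    def mark_as_done(task_id, result):
        print("DONE %s -> %r" % (task_id, result))      # stand-in

    def mark_as_failure(task_id, exc):
        print("FAILURE %s -> %r" % (task_id, exc))      # stand-in

    def jail(task_id, func, args, kwargs):
        # Catch everything: the worker process must never die
        # because a task raised.
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            mark_as_failure(task_id, exc)
            return exc
        else:
            mark_as_done(task_id, result)
            return result

    jail("id-1", lambda x: x * 2, (21,), {})   # records DONE id-1 -> 42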