Ask Solem 14 years ago
parent
commit
025a75a1fe

+ 18 - 14
celery/backends/cassandra.py

@@ -6,12 +6,13 @@ try:
 except ImportError:
     pycassa = None
 
-from datetime import datetime
 import itertools
 import random
 import socket
 import time
 
+from datetime import datetime
+
 from celery.backends.base import BaseDictBackend
 from celery import conf
 from celery.exceptions import ImproperlyConfigured
@@ -21,7 +22,6 @@ from celery.serialization import pickle
 from celery import states
 
 
-
 class CassandraBackend(BaseDictBackend):
     """Highly fault tolerant Cassandra backend.
 
@@ -39,13 +39,15 @@ class CassandraBackend(BaseDictBackend):
     _retry_timeout = 300
     _retry_wait = 3
     _index_shards = 64
-    _index_keys = ["celery.results.index!%02x" % i for i in range(_index_shards)]
+    _index_keys = ["celery.results.index!%02x" % i
+                        for i in range(_index_shards)]
 
-    def __init__(self, servers=None, keyspace=None, column_family=None, cassandra_options=None, **kwargs):
+    def __init__(self, servers=None, keyspace=None, column_family=None,
+            cassandra_options=None, **kwargs):
         """Initialize Cassandra backend.
 
         Raises :class:`celery.exceptions.ImproperlyConfigured` if
-        :setting:`CASSANDRA_SERVERS` is not set.
+        the ``CASSANDRA_SERVERS`` setting is not set.
 
         """
         self.logger = setup_logger("celery.backends.cassandra")
@@ -69,7 +71,8 @@ class CassandraBackend(BaseDictBackend):
                                getattr(settings, "CASSANDRA_COLUMN_FAMILY",
                                        self.column_family)
         self.cassandra_options = dict(cassandra_options or {},
-                                   **getattr(settings, "CASSANDRA_OPTIONS", {}))
+                                   **getattr(settings,
+                                             "CASSANDRA_OPTIONS", {}))
         if not self.servers or not self.keyspace or not self.column_family:
             raise ImproperlyConfigured(
                     "Cassandra backend not configured.")
@@ -103,9 +106,9 @@ class CassandraBackend(BaseDictBackend):
                                    **self.cassandra_options)
             self._column_family = \
               pycassa.ColumnFamily(conn, self.keyspace,
-                                   self.column_family,
-                                   read_consistency_level=pycassa.ConsistencyLevel.DCQUORUM,
-                                   write_consistency_level=pycassa.ConsistencyLevel.DCQUORUM)
+                    self.column_family,
+                    read_consistency_level=pycassa.ConsistencyLevel.DCQUORUM,
+                    write_consistency_level=pycassa.ConsistencyLevel.DCQUORUM)
         return self._column_family
 
     def process_cleanup(self):
@@ -117,8 +120,10 @@ class CassandraBackend(BaseDictBackend):
         """Store return value and status of an executed task."""
         cf = self._get_column_family()
         date_done = datetime.utcnow()
-        index_key = 'celery.results.index!%02x' % random.randrange(self._index_shards)
-        index_column_name = '%8x!%s' % (time.mktime(date_done.timetuple()), task_id)
+        index_key = 'celery.results.index!%02x' % (
+                random.randrange(self._index_shards))
+        index_column_name = '%8x!%s' % (time.mktime(date_done.timetuple()),
+                                        task_id)
         meta = {"status": status,
                 "result": pickle.dumps(result),
                 "date_done": date_done.strftime('%Y-%m-%dT%H:%M:%SZ'),
@@ -157,7 +162,8 @@ class CassandraBackend(BaseDictBackend):
                                            column_parent, slice_pred,
                                            pycassa.ConsistencyLevel.DCQUORUM)
 
-        index_cols = [c.column.name for c in itertools.chain(*columns.values())]
+        index_cols = [c.column.name
+                        for c in itertools.chain(*columns.values())]
         for k in self._index_keys:
             cf.remove(k, index_cols)
 
@@ -166,5 +172,3 @@ class CassandraBackend(BaseDictBackend):
             cf.remove(k)
 
         self.logger.debug('Cleaned %i expired results' % len(task_ids))
-
-

+ 1 - 1
celery/loaders/default.py

@@ -57,7 +57,7 @@ class Loader(BaseLoader):
         settings.CELERY_TASK_ERROR_WHITELIST = tuple(
                 getattr(import_module(mod), cls)
                     for fqn in settings.CELERY_TASK_ERROR_WHITELIST
-                        for mod, cls in (fqn.rsplit('.', 1),))
+                        for mod, cls in (fqn.rsplit('.', 1), ))
 
         return settings
 

+ 0 - 1
celery/task/control.py

@@ -138,7 +138,6 @@ class inspect(object):
         return self._request("disable_events")
 
 
-
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,
         connect_timeout=conf.BROKER_CONNECTION_TIMEOUT, reply=False,

+ 1 - 2
celery/tests/functional/case.py

@@ -10,7 +10,7 @@ import unittest2 as unittest
 from itertools import count
 
 from celery.exceptions import TimeoutError
-from celery.task.control import broadcast, ping, flatten_reply, inspect
+from celery.task.control import ping, flatten_reply, inspect
 from celery.utils import get_full_cls_name
 
 HOSTNAME = socket.gethostname()
@@ -149,7 +149,6 @@ class WorkerCase(unittest.TestCase):
                 self.is_scheduled(task_id, interval) or
                 self.is_accepted(task_id, interval))
 
-
     def ensure_accepted(self, task_id, interval=0.5, timeout=10):
         return try_while(lambda: self.is_accepted(task_id, interval),
                          "Task not accepted within timeout",

+ 0 - 1
celery/worker/__init__.py

@@ -31,7 +31,6 @@ WORKER_SIGRESET = frozenset(["SIGTERM",
 WORKER_SIGIGNORE = frozenset(["SIGINT"])
 
 
-
 def process_initializer():
     """Initializes the process so it can be used to process tasks.
 

+ 2 - 2
celery/worker/control/__init__.py

@@ -59,8 +59,8 @@ class ControlDispatch(object):
         else:
             try:
                 reply = control(self.panel, **kwdict(kwargs))
-	    except SystemExit:
-		raise
+            except SystemExit:
+                raise
             except Exception, exc:
                 self.logger.error(
                         "Error running control command %s kwargs=%s: %s" % (

+ 0 - 1
celery/worker/control/builtins.py

@@ -58,7 +58,6 @@ def set_loglevel(panel, loglevel=None):
     return {"ok": loglevel}
 
 
-
 @Panel.register
 def rate_limit(panel, task_name, rate_limit, **kwargs):
     """Set new rate limit for a task type.

+ 4 - 3
celery/worker/scheduler.py

@@ -40,9 +40,10 @@ class Scheduler(object):
             try:
                 eta = time.mktime(eta.timetuple())
             except OverflowError:
-                # this machine can't represent the passed in time as a unix timestamp
-                # just ignore this for now
-                self.logger.error("Cannot represent %s as a unix timestamp. Ignoring it." % eta)
+                # this machine can't represent the passed in time as
+                # a unix timestamp just ignore this for now
+                self.logger.error("Cannot represent %s as a unix timestamp. "
+                                  "Ignoring %s." % (eta, item))
                 return
         eta = eta or time.time()
         heapq.heappush(self._queue, (eta, priority, item, callback))

+ 0 - 1
funtests/test_basic.py

@@ -47,4 +47,3 @@ class test_basic(WorkerCase):
         self.assertTrue(len(schedule), 2)
         self.assertEqual(schedule[0]["request"]["name"], tasks.add.name)
         self.assertEqual(schedule[0]["request"]["args"], [2, 2])
-