Browse Source

Don't use map without return value (2to3 suggested)

Ask Solem 14 years ago
parent
commit
4776440405

+ 1 - 4
celery/app/amqp.py

@@ -68,14 +68,11 @@ class Queues(UserDict):
     @classmethod
     def with_defaults(cls, queues, default_exchange, default_exchange_type):
 
-        def _defaults(opts):
+        for opts in queues.values():
             opts.setdefault("exchange", default_exchange),
             opts.setdefault("exchange_type", default_exchange_type)
             opts.setdefault("binding_key", default_exchange)
             opts.setdefault("routing_key", opts.get("binding_key"))
-            return opts
-
-        map(_defaults, queues.values())
         return cls(queues)
 
 

+ 2 - 1
celery/apps/worker.py

@@ -128,7 +128,8 @@ class Worker(object):
     def init_loader(self):
         self.loader = self.app.loader
         self.settings = self.app.conf
-        map(self.loader.import_module, self.include)
+        for module in self.include:
+            self.loader.import_module(module)
 
     def redirect_stdouts_to_logger(self):
         handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,

+ 2 - 2
celery/datastructures.py

@@ -98,8 +98,8 @@ class PositionQueue(UserList):
     @property
     def filled(self):
         """Returns the filled slots as a list."""
-        return filter(lambda v: not isinstance(v, self.UnfilledPosition),
-                      self.data)
+        return [slot for slot in self.data
+                    if not isinstance(slot, self.UnfilledPosition)]
 
 
 class ExceptionInfo(object):

+ 4 - 2
celery/log.py

@@ -251,7 +251,8 @@ class LoggingProxy(object):
         This is equivalent to calling :meth:`write` for each string.
 
         """
-        map(self.write, sequence)
+        for part in sequence:
+            self.write(part)
 
     def flush(self):
         """This object is not buffered so any :meth:`flush` requests
@@ -281,7 +282,8 @@ class SilenceRepeated(object):
 
     def __call__(self, *msgs):
         if self._iterations >= self.max_iterations:
-            map(self.action, msgs)
+            for msg in msgs:
+                self.action(msg)
             self._iterations = 0
         else:
             self._iterations += 1

+ 2 - 2
celery/task/base.py

@@ -541,12 +541,12 @@ class BaseTask(object):
         if kwargs is None:
             kwargs = request.kwargs
 
-        delivery_info = request.delivery_info
+        delivery_info = request.delivery_info or {}
         options.setdefault("exchange", delivery_info.get("exchange"))
         options.setdefault("routing_key", delivery_info.get("routing_key"))
 
         options["retries"] = request.retries + 1
-        options["task_id"] = kwargs.pop("task_id", None)
+        options["task_id"] = request.id
         options["countdown"] = options.get("countdown",
                                         self.default_retry_delay)
         max_exc = exc or self.MaxRetriesExceededError(

+ 2 - 1
celery/tests/test_buckets.py

@@ -232,7 +232,8 @@ class test_TaskBucket(unittest.TestCase):
         ajobs = [cjob(i, TaskA) for i in xrange(10)]
         bjobs = [cjob(i, TaskB) for i in xrange(20)]
         jobs = list(chain(*izip(bjobs, ajobs)))
-        map(b.put, jobs)
+        for job in jobs:
+            b.put(job)
 
         got_ajobs = 0
         for job in (b.get() for i in xrange(20)):

+ 4 - 2
celery/tests/test_datastructures.py

@@ -118,7 +118,8 @@ class test_LimitedSet(unittest.TestCase):
     def test_iter(self):
         s = LimitedSet(maxlen=2)
         items = "foo", "bar"
-        map(s.add, items)
+        for item in items:
+            s.add(item)
         l = list(iter(s))
         for item in items:
             self.assertIn(item, l)
@@ -126,7 +127,8 @@ class test_LimitedSet(unittest.TestCase):
     def test_repr(self):
         s = LimitedSet(maxlen=2)
         items = "foo", "bar"
-        map(s.add, items)
+        for item in items:
+            s.add(item)
         self.assertIn("LimitedSet(", repr(s))
 
 

+ 8 - 8
celery/tests/test_task.py

@@ -147,9 +147,9 @@ class TestTaskRetries(unittest.TestCase):
         self.assertEqual(result.get(), 42)
         self.assertEqual(RetryTaskNoArgs.iterations, 4)
 
-    def test_retry_kwargs_can_not_be_empty(self):
-        self.assertRaises(TypeError, RetryTaskMockApply.retry,
-                            args=[4, 4], kwargs={})
+    def test_retry_kwargs_can_be_empty(self):
+        self.assertRaises(RetryTaskError, RetryTaskMockApply.retry,
+                            args=[4, 4], kwargs=None)
 
     def test_retry_not_eager(self):
         exc = Exception("baz")
@@ -322,20 +322,20 @@ class TestCeleryTasks(unittest.TestCase):
                 name="Elaine M. Benes")
 
         # With eta.
-        presult2 = task.apply_async(t1, kwargs=dict(name="George Costanza"),
-                                    eta=datetime.now() + timedelta(days=1))
+        presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
+                                  eta=datetime.now() + timedelta(days=1))
         self.assertNextTaskDataEqual(consumer, presult2, t1.name,
                 name="George Costanza", test_eta=True)
 
         # With countdown.
-        presult2 = task.apply_async(t1, kwargs=dict(name="George Costanza"),
-                                    countdown=10)
+        presult2 = t1.apply_async(kwargs=dict(name="George Costanza"),
+                                  countdown=10)
         self.assertNextTaskDataEqual(consumer, presult2, t1.name,
                 name="George Costanza", test_eta=True)
 
         # Discarding all tasks.
         consumer.discard_all()
-        task.apply_async(t1)
+        t1.apply_async()
         self.assertEqual(consumer.discard_all(), 1)
         self.assertIsNone(consumer.fetch())
 

+ 10 - 5
celery/tests/test_worker_state.py

@@ -62,7 +62,8 @@ class test_Persistent(StateResetCase):
         self.assertTrue(self.p.db.closed)
 
     def add_revoked(self, *ids):
-        map(self.p.db.setdefault("revoked", LimitedSet()).add, ids)
+        for id in ids:
+            self.p.db.setdefault("revoked", LimitedSet()).add(id)
 
     def test_merge(self, data=["foo", "bar", "baz"]):
         self.add_revoked(*data)
@@ -73,7 +74,8 @@ class test_Persistent(StateResetCase):
     def test_sync(self, data1=["foo", "bar", "baz"],
                         data2=["baz", "ini", "koz"]):
         self.add_revoked(*data1)
-        map(state.revoked.add, data2)
+        for item in data2:
+            state.revoked.add(item)
         self.p.sync(self.p.db)
 
         for item in data2:
@@ -92,7 +94,8 @@ class test_state(StateResetCase):
                                       SimpleReq("bar"),
                                       SimpleReq("baz"),
                                       SimpleReq("baz")]):
-        map(state.task_accepted, requests)
+        for request in requests:
+            state.task_accepted(request)
         for req in requests:
             self.assertIn(req, state.active_requests)
         self.assertEqual(state.total_count["foo"], 1)
@@ -101,7 +104,9 @@ class test_state(StateResetCase):
 
     def test_ready(self, requests=[SimpleReq("foo"),
                                    SimpleReq("bar")]):
-        map(state.task_accepted, requests)
+        for request in requests:
+            state.task_accepted(request)
         self.assertEqual(len(state.active_requests), 2)
-        map(state.task_ready, requests)
+        for request in requests:
+            state.task_ready(request)
         self.assertEqual(len(state.active_requests), 0)

+ 2 - 2
celery/worker/__init__.py

@@ -37,8 +37,8 @@ def process_initializer(app, hostname):
 
     """
     app = app_or_default(app)
-    map(platforms.reset_signal, WORKER_SIGRESET)
-    map(platforms.ignore_signal, WORKER_SIGIGNORE)
+    [platforms.reset_signal(signal) for signal in WORKER_SIGRESET]
+    [platforms.ignore_signal(signal) for signal in WORKER_SIGIGNORE]
     platforms.set_mp_process_title("celeryd", hostname=hostname)
 
     # This is for windows and other platforms not supporting

+ 4 - 2
celery/worker/buckets.py

@@ -138,11 +138,13 @@ class TaskBucket(object):
 
     def init_with_registry(self):
         """Initialize with buckets for all the task types in the registry."""
-        map(self.add_bucket_for_type, self.task_registry.keys())
+        for task in self.task_registry.keys():
+            self.add_bucket_for_type(task)
 
     def refresh(self):
         """Refresh rate limits for all task types in the registry."""
-        map(self.update_bucket_for_type, self.task_registry.keys())
+        for task in self.task_registry.keys():
+            self.update_bucket_for_type(task)
 
     def get_bucket_for_type(self, task_name):
         """Get the bucket for a particular task type."""