
Use future builtins

Ask Solem committed 12 years ago
commit 43fb28f769

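Every file below follows the same pattern: import map, filter or zip from the standard-library future_builtins module so Python 2 gets the lazy, iterator-returning versions that Python 3 ships as builtins, and wrap the result in list() (or switch to a list comprehension) wherever the caller actually needs a real sequence. A minimal sketch of the difference, assuming a Python 2.6/2.7 interpreter where future_builtins is available:

    # Python 2's builtin map() returns a list; future_builtins.map
    # returns an iterator, matching the Python 3 builtin.
    from future_builtins import map

    VERSION = (3, 1, 0, 'a1')

    # a single consumer such as str.join can take the iterator directly
    print('.'.join(map(str, VERSION[0:3])))      # 3.1.0

    # code that indexes, len()s, or iterates the result twice must
    # materialize it explicitly
    names = list(map(str, range(1, 4)))
    assert names == ['1', '2', '3'] and len(names) == 3
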
+ 1 - 0
celery/__compat__.py

@@ -15,6 +15,7 @@ import operator
 import sys
 
 from functools import reduce
+from future_builtins import map
 from importlib import import_module
 from types import ModuleType
 

+ 2 - 0
celery/__init__.py

@@ -7,6 +7,8 @@
 
 from __future__ import absolute_import
 
+from future_builtins import map
+
 SERIES = 'DEVEL'
 VERSION = (3, 1, 0, 'a1')
 __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])

+ 3 - 1
celery/app/annotations.py

@@ -12,6 +12,8 @@
 """
 from __future__ import absolute_import
 
+from future_builtins import filter
+
 from celery.utils.functional import firstmethod, mpromise
 from celery.utils.imports import instantiate
 
@@ -52,4 +54,4 @@ def prepare(annotations):
         return ()
     elif not isinstance(annotations, (list, tuple)):
         annotations = (annotations, )
-    return map(expand_annotation, annotations)
+    return [expand_annotation(a) for a in annotations]

+ 2 - 1
celery/app/base.py

@@ -14,6 +14,7 @@ from collections import deque
 from contextlib import contextmanager
 from copy import deepcopy
 from functools import reduce, wraps
+from operator import attrgetter
 from threading import Lock
 
 from billiard.util import register_after_fork
@@ -335,7 +336,7 @@ class Celery(object):
         return type(name or Class.__name__, (Class, ), attrs)
 
     def _rgetattr(self, path):
-        return reduce(getattr, [self] + path.split('.'))
+        return attrgetter(path)(self)
 
     def __repr__(self):
         return '<{0} {1}:0x{2:x}>'.format(

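In _rgetattr above, the manual reduce over getattr is replaced by operator.attrgetter, which already understands dotted attribute paths. A rough sketch of the equivalence; the App and Pool classes here are made-up stand-ins, not Celery's:

    from functools import reduce
    from operator import attrgetter

    class Pool(object):
        limit = 10

    class App(object):
        pool = Pool()

    app, path = App(), 'pool.limit'

    # old form: walk the dotted path by hand
    old = reduce(getattr, [app] + path.split('.'))
    # new form: attrgetter resolves 'pool.limit' itself
    new = attrgetter(path)(app)
    assert old == new == 10
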
+ 6 - 5
celery/app/builtins.py

@@ -10,6 +10,7 @@
 from __future__ import absolute_import
 
 from collections import deque
+from future_builtins import map, zip
 from itertools import starmap
 
 from celery._state import get_current_worker_task
@@ -72,7 +73,8 @@ def add_unlock_chord_task(app):
     @app.task(name='celery.chord_unlock', max_retries=None)
     def unlock_chord(group_id, callback, interval=1, propagate=False,
             max_retries=None, result=None):
-        result = _res.GroupResult(group_id, map(_res.AsyncResult, result))
+        AR = _res.AsyncResult
+        result = _res.GroupResult(group_id, [AR(r) for r in result])
         j = result.join_native if result.supports_native_join else result.join
         if result.ready():
             subtask(callback).delay(j(propagate=propagate))
@@ -155,9 +157,8 @@ def add_group_task(app):
                     tid = opts['task_id'] = uuid()
                 return task, self.AsyncResult(tid)
 
-            tasks, results = zip(*[prepare_member(task) for task in tasks])
-            return (tasks, self.app.GroupResult(group_id, results),
-                    group_id, args)
+            tasks, res = list(zip(*[prepare_member(task) for task in tasks]))
+            return (tasks, self.app.GroupResult(group_id, res), group_id, args)
 
         def apply_async(self, partial_args=(), kwargs={}, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
@@ -270,7 +271,7 @@ def add_chord_task(app):
 
             # - convert back to group if serialized
             if not isinstance(header, group):
-                header = group(map(maybe_subtask, header))
+                header = group([maybe_subtask(t) for t in header])
             # - eager applies the group inline
             if eager:
                 return header.apply(args=partial_args, task_id=group_id)

+ 7 - 6
celery/app/log.py

@@ -95,12 +95,13 @@ class Logging(object):
             if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                 root.handlers = []
 
-            for logger in filter(None, (root, get_multiprocessing_logger())):
-                self.setup_handlers(logger, logfile, format,
-                                    colorize, **kwargs)
-                if loglevel:
-                    logger.setLevel(loglevel)
-                signals.after_setup_logger.send(sender=None, logger=logger,
+            for logger in root, get_multiprocessing_logger():
+                if logger is not None:
+                    self.setup_handlers(logger, logfile, format,
+                                        colorize, **kwargs)
+                    if loglevel:
+                        logger.setLevel(loglevel)
+                    signals.after_setup_logger.send(sender=None, logger=logger,
                                             loglevel=loglevel, logfile=logfile,
                                             format=format, colorize=colorize)
             # then setup the root task logger.

+ 1 - 1
celery/app/routes.py

@@ -92,4 +92,4 @@ def prepare(routes):
         return ()
     if not isinstance(routes, (list, tuple)):
         routes = (routes, )
-    return map(expand_route, routes)
+    return [expand_route(route) for route in routes]

+ 2 - 2
celery/app/utils.py

@@ -85,7 +85,7 @@ class Settings(datastructures.ConfigurationView):
             False
 
         """
-        return self['_'.join(filter(None, parts))]
+        return self['_'.join(part for part in parts if part)]
 
     def humanize(self):
         """Returns a human readable string showing changes to the
@@ -138,7 +138,7 @@ def bugreport(app):
 
     return BUGREPORT_INFO.format(
         system=_platform.system(),
-        arch=', '.join(filter(None, _platform.architecture())),
+        arch=', '.join(x for x in _platform.architecture() if x),
         py_i=platforms.pyimplementation(),
         celery_v=celery.VERSION_BANNER,
         kombu_v=kombu.__version__,

+ 1 - 1
celery/apps/worker.py

@@ -196,7 +196,7 @@ class Worker(configurated):
     def tasklist(self, include_builtins=True):
         tasks = self.app.tasks.keys()
         if not include_builtins:
-            tasks = filter(lambda s: not s.startswith('celery.'), tasks)
+            tasks = [t for t in tasks if not t.startswith('celery.')]
         return '\n'.join('  . {0}'.format(task) for task in sorted(tasks))
 
     def extra_info(self):

+ 1 - 0
celery/backends/base.py

@@ -19,6 +19,7 @@ import time
 import sys
 
 from datetime import timedelta
+from future_builtins import map
 
 from kombu import serialization
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8

+ 2 - 1
celery/bin/base.py

@@ -64,6 +64,7 @@ import sys
 import warnings
 
 from collections import defaultdict
+from future_builtins import zip
 from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
 from types import ModuleType
 
@@ -232,7 +233,7 @@ class Command(object):
             options = dict((k, self.expanduser(v))
                             for k, v in vars(options).iteritems()
                                 if not k.startswith('_'))
-        args = map(self.expanduser, args)
+        args = [self.expanduser(arg) for arg in args]
         self.check_args(args)
         return options, args
 

+ 2 - 1
celery/bin/celery.py

@@ -13,6 +13,7 @@ import sys
 import warnings
 
 from billiard import freeze_support
+from future_builtins import map
 from importlib import import_module
 from pprint import pformat
 
@@ -518,7 +519,7 @@ class _RemoteControl(Command):
         destination = kwargs.get('destination')
         timeout = kwargs.get('timeout') or self.choices[method][0]
         if destination and isinstance(destination, basestring):
-            destination = map(str.strip, destination.split(','))
+            destination = list(map(str.strip, destination.split(',')))
 
         try:
             handler = getattr(self, method)

+ 3 - 2
celery/bin/celeryd_multi.py

@@ -97,6 +97,7 @@ import socket
 import sys
 
 from collections import defaultdict
+from future_builtins import map
 from subprocess import Popen
 from time import sleep
 
@@ -424,7 +425,7 @@ def multi_args(p, cmd='celeryd', append='', prefix='', suffix=''):
         except ValueError:
             pass
         else:
-            names = map(str, range(1, noderange + 1))
+            names = list(map(str, range(1, noderange + 1)))
             prefix = 'celery'
     cmd = options.pop('--cmd', cmd)
     append = options.pop('--append', append)
@@ -525,7 +526,7 @@ def parse_ns_range(ns, ranges=False):
     for space in ',' in ns and ns.split(',') or [ns]:
         if ranges and '-' in space:
             start, stop = space.split('-')
-            x = map(str, range(int(start), int(stop) + 1))
+            x = list(map(str, range(int(start), int(stop) + 1)))
             ret.extend(x)
         else:
             ret.append(space)

+ 2 - 1
celery/canvas.py

@@ -11,6 +11,7 @@
 """
 from __future__ import absolute_import
 
+from future_builtins import map
 from operator import itemgetter
 from itertools import chain as _chain
 
@@ -311,7 +312,7 @@ class group(Signature):
 
     def __call__(self, *partial_args, **options):
         tasks, result, gid, args = self.type.prepare(options,
-                    map(Signature.clone, self.tasks), partial_args)
+                    [Signature.clone(t) for t in self.tasks], partial_args)
         return self.type(tasks, result, gid, args)
 
     def skew(self, start=1.0, stop=None, step=1.0):

+ 1 - 0
celery/contrib/rdb.py

@@ -41,6 +41,7 @@ import os
 import socket
 import sys
 
+from future_builtins import map
 from pdb import Pdb
 
 from billiard import current_process

+ 1 - 0
celery/loaders/base.py

@@ -13,6 +13,7 @@ import importlib
 import os
 import re
 
+from future_builtins import map
 from datetime import datetime
 
 from kombu.utils.encoding import safe_str

+ 2 - 4
celery/platforms.py

@@ -18,6 +18,7 @@ import signal as _signal
 import sys
 
 from contextlib import contextmanager
+from future_builtins import map
 
 from .local import try_import
 
@@ -244,10 +245,7 @@ class DaemonContext(object):
             os.chdir(self.workdir)
             os.umask(self.umask)
 
-            for fd in reversed(range(get_fdmax(default=2048))):
-                with ignore_EBADF():
-                    os.close(fd)
-
+            os.closerange(1, get_fdmax(default=2048))
             os.open(DAEMON_REDIRECT_TO, os.O_RDWR)
             os.dup2(0, 1)
             os.dup2(0, 2)

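The DaemonContext change above swaps a per-descriptor loop guarded by ignore_EBADF() for a single os.closerange() call, which closes every descriptor in the range and ignores the ones that were never open (available since Python 2.6). A small self-contained sketch of that behaviour on a harmless range of descriptors:

    import os

    # open a few descriptors to demonstrate on
    fds = [os.open(os.devnull, os.O_RDONLY) for _ in range(3)]
    low, high = min(fds), max(fds) + 1

    # closes everything in [low, high); descriptors in the range that
    # are not open are silently skipped, so no EBADF handling is needed
    os.closerange(low, high)

    try:
        os.close(fds[0])
    except OSError:
        print('fd was already closed by closerange')
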
+ 3 - 3
celery/result.py

@@ -12,7 +12,7 @@ import time
 
 from collections import deque
 from copy import copy
-from itertools import imap
+from future_builtins import map
 
 from . import current_app
 from . import states
@@ -228,7 +228,7 @@ class AsyncResult(ResultBase):
     def children(self):
         children = self.backend.get_children(self.id)
         if children:
-            return map(from_serializable, children)
+            return [from_serializable(child) for child in children]
 
     @property
     def result(self):
@@ -386,7 +386,7 @@ class ResultSet(ResultBase):
         :returns: the number of tasks completed.
 
         """
-        return sum(imap(int, (result.successful() for result in self.results)))
+        return sum(map(int, (result.successful() for result in self.results)))
 
     def forget(self):
         """Forget about (and possible remove the result of) all the tasks."""

+ 1 - 0
celery/security/serialization.py

@@ -10,6 +10,7 @@ from __future__ import absolute_import
 
 import base64
 
+from future_builtins import zip
 from kombu.serialization import registry, encode, decode
 from kombu.utils.encoding import bytes_to_str, str_to_bytes
 

+ 3 - 2
celery/tests/utilities/test_platforms.py

@@ -327,10 +327,11 @@ if not current_app.IS_WINDOWS:
         @patch('os.chdir')
         @patch('os.umask')
         @patch('os.close')
+        @patch('os.closerange')
         @patch('os.open')
         @patch('os.dup2')
-        def test_open(self, dup2, open, close, umask, chdir, _exit, setsid,
-                fork):
+        def test_open(self, dup2, open, close, closer, umask, chdir,
+                _exit, setsid, fork):
             x = DaemonContext(workdir='/opt/workdir')
 
             fork.return_value = 0

+ 5 - 7
celery/utils/__init__.py

@@ -8,7 +8,6 @@
 """
 from __future__ import absolute_import, print_function
 
-import operator
 import os
 import sys
 import threading
@@ -129,11 +128,10 @@ def fun_takes_kwargs(fun, kwlist=[]):
         ['logfile', 'loglevel', 'task_id']
 
     """
-    argspec = getattr(fun, 'argspec', getargspec(fun))
-    args, _varargs, keywords, _defaults = argspec
-    if keywords != None:
+    S = getattr(fun, 'argspec', getargspec(fun))
+    if S.keywords != None:
         return kwlist
-    return filter(partial(operator.contains, args), kwlist)
+    return [kw for kw in kwlist if kw in S.args]
 
 
 def isatty(fh):
@@ -205,9 +203,9 @@ def jsonify(obj):
     if isinstance(obj, (int, float, basestring, types.NoneType)):
         return obj
     elif isinstance(obj, (tuple, list)):
-        return map(jsonify, obj)
+        return [jsonify(o) for o in obj]
     elif isinstance(obj, dict):
-        return dict([(k, jsonify(v)) for k, v in obj.iteritems()])
+        return dict((k, jsonify(v)) for k, v in obj.iteritems())
     # See "Date Time String Format" in the ECMA-262 specification.
     elif isinstance(obj, datetime.datetime):
         r = obj.isoformat()

+ 1 - 1
celery/utils/debug.py

@@ -41,7 +41,7 @@ def memdump(samples=10):
     if ps() is None:
         print('- rss: (psutil not installed).')
         return
-    if filter(None, _mem_sample):
+    if any(_mem_sample):
         print('- rss (sample):')
         for mem in sample(_mem_sample, samples):
             print('-    > {0},'.format(mem))

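The memdump change above matters because filter becomes lazy elsewhere in this commit (and is lazy in Python 3): an iterator object is always truthy even when it would yield nothing, so "if filter(None, _mem_sample)" stops meaning "any non-zero sample" once filter no longer returns a list. any() states that intent directly. A tiny illustration, again assuming Python 2 with future_builtins:

    from future_builtins import filter

    samples = [0, 0, 0]

    # the lazy filter object is truthy even though it yields nothing
    assert bool(filter(None, samples)) is True

    # any() looks at the elements themselves
    assert any(samples) is False
    assert any([0, 4096, 0]) is True
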
+ 1 - 1
celery/utils/log.py

@@ -142,7 +142,7 @@ class LoggingProxy(object):
 
             handler.handleError = WithSafeHandleError().handleError
 
-        return map(wrap_handler, self.logger.handlers)
+        return [wrap_handler(l) for l in self.logger.handlers]
 
     def write(self, data):
         """Write message to logging object."""

+ 1 - 0
celery/utils/mail.py

@@ -13,6 +13,7 @@ import traceback
 import warnings
 
 from email.mime.text import MIMEText
+from future_builtins import map
 
 from .functional import maybe_list
 from .imports import symbol_by_name

+ 3 - 2
celery/utils/text.py

@@ -8,7 +8,8 @@
 """
 from __future__ import absolute_import
 
-import textwrap
+from future_builtins import filter, map
+from textwrap import fill
 
 from pprint import pformat
 
@@ -22,7 +23,7 @@ def dedent(s, n=4):
 
 
 def fill_paragraphs(s, width):
-    return '\n'.join(textwrap.fill(p, width) for p in s.split('\n'))
+    return '\n'.join(fill(p, width) for p in s.split('\n'))
 
 
 def join(l):

+ 0 - 2
celery/utils/threads.py

@@ -15,8 +15,6 @@ import traceback
 
 from kombu.syn import detect_environment
 
-active_count = (getattr(threading, 'active_count', None) or
-                threading.activeCount)
 USE_PURE_LOCALS = os.environ.get("USE_PURE_LOCALS")
 
 

+ 3 - 3
celery/utils/timer2.py

@@ -14,6 +14,7 @@ import os
 import sys
 
 from functools import wraps
+from future_builtins import map
 from itertools import count
 from threading import Condition, Event, Lock, Thread
 from time import time, sleep, mktime
@@ -56,8 +57,7 @@ class Entry(object):
     if sys.version_info[0] == 3:  # pragma: no cover
 
         def __hash__(self):
-            return hash('|'.join(map(repr, (self.fun, self.args,
-                                            self.kwargs))))
+            return hash('{0.fun!r}|{0.args!r}|{0.kwargs!r}'.format(self))
 
         def __lt__(self, other):
             return hash(self) < hash(other)
@@ -206,7 +206,7 @@ class Schedule(object):
     @property
     def queue(self):
         events = list(self._queue)
-        return map(heapq.heappop, [events] * len(events))
+        return [heapq.heappop(x) for x in [events] * len(events)]
 
 
 class Timer(Thread):

+ 2 - 1
celery/utils/timeutils.py

@@ -13,6 +13,7 @@ from kombu.utils import cached_property
 from datetime import datetime, timedelta
 from dateutil import tz
 from dateutil.parser import parse as parse_iso8601
+from future_builtins import zip
 
 from celery.exceptions import ImproperlyConfigured
 
@@ -25,7 +26,7 @@ except ImportError:     # pragma: no cover
 
 
 DAYNAMES = 'sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'
-WEEKDAYS = dict((name, dow) for name, dow in zip(DAYNAMES, range(7)))
+WEEKDAYS = dict(zip(DAYNAMES, range(7)))
 
 RATE_MODIFIER_MAP = {'s': lambda n: n,
                      'm': lambda n: n / 60.0,

+ 2 - 2
celery/worker/buckets.py

@@ -214,8 +214,8 @@ class TaskBucket(object):
         """Flattens the data in all of the buckets into a single list."""
         # for queues with contents [(1, 2), (3, 4), (5, 6), (7, 8)]
         # zips and flattens to [1, 3, 5, 7, 2, 4, 6, 8]
-        return filter(None, chain.from_iterable(izip_longest(*[bucket.items
-                                    for bucket in self.buckets.values()])))
+        return [x for x in chain.from_iterable(izip_longest(*[bucket.items
+                    for bucket in self.buckets.values()])) if x]
 
 
 class FastQueue(Queue):

+ 14 - 24
celery/worker/control.py

@@ -8,6 +8,7 @@
 """
 from __future__ import absolute_import
 
+from future_builtins import map
 from datetime import datetime
 
 from kombu.utils.encoding import safe_repr
@@ -145,23 +146,16 @@ def dump_schedule(panel, safe=False, **kwargs):
     from celery.worker.job import Request
     schedule = panel.consumer.timer.schedule
     if not schedule.queue:
-        logger.debug('--Empty schedule--')
         return []
 
-    formatitem = lambda (i, item): '{0}. {1} pri{2} {3!r}'.format(i,
-            datetime.utcfromtimestamp(item['eta']),
-            item['priority'], item['item'])
-    info = map(formatitem, enumerate(schedule.info()))
-    logger.debug('* Dump of current schedule:\n%s', '\n'.join(info))
-    scheduled_tasks = []
-    for info in schedule.info():
-        item = info['item']
-        if item.args and isinstance(item.args[0], Request):
-            scheduled_tasks.append({'eta': info['eta'],
-                                    'priority': info['priority'],
-                                    'request':
-                                        item.args[0].info(safe=safe)})
-    return scheduled_tasks
+    def prepare_entries():
+        for entry in schedule.info():
+            item = entry['item']
+            if item.args and isinstance(item.args[0], Request):
+                yield {'eta': entry['eta'],
+                       'priority': entry['priority'],
+                       'request': item.args[0].info(safe=safe)}
+    return list(prepare_entries())
 
 
 @Panel.register
@@ -208,16 +202,12 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
         fields = dict((field, str(getattr(task, field, None)))
                         for field in taskinfoitems
                             if getattr(task, field, None) is not None)
-        info = map('='.join, fields.items())
-        if not info:
-            return task.name
-        return '{0} [{1}]'.format(task.name, ' '.join(info))
+        if fields:
+            info = map('='.join, fields.iteritems())
+            return '{0} [{1}]'.format(task.name, ' '.join(info))
+        return task.name
 
-    info = map(_extract_info, (tasks[task]
-                                    for task in sorted(tasks.keys())))
-    logger.debug('* Dump of currently registered tasks:\n%s', '\n'.join(info))
-
-    return info
+    return [_extract_info(tasks[task]) for task in sorted(tasks)]
 
 
 @Panel.register