Browse Source

Merge branch '3.0'

Conflicts:
	celery/__init__.py
	celery/app/annotations.py
	celery/app/builtins.py
	celery/app/utils.py
	celery/apps/worker.py
	celery/backends/base.py
	celery/bin/base.py
	celery/bin/celery.py
	celery/bin/multi.py
	celery/canvas.py
	celery/contrib/rdb.py
	celery/loaders/base.py
	celery/result.py
	celery/task/http.py
	celery/utils/__init__.py
	celery/utils/log.py
	celery/utils/mail.py
	celery/utils/text.py
	celery/utils/timer2.py
	celery/worker/control.py
	celery/worker/hub.py
Ask Solem 12 years ago
parent
commit
bbf2f26adc

+ 1 - 1
celery/__init__.py

@@ -9,7 +9,7 @@ from __future__ import absolute_import
 
 SERIES = 'Cipater'
 VERSION = (3, 1, 0, 'rc1')
-__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
+__version__ = '.'.join(str(p) for p in VERSION[0:3]) + ''.join(VERSION[3:])
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'
 __homepage__ = 'http://celeryproject.org'

+ 13 - 2
celery/_state.py

@@ -9,9 +9,10 @@
     This module shouldn't be used directly.
 
 """
-from __future__ import absolute_import
+from __future__ import absolute_import, print_function
 
 import os
+import sys
 import threading
 import weakref
 
@@ -40,7 +41,7 @@ def set_default_app(app):
     default_app = app
 
 
-def get_current_app():
+def _get_current_app():
     if default_app is None:
         #: creates the global fallback app instance.
         from celery.app import Celery
@@ -51,6 +52,16 @@ def get_current_app():
         ))
     return _tls.current_app or default_app
 
+C_STRICT_APP = os.environ.get('C_STRICT_APP')
+if os.environ.get('C_STRICT_APP'):
+    def get_current_app():
+        import traceback
+        print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
+        traceback.print_stack(file=sys.stderr)
+        return _get_current_app()
+else:
+    get_current_app = _get_current_app
+
 
 def get_current_task():
     """Currently executing task."""

+ 1 - 1
celery/app/annotations.py

@@ -53,4 +53,4 @@ def prepare(annotations):
         return ()
     elif not isinstance(annotations, (list, tuple)):
         annotations = (annotations, )
-    return [expand_annotation(a) for a in annotations]
+    return [expand_annotation(anno) for anno in annotations]

+ 5 - 5
celery/apps/worker.py

@@ -155,11 +155,11 @@ class Worker(WorkController):
         print('purge: Erased {0} {1} from the queue.\n'.format(
             count, pluralize(count, 'message')))
 
-    def tasklist(self, include_builtins=True):
-        tasks = self.app.tasks
-        if not include_builtins:
-            tasks = (t for t in tasks if not t.startswith('celery.'))
-        return '\n'.join('  . {0}'.format(task) for task in sorted(tasks))
+    def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
+        return sep.join(
+            '  . {0}'.format(task) for task in sorted(self.app.tasks)
+            if (not task.startswith(int_) if not include_builtins else task)
+        )
 
     def extra_info(self):
         if self.loglevel <= logging.INFO:

+ 1 - 1
celery/backends/base.py

@@ -372,7 +372,7 @@ class KeyValueStoreBackend(BaseBackend):
             r = self._mget_to_results(self.mget([self.get_key_for_task(k)
                                                  for k in keys]), keys)
             self._cache.update(r)
-            ids.difference_update(set(map(bytes_to_str, r)))
+            ids.difference_update(set(bytes_to_str(v) for v in r))
             for key, value in items(r):
                 yield bytes_to_str(key), value
             if timeout and iterations * interval >= timeout:

+ 23 - 18
celery/canvas.py

@@ -19,7 +19,6 @@ from itertools import chain as _chain
 from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery import current_app
-from celery.local import Proxy
 from celery.result import AsyncResult, GroupResult
 from celery.utils.functional import (
     maybe_list, is_list, regen,
@@ -27,8 +26,6 @@ from celery.utils.functional import (
 )
 from celery.utils.text import truncate
 
-Chord = Proxy(lambda: current_app.tasks['celery.chord'])
-
 
 class _getitem_property(object):
     """Attribute -> dict key descriptor.
@@ -292,7 +289,8 @@ class chain(Signature):
         self.subtask_type = 'chain'
 
     def __call__(self, *args, **kwargs):
-        return self.apply_async(args, kwargs)
+        if self.tasks:
+            return self.apply_async(args, kwargs)
 
     @classmethod
     def from_dict(self, d):
@@ -302,8 +300,12 @@ class chain(Signature):
             tasks[0]['args'] = d['args'] + tasks[0]['args']
         return chain(*d['kwargs']['tasks'], **kwdict(d['options']))
 
+    @property
+    def type(self):
+        return self._type or self.tasks[0].type.app.tasks['celery.chain']
+
     def __repr__(self):
-        return ' | '.join(map(repr, self.tasks))
+        return ' | '.join(repr(t) for t in self.tasks)
 Signature.register_type(chain)
 
 
@@ -410,10 +412,15 @@ class group(Signature):
         return group(tasks, **kwdict(d['options']))
 
     def __call__(self, *partial_args, **options):
-        tasks, result, gid, args = self.type.prepare(
-            options, [Signature.clone(t) for t in self.tasks], partial_args,
-        )
-        return self.type(tasks, result, gid, args)
+        tasks = [task.clone() for task in self.tasks]
+        if not tasks:
+            return
+        # taking the app from the first task in the list,
+        # there may be a better solution to this, e.g.
+        # consolidate tasks with the same app and apply them in
+        # batches.
+        type = tasks[0].type.app.tasks[self['task']]
+        return type(*type.prepare(options, tasks, partial_args))
 
     def _freeze(self, _id=None):
         opts = self.options
@@ -444,7 +451,6 @@ Signature.register_type(group)
 
 
 class chord(Signature):
-    Chord = Chord
 
     def __init__(self, header, body=None, task='celery.chord',
                  args=(), kwargs={}, **options):
@@ -466,8 +472,12 @@ class chord(Signature):
         # than manually popping things off.
         return (header, body), kwargs
 
+    @property
+    def type(self):
+        return self._type or self.tasks[0].type.app.tasks['celery.chord']
+
     def __call__(self, body=None, **kwargs):
-        _chord = self.Chord
+        _chord = self.type
         body = (body or self.kwargs['body']).clone()
         kwargs = dict(self.kwargs, body=body, **kwargs)
         if _chord.app.conf.CELERY_ALWAYS_EAGER:
@@ -497,13 +507,8 @@ class chord(Signature):
             return self.body.reprcall(self.tasks)
         return '<chord without body: {0.tasks!r}>'.format(self)
 
-    @property
-    def tasks(self):
-        return self.kwargs['header']
-
-    @property
-    def body(self):
-        return self.kwargs.get('body')
+    tasks = _getitem_property('kwargs.header')
+    body = _getitem_property('kwargs.body')
 Signature.register_type(chord)
 
 

+ 1 - 1
celery/contrib/rdb.py

@@ -98,7 +98,7 @@ class Rdb(Pdb):
         self.say(BANNER.format(self=self))
 
         self._client, address = self._sock.accept()
-        self.remote_addr = ':'.join(map(str, address))
+        self.remote_addr = ':'.join(str(v) for v in address)
         self.say(SESSION_STARTED.format(self=self))
         self._handle = sys.stdin = sys.stdout = self._client.makefile('rw')
         Pdb.__init__(self, completekey='tab',

+ 9 - 5
celery/five.py

@@ -314,11 +314,15 @@ def recreate_module(name, compat_modules=(), by_module={}, direct={},
     origins = get_origins(by_module)
     compat_modules = COMPAT_MODULES.get(name, ())
 
-    cattrs = dict(_compat_modules=compat_modules,
-                  _all_by_module=by_module, _direct=direct,
-                  _object_origins=origins,
-                  __all__=tuple(set(reduce(operator.add, map(tuple, [
-                                compat_modules, origins, direct, attrs])))))
+    cattrs = dict(
+        _compat_modules=compat_modules,
+        _all_by_module=by_module, _direct=direct,
+        _object_origins=origins,
+        __all__=tuple(set(reduce(
+            operator.add,
+            [tuple(v) for v in [compat_modules, origins, direct, attrs]],
+        ))),
+    )
     new_module = create_module(name, attrs, cls_attrs=cattrs, base=base)
     new_module.__dict__.update(dict((mod, get_compat_module(new_module, mod))
                                for mod in compat_modules))

+ 0 - 1
celery/loaders/base.py

@@ -223,7 +223,6 @@ class BaseLoader(object):
                     # display key name in error message.
                     raise ValueError('{0!r}: {1}'.format(ns_key, exc))
             return ns_key, value
-
         return dict(getarg(arg) for arg in args)
 
     def mail_admins(self, subject, body, fail_silently=False,

+ 3 - 3
celery/platforms.py

@@ -21,7 +21,7 @@ from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 
 from .local import try_import
-from .five import items, map, range, reraise, string_t
+from .five import items, range, reraise, string_t
 
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
@@ -61,9 +61,9 @@ def pyimplementation():
     elif sys.platform.startswith('java'):
         return 'Jython ' + sys.platform
     elif hasattr(sys, 'pypy_version_info'):
-        v = '.'.join(map(str, sys.pypy_version_info[:3]))
+        v = '.'.join(str(p) for p in sys.pypy_version_info[:3])
         if sys.pypy_version_info[3:]:
-            v += '-' + ''.join(map(str, sys.pypy_version_info[3:]))
+            v += '-' + ''.join(str(p) for p in sys.pypy_version_info[3:])
         return 'PyPy ' + v
     else:
         return 'CPython'

+ 2 - 2
celery/result.py

@@ -21,7 +21,7 @@ from . import states
 from .app import app_or_default
 from .datastructures import DependencyGraph, GraphFormatter
 from .exceptions import IncompleteStream, TimeoutError
-from .five import items, map, range, string_t
+from .five import items, range, string_t
 
 
 class ResultBase(object):
@@ -394,7 +394,7 @@ class ResultSet(ResultBase):
         :returns: the number of tasks completed.
 
         """
-        return sum(map(int, (result.successful() for result in self.results)))
+        return sum(int(result.successful()) for result in self.results)
 
     def forget(self):
         """Forget about (and possible remove the result of) all the tasks."""

+ 3 - 3
celery/tests/tasks/test_chord.py

@@ -132,7 +132,7 @@ class test_chord(AppCase):
         m = Mock()
         m.app.conf.CELERY_ALWAYS_EAGER = False
         m.AsyncResult = AsyncResult
-        prev, chord.Chord = chord.Chord, m
+        prev, chord._type = chord._type, m
         try:
             x = chord(add.s(i, i) for i in range(10))
             body = add.s(2)
@@ -141,9 +141,9 @@ class test_chord(AppCase):
             # does not modify original subtask
             with self.assertRaises(KeyError):
                 body.options['task_id']
-            self.assertTrue(chord.Chord.called)
+            self.assertTrue(chord._type.called)
         finally:
-            chord.Chord = prev
+            chord._type = prev
 
 
 class test_Chord_task(AppCase):

+ 2 - 2
celery/utils/__init__.py

@@ -205,7 +205,7 @@ def jsonify(obj, builtin_types=(int, float, string_t)):
     if obj is None or isinstance(obj, builtin_types):
         return obj
     elif isinstance(obj, (tuple, list)):
-        return [jsonify(o) for o in obj]
+        return [jsonify(v) for v in obj]
     elif isinstance(obj, dict):
         return dict((k, jsonify(v)) for k, v in items(obj))
     elif isinstance(obj, datetime.datetime):
@@ -246,7 +246,7 @@ def gen_task_name(app, name, module_name):
             module_name = '__main__'
     if module_name == '__main__' and app.main:
         return '.'.join([app.main, name])
-    return '.'.join(p for p in [module_name, name] if p)
+    return '.'.join(p for p in (module_name, name) if p)
 
 
 def nodename(name, hostname):

+ 11 - 9
celery/utils/functional.py

@@ -8,10 +8,9 @@
 """
 from __future__ import absolute_import
 
-import operator
 import threading
 
-from functools import partial, wraps
+from functools import wraps
 from itertools import islice
 
 from kombu.utils import cached_property
@@ -21,7 +20,6 @@ from kombu.utils.compat import OrderedDict
 from celery.five import UserDict, UserList, items, keys, string_t
 
 KEYWORD_MARK = object()
-is_not_None = partial(operator.is_not, None)
 
 
 class LRUCache(UserDict):
@@ -172,13 +170,17 @@ def noop(*args, **kwargs):
     pass
 
 
-def first(predicate, iterable):
+def first(predicate, it):
     """Returns the first element in `iterable` that `predicate` returns a
-    :const:`True` value for."""
-    predicate = predicate or is_not_None
-    for item in iterable:
-        if predicate(item):
-            return item
+    :const:`True` value for.
+
+    If `predicate` is None it will return the first item that is not None.
+
+    """
+    return next(
+        (v for v in it if (predicate(v) if predicate else v is not None)),
+        None,
+    )
 
 
 def firstmethod(method):

+ 1 - 2
celery/utils/log.py

@@ -152,8 +152,7 @@ class LoggingProxy(object):
                         del(exc_info)
 
             handler.handleError = WithSafeHandleError().handleError
-
-        return [wrap_handler(l) for l in self.logger.handlers]
+        return [wrap_handler(h) for h in self.logger.handlers]
 
     def write(self, data):
         """Write message to logging object."""

+ 15 - 13
celery/utils/text.py

@@ -17,21 +17,21 @@ def dedent_initial(s, n=4):
     return s[n:] if s[:n] == ' ' * n else s
 
 
-def dedent(s, n=4):
-    return '\n'.join(dedent_initial(p) for p in s.splitlines())
+def dedent(s, n=4, sep='\n'):
+    return sep.join(dedent_initial(l) for l in s.splitlines())
 
 
-def fill_paragraphs(s, width):
-    return '\n'.join(fill(p, width) for p in s.split('\n'))
+def fill_paragraphs(s, width, sep='\n'):
+    return sep.join(fill(p, width) for p in s.split(sep))
 
 
-def join(l):
-    return '\n'.join(part for part in l if part)
+def join(l, sep='\n'):
+    return sep.join(v for v in l if v)
 
 
-def ensure_2lines(s):
+def ensure_2lines(s, sep='\n'):
     if len(s.splitlines()) <= 2:
-        return s + '\n'
+        return s + sep
     return s
 
 
@@ -53,9 +53,9 @@ def abbrtask(S, max):
     return S
 
 
-def indent(t, indent=0):
+def indent(t, indent=0, sep='\n'):
     """Indent text."""
-    return '\n'.join(' ' * indent + p for p in t.split('\n'))
+    return sep.join(' ' * indent + p for p in t.split(sep))
 
 
 def truncate(text, maxlen=128, suffix='...'):
@@ -71,10 +71,12 @@ def pluralize(n, text, suffix='s'):
     return text
 
 
-def pretty(value, width=80, nl_width=80, **kw):
+def pretty(value, width=80, nl_width=80, sep='\n', **kw):
     if isinstance(value, dict):
-        return '{{\n {0}'.format(pformat(value, 4, nl_width)[1:])
+        return '{{{0} {1}'.format(sep, pformat(value, 4, nl_width)[1:])
     elif isinstance(value, tuple):
-        return '\n{0}{1}'.format(' ' * 4, pformat(value, width=nl_width, **kw))
+        return '{0}{1}{2}'.format(
+            sep, ' ' * 4, pformat(value, width=nl_width, **kw),
+        )
     else:
         return pformat(value, width=width, **kw)

+ 5 - 4
celery/utils/timer2.py

@@ -20,12 +20,12 @@ from itertools import count
 from time import time, sleep
 from weakref import proxy as weakrefproxy
 
-from celery.five import THREAD_TIMEOUT_MAX, map
+from celery.five import THREAD_TIMEOUT_MAX
 from celery.utils.timeutils import timedelta_seconds, timezone
 from kombu.log import get_logger
 
 VERSION = (1, 0, 0)
-__version__ = '.'.join(map(str, VERSION))
+__version__ = '.'.join(str(p) for p in VERSION)
 __author__ = 'Ask Solem'
 __contact__ = 'ask@celeryproject.org'
 __homepage__ = 'http://github.com/ask/timer2/'
@@ -215,9 +215,10 @@ class Schedule(object):
         tref.cancel()
 
     @property
-    def queue(self):
+    def queue(self, _pop=heapq.heappop):
+        """Snapshot of underlying datastructure."""
         events = list(self._queue)
-        return [heapq.heappop(x) for x in [events] * len(events)]
+        return [_pop(v) for v in [events] * len(events)]
 
 
 class Timer(threading.Thread):

+ 6 - 4
celery/worker/control.py

@@ -8,6 +8,8 @@
 """
 from __future__ import absolute_import
 
+import logging
+
 from kombu.utils.encoding import safe_repr
 
 from celery.five import UserDict, items
@@ -161,10 +163,10 @@ def dump_reserved(panel, safe=False, **kwargs):
     if not reserved:
         logger.debug('--Empty queue--')
         return []
-    logger.debug('* Dump of currently reserved tasks:\n%s',
-                 '\n'.join(safe_repr(id) for id in reserved))
-    return [request.info(safe=safe)
-            for request in reserved]
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug('* Dump of currently reserved tasks:\n%s',
+                     '\n'.join(safe_repr(id) for id in reserved))
+    return [request.info(safe=safe) for request in reserved]
 
 
 @Panel.register

+ 4 - 4
celery/worker/hub.py

@@ -177,11 +177,11 @@ class Hub(object):
     def add_writer(self, fd, callback):
         return self.add(fd, callback, WRITE)
 
-    def update_readers(self, map):
-        [self.add_reader(*x) for x in items(map)]
+    def update_readers(self, readers):
+        [self.add_reader(*x) for x in items(readers)]
 
-    def update_writers(self, map):
-        [self.add_writer(*x) for x in items(map)]
+    def update_writers(self, writers):
+        [self.add_writer(*x) for x in items(writers)]
 
     def _unregister(self, fd):
         try: