Просмотр исходного кода

Merge branch '3.0'

Conflicts:
	celery/backends/mongodb.py
	celery/bin/amqp.py
	celery/bin/multi.py
	celery/schedules.py
	celery/tests/utils.py
	celery/worker/__init__.py
	requirements/default.txt
	setup.cfg
Ask Solem 12 лет назад
Родитель
Сommit
19af4433db
44 измененных файлов с 263 добавлено и 175 удалено
  1. 1 0
      CONTRIBUTORS.txt
  2. 15 1
      Changelog
  3. 11 11
      celery/__init__.py
  4. 6 2
      celery/app/amqp.py
  5. 2 1
      celery/app/base.py
  6. 2 1
      celery/app/builtins.py
  7. 3 2
      celery/apps/beat.py
  8. 2 1
      celery/apps/worker.py
  9. 5 3
      celery/backends/mongodb.py
  10. 5 3
      celery/backends/redis.py
  11. 9 8
      celery/bin/base.py
  12. 6 6
      celery/bin/beat.py
  13. 7 7
      celery/bin/events.py
  14. 4 3
      celery/bin/multi.py
  15. 2 2
      celery/bootsteps.py
  16. 6 4
      celery/datastructures.py
  17. 4 3
      celery/events/cursesmon.py
  18. 8 8
      celery/loaders/base.py
  19. 9 6
      celery/schedules.py
  20. 4 4
      celery/task/__init__.py
  21. 2 3
      celery/task/http.py
  22. 12 13
      celery/tests/backends/test_database.py
  23. 3 3
      celery/tests/concurrency/test_concurrency.py
  24. 3 3
      celery/tests/slow/test_buckets.py
  25. 4 1
      celery/tests/tasks/test_tasks.py
  26. 12 8
      celery/tests/utilities/test_info.py
  27. 8 7
      celery/tests/utils.py
  28. 1 1
      celery/tests/worker/test_control.py
  29. 3 3
      celery/utils/__init__.py
  30. 1 1
      celery/utils/dispatch/saferef.py
  31. 4 4
      celery/utils/timeutils.py
  32. 1 1
      celery/worker/__init__.py
  33. 1 1
      celery/worker/consumer.py
  34. 3 4
      celery/worker/control.py
  35. 2 2
      celery/worker/heartbeat.py
  36. 27 26
      celery/worker/job.py
  37. 5 5
      celery/worker/loops.py
  38. 2 1
      celery/worker/pidbox.py
  39. 4 4
      celery/worker/state.py
  40. 3 3
      celery/worker/strategy.py
  41. 46 0
      docs/getting-started/brokers/redis.rst
  42. 1 1
      pavement.py
  43. 2 2
      requirements/default.txt
  44. 2 2
      setup.cfg

+ 1 - 0
CONTRIBUTORS.txt

@@ -123,3 +123,4 @@ Alexey Zatelepin, 2012/09/18
 Sundar Raman, 2012/09/24
 Henri Colas, 2012/11/16
 Thomas Grainger, 2012/11/29
+Marius Gedminas, 2012/11/29

+ 15 - 1
Changelog

@@ -20,6 +20,21 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 ======
 :release-date: 2012-11-30 XX:XX:XX X.X UTC
 
+- Now depends on Kombu 2.5
+
+    - py-amqp has replaced amqplib as the default transport,
+      gaining support for AMQP 0.9, and the RabbitMQ extensions
+      including Consumer Cancel Notifications and heartbeats.
+
+    - support for multiple connection URLs for failover.
+
+    - Read more in the `Kombu 2.5 changelog`_.
+
+    .. _`Kombu 2.5 changelog`:
+        http://kombu.readthedocs.org/en/latest/changelog.html#version-2-5-0
+
+- Now depends on billiard 2.7.3.19
+
 - Fixed a deadlock issue that could occur when the producer pool
   inherited the connection pool instance of the parent process.
 
@@ -43,7 +58,6 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
             b12ead10-a622-4d44-86e9-3193a778f345,
             26c7a420-11f3-4b33-8fac-66cd3b62abfd]>
 
-
 - Chains can now chain other chains and use partial arguments (Issue #1057).
 
     Example:

+ 11 - 11
celery/__init__.py

@@ -44,26 +44,26 @@ if STATICA_HACK:
     # This is never executed, but tricks static analyzers (PyDev, PyCharm,
     # pylint, etc.) into knowing the types of these symbols, and what
     # they contain.
-    from celery.app.base import Celery                  # noqa
-    from celery.app.utils import bugreport              # noqa
-    from celery.app.task import Task                    # noqa
-    from celery._state import current_app, current_task # noqa
-    from celery.canvas import (                         # noqa
+    from celery.app.base import Celery                   # noqa
+    from celery.app.utils import bugreport               # noqa
+    from celery.app.task import Task                     # noqa
+    from celery._state import current_app, current_task  # noqa
+    from celery.canvas import (                          # noqa
         chain, chord, chunks, group, subtask, xmap, xstarmap,
     )
-    from celery.utils import uuid                       # noqa
+    from celery.utils import uuid                        # noqa
 
 # Lazy loading
 from .five import recreate_module
 
 old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
-        'celery.app':      ['Celery', 'bugreport', 'shared_task'],
+        'celery.app': ['Celery', 'bugreport', 'shared_task'],
         'celery.app.task': ['Task'],
-        'celery._state':   ['current_app', 'current_task'],
-        'celery.canvas':   ['chain', 'chord', 'chunks', 'group',
-                            'subtask', 'xmap', 'xstarmap'],
-        'celery.utils':    ['uuid'],
+        'celery._state': ['current_app', 'current_task'],
+        'celery.canvas': ['chain', 'chord', 'chunks', 'group',
+                          'subtask', 'xmap', 'xstarmap'],
+        'celery.utils': ['uuid'],
     },
     direct={'task': 'celery.task'},
     __package__='celery', __file__=__file__,

+ 6 - 2
celery/app/amqp.py

@@ -170,6 +170,7 @@ class TaskProducer(Producer):
                                         self.retry_policy or {})
         exchange = exchange or self.exchange
         self.queues = self.app.amqp.queues  # shortcut
+        self.default_queue = self.app.amqp.default_queue
         super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
 
     def publish_task(self, task_name, task_args=None, task_kwargs=None,
@@ -185,8 +186,9 @@ class TaskProducer(Producer):
         """Send task message."""
         retry = self.retry if retry is None else retry
 
-        declare = declare or []
         qname = queue
+        if queue is None and exchange is None:
+            queue = self.default_queue
         if queue is not None:
             if isinstance(queue, basestring):
                 qname, queue = queue, self.queues[queue]
@@ -194,6 +196,7 @@ class TaskProducer(Producer):
                 qname = queue.name
             exchange = exchange or queue.exchange.name
             routing_key = routing_key or queue.routing_key
+        declare = declare or ([queue] if queue else [])
 
         # merge default and custom policy
         retry = self.retry if retry is None else retry
@@ -378,7 +381,8 @@ class AMQP(object):
     @property
     def producer_pool(self):
         if self._producer_pool is None:
-            self._producer_pool = ProducerPool(self.app.pool,
+            self._producer_pool = ProducerPool(
+                self.app.pool,
                 limit=self.app.pool.limit,
                 Producer=self.TaskProducer,
             )

+ 2 - 1
celery/app/base.py

@@ -159,7 +159,8 @@ class Celery(object):
             # the task instance from the current app.
             # Really need a better solution for this :(
             from . import shared_task as proxies_to_curapp
-            return proxies_to_curapp(*args, _force_evaluate=True, **opts)
+            opts['_force_evaluate'] = True  # XXX Py2.5
+            return proxies_to_curapp(*args, **opts)
 
         def inner_create_task_cls(shared=True, filter=None, **opts):
 

+ 2 - 1
celery/app/builtins.py

@@ -79,7 +79,8 @@ def add_unlock_chord_task(app):
         if result.ready():
             subtask(callback).delay(j(propagate=propagate))
         else:
-            return unlock_chord.retry(countdown=interval, max_retries=max_retries)
+            return unlock_chord.retry(countdown=interval,
+                                      max_retries=max_retries)
     return unlock_chord
 
 

+ 3 - 2
celery/apps/beat.py

@@ -55,7 +55,8 @@ class Beat(configurated):
         self.max_interval = max_interval
         self.socket_timeout = socket_timeout
         self.no_color = no_color
-        self.colored = app.log.colored(self.logfile,
+        self.colored = app.log.colored(
+            self.logfile,
             enabled=not no_color if no_color is not None else no_color,
         )
         self.pidfile = pidfile
@@ -120,7 +121,7 @@ class Beat(configurated):
             scheduler_info=scheduler.info,
             hmax_interval=humanize_seconds(beat.max_interval),
             max_interval=beat.max_interval,
-            )
+        )
 
     def set_process_title(self):
         arg_start = 'manage' in sys.argv[0] and 2 or 1

+ 2 - 1
celery/apps/worker.py

@@ -98,7 +98,8 @@ class Worker(WorkController):
         self.purge = purge
         self.no_color = no_color
         self._isatty = isatty(sys.stdout)
-        self.colored = self.app.log.colored(self.logfile,
+        self.colored = self.app.log.colored(
+            self.logfile,
             enabled=not no_color if no_color is not None else no_color
         )
 

+ 5 - 3
celery/backends/mongodb.py

@@ -182,11 +182,13 @@ class MongoBackend(BaseBackend):
 
     def cleanup(self):
         """Delete expired metadata."""
-        self.collection.remove({
+        self.collection.remove(
+            {
                 'date_done': {
                     '$lt': self.app.now() - self.expires,
-                 }
-        })
+                },
+            },
+        )
 
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(

+ 5 - 3
celery/backends/redis.py

@@ -22,6 +22,10 @@ except ImportError:         # pragma: no cover
     redis = None            # noqa
     ConnectionError = None  # noqa
 
+REDIS_MISSING = """\
+You need to install the redis library in order to use \
+the Redis result store backend."""
+
 
 class RedisBackend(KeyValueStoreBackend):
     """Redis task result store."""
@@ -52,9 +56,7 @@ class RedisBackend(KeyValueStoreBackend):
         super(RedisBackend, self).__init__(**kwargs)
         conf = self.app.conf
         if self.redis is None:
-            raise ImproperlyConfigured(
-                    'You need to install the redis library in order to use '
-                  + 'the Redis result store backend.')
+            raise ImproperlyConfigured(REDIS_MISSING)
 
         # For compatibility with the old REDIS_* configuration keys.
         def _get(key):

+ 9 - 8
celery/bin/base.py

@@ -331,14 +331,15 @@ class Command(object):
         return self.parser.parse_args(arguments)
 
     def create_parser(self, prog_name, command=None):
-        return self.prepare_parser(self.Parser(prog=prog_name,
-                           usage=self.usage(command),
-                           version=self.version,
-                           epilog=self.epilog,
-                           formatter=HelpFormatter(),
-                           description=self.description,
-                           option_list=(self.preload_options +
-                                        self.get_options())))
+        return self.prepare_parser(self.Parser(
+            prog=prog_name,
+            usage=self.usage(command),
+            version=self.version,
+            epilog=self.epilog,
+            formatter=HelpFormatter(),
+            description=self.description,
+            option_list=(self.preload_options + self.get_options())),
+        )
 
     def prepare_parser(self, parser):
         docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]

+ 6 - 6
celery/bin/beat.py

@@ -77,13 +77,13 @@ class beat(Command):
     def get_options(self):
         c = self.app.conf
 
-        return ((
-                Option('--detach', action='store_true'),
-                Option('-s', '--schedule',
+        return (
+            (Option('--detach', action='store_true'),
+             Option('-s', '--schedule',
                     default=c.CELERYBEAT_SCHEDULE_FILENAME),
-                Option('--max-interval', type='float'),
-                Option('-S', '--scheduler', dest='scheduler_cls'),
-                Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
+             Option('--max-interval', type='float'),
+             Option('-S', '--scheduler', dest='scheduler_cls'),
+             Option('-l', '--loglevel', default=c.CELERYBEAT_LOG_LEVEL))
             + daemon_options(default_pidfile='celerybeat.pid')
             + tuple(self.app.user_options['beat'])
         )

+ 7 - 7
celery/bin/events.py

@@ -116,14 +116,14 @@ class events(Command):
         return set_process_title(prog, info=info)
 
     def get_options(self):
-        return ((
-                Option('-d', '--dump', action='store_true'),
-                Option('-c', '--camera'),
-                Option('--detach', action='store_true'),
-                Option('-F', '--frequency', '--freq',
+        return (
+            (Option('-d', '--dump', action='store_true'),
+             Option('-c', '--camera'),
+             Option('--detach', action='store_true'),
+             Option('-F', '--frequency', '--freq',
                     type='float', default=1.0),
-                Option('-r', '--maxrate'),
-                Option('-l', '--loglevel', default='INFO'))
+             Option('-r', '--maxrate'),
+             Option('-l', '--loglevel', default='INFO'))
             + daemon_options(default_pidfile='celeryev.pid')
             + tuple(self.app.user_options['events'])
         )

+ 4 - 3
celery/bin/multi.py

@@ -276,8 +276,9 @@ class MultiTool(object):
             left = len(P)
             if left:
                 pids = ', '.join(str(pid) for _, _, pid in P)
-                self.note(self.colored.blue('> Waiting for {0} {1} -> {2}...'.format(
-                    left, pluralize(left, 'node'), pids)), newline=False)
+                self.note(self.colored.blue(
+                    '> Waiting for {0} {1} -> {2}...'.format(
+                        left, pluralize(left, 'node'), pids)), newline=False)
 
         if retry:
             note_waiting()
@@ -457,7 +458,7 @@ def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''):
             name = nodename
         else:
             nodename = '%s%s' % (prefix, name)
-            this_name = options['-n'] = '%s@%s' % (nodename,  this_suffix)
+            this_name = options['-n'] = '%s@%s' % (nodename, this_suffix)
         expand = abbreviations({'%h': this_name,
                                 '%n': name,
                                 '%N': nodename,

+ 2 - 2
celery/bootsteps.py

@@ -222,8 +222,8 @@ class Namespace(object):
         last = self._find_last()
         self._firstpass(steps)
         it = ((C, C.requires) for C in values(steps))
-        G = self.graph = DependencyGraph(it,
-            formatter=self.GraphFormatter(root=last),
+        G = self.graph = DependencyGraph(
+            it, formatter=self.GraphFormatter(root=last),
         )
         if last:
             for obj in G:

+ 6 - 4
celery/datastructures.py

@@ -81,7 +81,8 @@ class GraphFormatter(object):
         )
 
     def head(self, **attrs):
-        return self.FMT(self._head, id=self.id, type=self.type,
+        return self.FMT(
+            self._head, id=self.id, type=self.type,
             attrs=self.attrs(attrs, self.graph_scheme),
         )
 
@@ -109,13 +110,14 @@ class GraphFormatter(object):
         ))
 
     def draw_edge(self, a, b, scheme=None, attrs=None):
-        return self.FMT(self._edge, self.label(a), self.label(b),
+        return self.FMT(
+            self._edge, self.label(a), self.label(b),
             dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
         )
 
     def draw_node(self, obj, scheme=None, attrs=None):
-        return self.FMT(self._node, self.label(obj),
-            attrs=self.attrs(attrs, scheme),
+        return self.FMT(
+            self._node, self.label(obj), attrs=self.attrs(attrs, scheme),
         )
 
 

+ 4 - 3
celery/events/cursesmon.py

@@ -256,9 +256,10 @@ class CursesMonitor(object):  # pragma: no cover
             y = count(xs)
             task = self.state.tasks[self.selected_task]
             info = task.info(extra=['state'])
-            infoitems = [('args', info.pop('args', None)),
-                         ('kwargs', info.pop('kwargs', None))
-                        ] + list(info.items())
+            infoitems = [
+                ('args', info.pop('args', None)),
+                ('kwargs', info.pop('kwargs', None))
+            ] + list(info.items())
             for key, value in infoitems:
                 if key is None:
                     continue

+ 8 - 8
celery/loaders/base.py

@@ -30,19 +30,19 @@ from celery.utils.imports import (
 
 BUILTIN_MODULES = frozenset()
 
-ERROR_ENVVAR_NOT_SET = (
-"""The environment variable {0!r} is not set,
+ERROR_ENVVAR_NOT_SET = """\
+The environment variable {0!r} is not set,
 and as such the configuration could not be loaded.
 Please set this variable and make it point to
-a configuration module.""")
+a configuration module."""
 
 _RACE_PROTECTION = False
-CONFIG_INVALID_NAME = """
+CONFIG_INVALID_NAME = """\
 Error: Module '{module}' doesn't exist, or it's not a valid \
 Python module name.
 """
 
-CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """
+CONFIG_WITH_SUFFIX = CONFIG_INVALID_NAME + """\
 Did you mean '{suggest}'?
 """
 
@@ -249,9 +249,9 @@ class BaseLoader(object):
         return {}
 
     def autodiscover_tasks(self, packages, related_name='tasks'):
-        self.task_modules.update(mod.__name__
-            for mod in autodiscover_tasks(packages, related_name) if mod
-        )
+        self.task_modules.update(
+            mod.__name__ for mod in autodiscover_tasks(packages,
+                                                       related_name) if mod)
 
     @property
     def conf(self):

+ 9 - 6
celery/schedules.py

@@ -207,7 +207,10 @@ class crontab_parser(object):
         if len(toks) > 1:
             to = self._expand_number(toks[1])
             if to < fr:  # Wrap around max_ if necessary
-                return range(fr, self.min_ + self.max_) + range(self.min_, to + 1)
+                return (
+                    range(fr, self.min_ + self.max_) +
+                    range(self.min_, to + 1)
+                )
             return range(fr, to + 1)
         return [fr]
 
@@ -401,11 +404,11 @@ class crontab(schedule):
                 datedata.moy = 0
         roll_over()
 
-        while not (datetime(year=datedata.year,
-                            month=months_of_year[datedata.moy],
-                            day=days_of_month[datedata.dom]
-                           ).isoweekday() % 7
-                  ) in self.day_of_week:
+        while not datetime(
+                year=datedata.year,
+                month=months_of_year[datedata.moy],
+                day=days_of_month[datedata.dom]) \
+                    .isoweekday() % 7 in self.day_of_week:
             datedata.dom += 1
             roll_over()
 

+ 4 - 4
celery/task/__init__.py

@@ -40,10 +40,10 @@ class module(MagicModule):
 
 old_module, new_module = recreate_module(__name__,  # pragma: no cover
     by_module={
-        'celery.task.base':   ['BaseTask', 'Task', 'PeriodicTask',
-                               'task', 'periodic_task'],
-        'celery.canvas':      ['group', 'chord', 'subtask'],
-        'celery.task.sets':   ['TaskSet'],
+        'celery.task.base': ['BaseTask', 'Task', 'PeriodicTask',
+                             'task', 'periodic_task'],
+        'celery.canvas': ['group', 'chord', 'subtask'],
+        'celery.task.sets': ['TaskSet'],
     },
     base=module,
     __package__='celery.task',

+ 2 - 3
celery/task/http.py

@@ -45,7 +45,6 @@ else:
             return value.encode('utf-8')
         return value
 
-
     def utf8dict(tup):  # noqa
         """With a dict's items() tuple return a new dict with any utf-8
         keys/values encoded."""
@@ -112,8 +111,8 @@ class MutableURL(object):
         scheme, netloc, path, params, query, fragment = self.parts
         query = urlencode(utf8dict(items(self.query)))
         components = [scheme + '://', netloc, path or '/',
-                      ';{0}'.format(params)   if params   else '',
-                      '?{0}'.format(query)    if query    else '',
+                      ';{0}'.format(params) if params else '',
+                      '?{0}'.format(query) if query else '',
                       '#{0}'.format(fragment) if fragment else '']
         return ''.join(filter(None, components))
 

+ 12 - 13
celery/tests/backends/test_database.py

@@ -21,7 +21,7 @@ from celery.tests.utils import (
 try:
     import sqlalchemy  # noqa
 except ImportError:
-    DatabaseBackend = Task = TaskSet = None
+    DatabaseBackend = Task = TaskSet = None  # noqa
 else:
     from celery.backends.database import DatabaseBackend
     from celery.backends.database.models import Task, TaskSet
@@ -62,12 +62,11 @@ class test_DatabaseBackend(Case):
 
     def test_missing_task_meta_is_dict_with_pending(self):
         tb = DatabaseBackend()
-        self.assertDictContainsSubset({
-            'status': states.PENDING,
-            'task_id': 'xxx-does-not-exist-at-all',
-            'result': None,
-            'traceback': None,
-            }, tb.get_task_meta('xxx-does-not-exist-at-all'))
+        self.assertDictContainsSubset(
+            {'status': states.PENDING,
+             'task_id': 'xxx-does-not-exist-at-all',
+             'result': None,
+             'traceback': None}, tb.get_task_meta('xxx-does-not-exist-at-all'))
 
     def test_mark_as_done(self):
         tb = DatabaseBackend()
@@ -113,9 +112,9 @@ class test_DatabaseBackend(Case):
             import traceback
             trace = '\n'.join(traceback.format_stack())
             tb.mark_as_retry(tid, exception, traceback=trace)
-        self.assertEqual(tb.get_status(tid), states.RETRY)
-        self.assertIsInstance(tb.get_result(tid), KeyError)
-        self.assertEqual(tb.get_traceback(tid), trace)
+            self.assertEqual(tb.get_status(tid), states.RETRY)
+            self.assertIsInstance(tb.get_result(tid), KeyError)
+            self.assertEqual(tb.get_traceback(tid), trace)
 
     def test_mark_as_failure(self):
         tb = DatabaseBackend()
@@ -127,9 +126,9 @@ class test_DatabaseBackend(Case):
             import traceback
             trace = '\n'.join(traceback.format_stack())
             tb.mark_as_failure(tid3, exception, traceback=trace)
-        self.assertEqual(tb.get_status(tid3), states.FAILURE)
-        self.assertIsInstance(tb.get_result(tid3), KeyError)
-        self.assertEqual(tb.get_traceback(tid3), trace)
+            self.assertEqual(tb.get_status(tid3), states.FAILURE)
+            self.assertIsInstance(tb.get_result(tid3), KeyError)
+            self.assertEqual(tb.get_traceback(tid3), trace)
 
     def test_forget(self):
         tb = DatabaseBackend(backend='memory://')

+ 3 - 3
celery/tests/concurrency/test_concurrency.py

@@ -28,9 +28,9 @@ class test_BasePool(Case):
                      callback=gen_callback('callback'),
                      accept_callback=gen_callback('accept_callback'))
 
-        self.assertDictContainsSubset({
-                              'target': (1, (8, 16)),
-                              'callback': (2, (42, ))}, scratch)
+        self.assertDictContainsSubset(
+            {'target': (1, (8, 16)),
+             'callback': (2, (42, ))}, scratch)
         pa1 = scratch['accept_callback']
         self.assertEqual(0, pa1[0])
         self.assertEqual(pa1[1][0], os.getpid())

+ 3 - 3
celery/tests/slow/test_buckets.py

@@ -30,9 +30,9 @@ class MockJob(object):
 
     def __eq__(self, other):
         if isinstance(other, self.__class__):
-            return bool(self.id == other.id \
-                    and self.name == other.name \
-                    and self.args == other.args \
+            return bool(self.id == other.id
+                    and self.name == other.name
+                    and self.args == other.args
                     and self.kwargs == other.kwargs)
         else:
             return self == other

+ 4 - 1
celery/tests/tasks/test_tasks.py

@@ -708,7 +708,10 @@ class test_crontab_parser(Case):
         self.assertEqual(crontab_parser(8).parse('*/2'), set([0, 2, 4, 6]))
         self.assertEqual(crontab_parser().parse('2-9/5'), set([2, 7]))
         self.assertEqual(crontab_parser().parse('2-10/5'), set([2, 7]))
-        self.assertEqual(crontab_parser(min_=1).parse('55-5/3'), set([55, 58, 1, 4])) 
+        self.assertEqual(
+            crontab_parser(min_=1).parse('55-5/3'),
+            set([55, 58, 1, 4]),
+        )
         self.assertEqual(crontab_parser().parse('2-11/5,3'), set([2, 3, 7]))
         self.assertEqual(crontab_parser().parse('2-4/3,*/5,0-21/4'),
                 set([0, 2, 4, 5, 8, 10, 12, 15, 16,

+ 12 - 8
celery/tests/utilities/test_info.py

@@ -18,14 +18,18 @@ RANDTEXT_RES = """\
     lazy dog\
 """
 
-QUEUES = {'queue1': {
-            'exchange': 'exchange1',
-            'exchange_type': 'type1',
-            'routing_key': 'bind1'},
-         'queue2': {
-            'exchange': 'exchange2',
-            'exchange_type': 'type2',
-            'routing_key': 'bind2'}}
+QUEUES = {
+    'queue1': {
+        'exchange': 'exchange1',
+        'exchange_type': 'type1',
+        'routing_key': 'bind1',
+    },
+    'queue2': {
+        'exchange': 'exchange2',
+        'exchange_type': 'type2',
+        'routing_key': 'bind2',
+    },
+}
 
 
 QUEUE_FORMAT1 = '. queue1           exchange=exchange1(type1) key=bind1'

+ 8 - 7
celery/tests/utils.py

@@ -1,11 +1,11 @@
 from __future__ import absolute_import
 
 try:
-    import unittest
+    import unittest  # noqa
     unittest.skip
     from unittest.util import safe_repr, unorderable_list_difference
 except AttributeError:
-    import unittest2 as unittest
+    import unittest2 as unittest  # noqa
     from unittest2.util import safe_repr, unorderable_list_difference  # noqa
 
 import importlib
@@ -106,7 +106,7 @@ class _AssertWarnsContext(_AssertRaisesBaseContext):
             if first_matching is None:
                 first_matching = w
             if (self.expected_regex is not None and
-                not self.expected_regex.search(str(w))):
+                    not self.expected_regex.search(str(w))):
                 continue
             # store warning for later retrieval
             self.warning = w
@@ -161,6 +161,7 @@ class Case(unittest.TestCase):
         self.fail(self._formatMessage(msg, standard_msg))
 
     def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
+        missing = unexpected = None
         try:
             expected = sorted(expected_seq)
             actual = sorted(actual_seq)
@@ -176,10 +177,10 @@ class Case(unittest.TestCase):
         errors = []
         if missing:
             errors.append('Expected, but missing:\n    %s' % (
-                           safe_repr(missing)))
+                          safe_repr(missing)))
         if unexpected:
             errors.append('Unexpected, but present:\n    %s' % (
-                           safe_repr(unexpected)))
+                          safe_repr(unexpected)))
         if errors:
             standardMsg = '\n'.join(errors)
             self.fail(self._formatMessage(msg, standardMsg))
@@ -188,8 +189,8 @@ class Case(unittest.TestCase):
 class AppCase(Case):
 
     def setUp(self):
-        from ..app import current_app
-        from ..backends.cache import CacheBackend, DummyClient
+        from celery.app import current_app
+        from celery.backends.cache import CacheBackend, DummyClient
         app = self.app = self._current_app = current_app()
         if isinstance(app.backend, CacheBackend):
             if isinstance(app.backend.client, DummyClient):

+ 1 - 1
celery/tests/worker/test_control.py

@@ -258,7 +258,7 @@ class test_ControlPanel(Case):
         app.conf.CELERY_DISABLE_RATE_LIMITS = True
         try:
             e = self.panel.handle('rate_limit', arguments=dict(
-                 task_name=mytask.name, rate_limit='100/m'))
+                    task_name=mytask.name, rate_limit='100/m'))
             self.assertIn('rate limits disabled', e.get('error'))
         finally:
             app.conf.CELERY_DISABLE_RATE_LIMITS = False

+ 3 - 3
celery/utils/__init__.py

@@ -129,7 +129,7 @@ def fun_takes_kwargs(fun, kwlist=[]):
 
     """
     S = getattr(fun, 'argspec', getargspec(fun))
-    if S.keywords != None:
+    if S.keywords is not None:
         return kwlist
     return [kw for kw in kwlist if kw in S.args]
 
@@ -190,8 +190,8 @@ def maybe_reraise():
 
 
 def strtobool(term, table={'false': False, 'no': False, '0': False,
-                             'true':  True, 'yes': True,  '1': True,
-                             'on':    True, 'off': False}):
+                           'true': True, 'yes': True, '1': True,
+                           'on': True, 'off': False}):
     if isinstance(term, string_t):
         try:
             return table[term.lower()]

+ 1 - 1
celery/utils/dispatch/saferef.py

@@ -243,7 +243,7 @@ class BoundNonDescriptorMethodWeakref(BoundMethodWeakref):  # pragma: no cover
         """
         assert getattr(target.__self__, target.__name__) == target, \
                "method %s isn't available as the attribute %s of %s" % (
-                    target, target.__name__, target.__self__)
+                   target, target.__name__, target.__self__)
         super(BoundNonDescriptorMethodWeakref, self).__init__(target,
                                                               on_delete)
 

+ 4 - 4
celery/utils/timeutils.py

@@ -35,10 +35,10 @@ RATE_MODIFIER_MAP = {'s': lambda n: n,
 
 HAVE_TIMEDELTA_TOTAL_SECONDS = hasattr(timedelta, 'total_seconds')
 
-TIME_UNITS = (('day',    60 * 60 * 24.0, lambda n: format(n, '.2f')),
-              ('hour',   60 * 60.0,      lambda n: format(n, '.2f')),
-              ('minute', 60.0,           lambda n: format(n, '.2f')),
-              ('second', 1.0,            lambda n: format(n, '.2f')))
+TIME_UNITS = (('day', 60 * 60 * 24.0, lambda n: format(n, '.2f')),
+              ('hour', 60 * 60.0, lambda n: format(n, '.2f')),
+              ('minute', 60.0, lambda n: format(n, '.2f')),
+              ('second', 1.0, lambda n: format(n, '.2f')))
 
 ZERO = timedelta(0)
 

+ 1 - 1
celery/worker/__init__.py

@@ -17,8 +17,8 @@ import sys
 import traceback
 
 from billiard import cpu_count
+from billiard.util import Finalize
 from kombu.syn import detect_environment
-from kombu.utils.finalize import Finalize
 
 from celery import bootsteps
 from celery import concurrency as _concurrency

+ 1 - 1
celery/worker/consumer.py

@@ -284,7 +284,7 @@ class Consumer(object):
             q = queues[queue]
         else:
             exchange = queue if exchange is None else exchange
-            exchange_type = ('direct' if   exchange_type is None
+            exchange_type = ('direct' if exchange_type is None
                                       else exchange_type)
             q = queues.select_add(queue,
                                   exchange=exchange,

+ 3 - 4
celery/worker/control.py

@@ -169,8 +169,7 @@ def dump_reserved(panel, safe=False, **kwargs):
 
 @Panel.register
 def dump_active(panel, safe=False, **kwargs):
-    return [request.info(safe=safe)
-                for request in state.active_requests]
+    return [request.info(safe=safe) for request in state.active_requests]
 
 
 @Panel.register
@@ -200,8 +199,8 @@ def dump_tasks(panel, taskinfoitems=None, **kwargs):
 
     def _extract_info(task):
         fields = dict((field, str(getattr(task, field, None)))
-                        for field in taskinfoitems
-                            if getattr(task, field, None) is not None)
+                            for field in taskinfoitems
+                                if getattr(task, field, None) is not None)
         if fields:
             info = ['='.join(f) for f in items(fields)]
             return '{0} [{1}]'.format(task.name, ' '.join(info))

+ 2 - 2
celery/worker/heartbeat.py

@@ -45,8 +45,8 @@ class Heart(object):
     def start(self):
         if self.eventer.enabled:
             self._send('worker-online')
-            self.tref = self.timer.apply_interval(self.interval * 1000.0,
-                    self._send, ('worker-heartbeat', ))
+            self.tref = self.timer.apply_interval(
+                self.interval * 1000.0, self._send, ('worker-heartbeat', ))
 
     def stop(self):
         if self.tref is not None:

+ 27 - 26
celery/worker/job.py

@@ -98,9 +98,9 @@ class Request(object):
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
     def __init__(self, body, on_ack=noop,
-            hostname=None, eventer=None, app=None,
-            connection_errors=None, request_dict=None,
-            delivery_info=None, task=None, **opts):
+                 hostname=None, eventer=None, app=None,
+                 connection_errors=None, request_dict=None,
+                 delivery_info=None, task=None, **opts):
         self.app = app or app_or_default(app)
         name = self.name = body['task']
         self.id = body['id']
@@ -110,7 +110,7 @@ class Request(object):
             self.kwargs.items
         except AttributeError:
             raise exceptions.InvalidTaskError(
-                    'Task keyword arguments is not a mapping')
+                'Task keyword arguments is not a mapping')
         if NEEDS_KWDICT:
             self.kwargs = kwdict(self.kwargs)
         eta = body.get('eta')
@@ -152,7 +152,8 @@ class Request(object):
     def from_message(cls, message, body, **kwargs):
         # should be deprecated
         return Request(body,
-            delivery_info=getattr(message, 'delivery_info', None), **kwargs)
+                       delivery_info=getattr(message, 'delivery_info', None),
+                       **kwargs)
 
     def extend_with_default_kwargs(self):
         """Extend the tasks keyword arguments with standard task arguments.
@@ -177,7 +178,7 @@ class Request(object):
         fun = self.task.run
         supported_keys = fun_takes_kwargs(fun, default_kwargs)
         extend_with = dict((key, val) for key, val in items(default_kwargs)
-                                if key in supported_keys)
+                           if key in supported_keys)
         kwargs.update(extend_with)
         return kwargs
 
@@ -238,8 +239,8 @@ class Request(object):
                         'hostname': self.hostname, 'is_eager': False,
                         'delivery_info': self.delivery_info})
         retval = trace_task(self.task, self.id, self.args, kwargs, request,
-                               **{'hostname': self.hostname,
-                                  'loader': self.app.loader})
+                            **{'hostname': self.hostname,
+                               'loader': self.app.loader})
         self.acknowledge()
         return retval
 
@@ -280,7 +281,7 @@ class Request(object):
         if self.id in revoked_tasks:
             warn('Skipping revoked task: %s[%s]', self.name, self.id)
             self._announce_revoked('expired' if expired else 'revoked',
-                False, None, expired)
+                                   False, None, expired)
             return True
         return False
 
@@ -337,10 +338,10 @@ class Request(object):
         if _does_info:
             now = now or time.time()
             runtime = self.time_start and (time.time() - self.time_start) or 0
-            info(self.success_msg.strip(), {
-                    'id': self.id, 'name': self.name,
-                    'return_value': self.repr_result(ret_value),
-                    'runtime': runtime})
+            info(self.success_msg.strip(),
+                 {'id': self.id, 'name': self.name,
+                  'return_value': self.repr_result(ret_value),
+                  'runtime': runtime})
 
     def on_retry(self, exc_info):
         """Handler called if the task should be retried."""
@@ -348,13 +349,13 @@ class Request(object):
             self.acknowledge()
 
         self.send_event('task-retried',
-                         exception=safe_repr(exc_info.exception.exc),
-                         traceback=safe_str(exc_info.traceback))
+                        exception=safe_repr(exc_info.exception.exc),
+                        traceback=safe_str(exc_info.traceback))
 
         if _does_info:
-            info(self.retry_msg.strip(), {
-                'id': self.id, 'name': self.name,
-                'exc': exc_info.exception})
+            info(self.retry_msg.strip(),
+                 {'id': self.id, 'name': self.name,
+                  'exc': exc_info.exception})
 
     def on_failure(self, exc_info):
         """Handler called if the task raised an exception."""
@@ -392,8 +393,7 @@ class Request(object):
         description = 'raised exception'
         severity = logging.ERROR
         self.send_event('task-failed',
-                         exception=exception,
-                         traceback=traceback)
+                        exception=exception, traceback=traceback)
 
         if internal:
             if isinstance(einfo.exception, Ignore):
@@ -453,13 +453,14 @@ class Request(object):
 
     def __str__(self):
         return '{0.name}[{0.id}]{1}{2}'.format(self,
-                ' eta:[{0}]'.format(self.eta) if self.eta else '',
-                ' expires:[{0}]'.format(self.expires) if self.expires else '')
+               ' eta:[{0}]'.format(self.eta) if self.eta else '',
+               ' expires:[{0}]'.format(self.expires) if self.expires else '')
     shortinfo = __str__
 
     def __repr__(self):
-        return '<{0} {1}: {2}>'.format(type(self).__name__, self.id,
-                reprcall(self.name, self.args, self.kwargs))
+        return '<{0} {1}: {2}>'.format(
+            type(self).__name__, self.id,
+            reprcall(self.name, self.args, self.kwargs))
 
     @property
     def tzlocal(self):
@@ -470,7 +471,7 @@ class Request(object):
     @property
     def store_errors(self):
         return (not self.task.ignore_result
-                 or self.task.store_errors_even_if_ignored)
+                or self.task.store_errors_even_if_ignored)
 
     @property
     def task_id(self):
@@ -494,7 +495,7 @@ class Request(object):
 class TaskRequest(Request):
 
     def __init__(self, name, id, args=(), kwargs={},
-            eta=None, expires=None, **options):
+                 eta=None, expires=None, **options):
         """Compatibility class."""
 
         super(TaskRequest, self).__init__({

+ 5 - 5
celery/worker/loops.py

@@ -24,9 +24,9 @@ AMQHEARTBEAT_RATE = 2.0
 
 
 def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
-        heartbeat, handle_unknown_message, handle_unknown_task,
-        handle_invalid_task, clock, sleep=sleep, min=min, Empty=Empty,
-        hbrate=AMQHEARTBEAT_RATE):
+             heartbeat, handle_unknown_message, handle_unknown_task,
+             handle_invalid_task, clock, sleep=sleep, min=min, Empty=Empty,
+             hbrate=AMQHEARTBEAT_RATE):
     """Non-blocking eventloop consuming messages until connection is lost,
     or shutdown is requested."""
 
@@ -122,8 +122,8 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
 
 
 def synloop(obj, connection, consumer, strategies, ns, hub, qos,
-        heartbeat, handle_unknown_message, handle_unknown_task,
-        handle_invalid_task, clock, **kwargs):
+            heartbeat, handle_unknown_message, handle_unknown_task,
+            handle_invalid_task, clock, **kwargs):
     """Fallback blocking eventloop for transports that doesn't support AIO."""
 
     def on_task_received(body, message):

+ 2 - 1
celery/worker/pidbox.py

@@ -20,7 +20,8 @@ class Pidbox(object):
     def __init__(self, c):
         self.c = c
         self.hostname = c.hostname
-        self.node = c.app.control.mailbox.Node(c.hostname,
+        self.node = c.app.control.mailbox.Node(
+            c.hostname,
             handlers=control.Panel.data,
             state=AttributeDict(app=c.app, hostname=c.hostname, consumer=c),
         )

+ 4 - 4
celery/worker/state.py

@@ -12,6 +12,7 @@
 from __future__ import absolute_import
 
 import os
+import sys
 import platform
 import shelve
 
@@ -97,9 +98,9 @@ if C_BENCH:  # pragma: no cover
         def on_shutdown():
             if bench_first is not None and bench_last is not None:
                 print('- Time spent in benchmark: {0!r}'.format(
-                        bench_last - bench_first))
+                      bench_last - bench_first))
                 print('- Avg: {0}'.format(
-                        sum(bench_sample) / len(bench_sample)))
+                      sum(bench_sample) / len(bench_sample)))
                 memdump()
 
     def task_reserved(request):  # noqa
@@ -113,7 +114,6 @@ if C_BENCH:  # pragma: no cover
 
         return __reserved(request)
 
-    import sys
     def task_ready(request):  # noqa
         global all_count
         global bench_start
@@ -123,7 +123,7 @@ if C_BENCH:  # pragma: no cover
             now = time()
             diff = now - bench_start
             print('- Time spent processing %s tasks (since first '
-                    'task received): ~{0:.4f}s\n'.format(bench_every, diff))
+                  'task received): ~{0:.4f}s\n'.format(bench_every, diff))
             sys.stdout.flush()
             bench_start = bench_last = now
             bench_sample.append(diff)

+ 3 - 3
celery/worker/strategy.py

@@ -20,7 +20,7 @@ def default(task, app, consumer):
 
     def task_message_handler(message, body, ack):
         handle(Req(body, on_ack=ack, app=app, hostname=hostname,
-                         eventer=eventer, task=task,
-                         connection_errors=connection_errors,
-                         delivery_info=message.delivery_info))
+                   eventer=eventer, task=task,
+                   connection_errors=connection_errors,
+                   delivery_info=message.delivery_info))
     return task_message_handler

+ 46 - 0
docs/getting-started/brokers/redis.rst

@@ -39,6 +39,21 @@ Where the URL is in the format of::
 all fields after the scheme are optional, and will default to localhost on port 6379,
 using database 0.
 
+.. _redis-visibility_timeout:
+
+Visibility Timeout
+------------------
+
+The visibility timeout defines the number of seconds to wait
+for the worker to acknowledge the task before the message is redelivered
+to another worker.  Be sure to see :ref:`redis-caveats` below.
+
+This option is set via the :setting:`BROKER_TRANSPORT_OPTIONS` setting::
+
+    BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}  # 1 hour.
+
+The default visibility timeout for Redis is 1 hour.
+
 .. _redis-results-configuration:
 
 Results
@@ -51,3 +66,34 @@ you should configure these settings::
 
 For a complete list of options supported by the Redis result backend, see
 :ref:`conf-redis-result-backend`
+
+.. _redis-caveats:
+
+Caveats
+=======
+
+- If a task is not acknowledged within the :ref:`redis-visibility_timeout`
+  the task will be redelivered to another worker and executed.
+
+    This causes problems with ETA/countdown/retry tasks where the
+    time to execute exceeds the visibility timeout; in fact if that
+    happens it will be executed again, and again in a loop.
+
+    So you have to increase the visibility timeout to match
+    the time of the longest ETA you are planning to use.
+
+    Note that Celery will redeliver messages at worker shutdown,
+    so having a long visibility timeout will only delay the redelivery
+    of 'lost' tasks in the event of a power failure or forcefully terminated
+    workers.
+
+    Periodic tasks will not be affected by the visibility timeout,
+    as this is a concept separate from ETA/countdown.
+
+    You can increase this timeout by configuring a transport option
+    with the same name:
+
+        BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
+
+    The value must be an int describing the number of seconds.
+

+ 1 - 1
pavement.py

@@ -59,7 +59,7 @@ def verifyconfigref(options):
 def flake8(options):
     noerror = getattr(options, 'noerror', False)
     complexity = getattr(options, 'complexity', 22)
-    sh("""flake8 celery | perl -mstrict -mwarnings -nle'
+    sh("""flake8 --ignore=E126,E127,E128 celery | perl -mstrict -mwarnings -nle'
         my $ignore = m/too complex \((\d+)\)/ && $1 le {0};
         if (! $ignore) {{ print STDERR; our $FOUND_FLAKE = 1 }}
     }}{{exit $FOUND_FLAKE;

+ 2 - 2
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
-billiard>=2.7.3.18
-kombu>=2.4.8,<3.0
+billiard>=2.7.3.19
+kombu>=2.5.3

+ 2 - 2
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz
-           billiard >= 2.7.3.18
-           kombu >= 2.4.8
+           billiard >= 2.7.3.19
+           kombu >= 2.5.3