ソースを参照

Merge branch '3.0'

Conflicts:
	Changelog
	README.rst
	celery/__init__.py
	celery/app/base.py
	celery/app/builtins.py
	celery/apps/worker.py
	celery/events/__init__.py
	celery/result.py
	celery/schedules.py
	celery/tests/backends/test_cache.py
	celery/tests/backends/test_redis.py
	celery/tests/events/test_events.py
	celery/tests/tasks/test_chord.py
	celery/tests/worker/test_worker.py
	celery/worker/autoreload.py
	celery/worker/consumer.py
	celery/worker/hub.py
	docs/includes/introduction.txt
	docs/tutorials/daemonizing.rst
	requirements/default.txt
	setup.cfg
Ask Solem 12 年 前
コミット
e75cc3b00e

+ 1 - 5
README.rst

@@ -9,7 +9,7 @@
 :Download: http://pypi.python.org/pypi/celery/
 :Source: http://github.com/celery/celery/
 :Keywords: task queue, job queue, asynchronous, rabbitmq, amqp, redis,
-  python, webhooks, queue, distributed, ironmq, ironcache
+  python, webhooks, queue, distributed
 
 --
 
@@ -128,7 +128,6 @@ It supports...
         - MongoDB_, Beanstalk_,
         - CouchDB_, SQLAlchemy_,
         - Django ORM, Amazon SQS,
-        - IronMQ_
         - and more...
 
     - **Concurrency**
@@ -141,7 +140,6 @@ It supports...
         - memcached, MongoDB
         - SQLAlchemy, Django ORM
         - Apache Cassandra
-        - IronCache_
 
     - **Serialization**
 
@@ -158,8 +156,6 @@ It supports...
 .. _Beanstalk: http://kr.github.com/beanstalkd
 .. _CouchDB: http://couchdb.apache.org
 .. _SQLAlchemy: http://sqlalchemy.org
-.. _IronMQ: http://iron.io/mq
-.. _IronCache: http://iron.io/cache
 
 Framework Integration
 =====================

+ 28 - 6
celery/app/builtins.py

@@ -69,14 +69,24 @@ def add_unlock_chord_task(app):
     from celery.exceptions import ChordError
     from celery.result import from_serializable
 
+    default_propagate = app.conf.CELERY_CHORD_PROPAGATES
+
     @app.task(name='celery.chord_unlock', max_retries=None,
               default_retry_delay=1, ignore_result=True, _force_evaluate=True)
-    def unlock_chord(group_id, callback, interval=None, propagate=True,
+    def unlock_chord(group_id, callback, interval=None, propagate=None,
                      max_retries=None, result=None,
                      Result=app.AsyncResult, GroupResult=app.GroupResult,
                      from_serializable=from_serializable):
+        # if propagate is disabled exceptions raised by chord tasks
+        # will be sent as part of the result list to the chord callback.
+        # Since 3.1 propagate will be enabled by default, and instead
+        # the chord callback changes state to FAILURE with the
+        # exception set to ChordError.
+        propagate = default_propagate if propagate is None else propagate
         if interval is None:
             interval = unlock_chord.default_retry_delay
+
+        # check if the task group is ready, and if so apply the callback.
         deps = GroupResult(
             group_id,
             [from_serializable(r, Result=Result) for r in result],
@@ -88,14 +98,24 @@ def add_unlock_chord_task(app):
             try:
                 ret = j(propagate=propagate)
             except Exception as exc:
-                culprit = next(deps._failed_join_report())
+                try:
+                    culprit = deps._failed_join_report().next()
+                    reason = 'Dependency {0.id} raised {1!r}'.format(
+                        culprit, exc,
+                    )
+                except StopIteration:
+                    reason = repr(exc)
 
                 app._tasks[callback.task].backend.fail_from_current_stack(
-                    callback.id, exc=ChordError('Dependency %s raised %r' % (
-                        culprit.id, exc)),
+                    callback.id, exc=ChordError(reason),
                 )
             else:
-                callback.delay(ret)
+                try:
+                    callback.delay(ret)
+                except Exception, exc:
+                    app._tasks[callback.task].backend.fail_from_current_stack(
+                        callback.id,
+                        exc=ChordError('Call callback error: %r' % (exc, )))
         else:
             return unlock_chord.retry(countdown=interval,
                                       max_retries=max_retries)
@@ -291,6 +311,7 @@ def add_chord_task(app):
     from celery import group
     from celery.canvas import maybe_subtask
     _app = app
+    default_propagate = app.conf.CELERY_CHORD_PROPAGATES
 
     class Chord(app.Task):
         app = _app
@@ -299,7 +320,8 @@ def add_chord_task(app):
         ignore_result = False
 
         def run(self, header, body, partial_args=(), interval=1, countdown=1,
-                max_retries=None, propagate=True, eager=False, **kwargs):
+                max_retries=None, propagate=None, eager=False, **kwargs):
+            propagate = default_propagate if propagate is None else propagate
             group_id = uuid()
             AsyncResult = self.app.AsyncResult
             prepare_member = self._prepare_member

+ 3 - 1
celery/app/defaults.py

@@ -70,7 +70,7 @@ NAMESPACES = {
         'CONNECTION_RETRY': Option(True, type='bool'),
         'CONNECTION_MAX_RETRIES': Option(100, type='int'),
         'HEARTBEAT': Option(10, type='int'),
-        'HEARTBEAT_CHECKRATE': Option(2.0, type='int'),
+        'HEARTBEAT_CHECKRATE': Option(3.0, type='int'),
         'POOL_LIMIT': Option(10, type='int'),
         'USE_SSL': Option(False, type='bool'),
         'TRANSPORT': Option(type='string'),
@@ -98,6 +98,8 @@ NAMESPACES = {
         'BROADCAST_EXCHANGE_TYPE': Option('fanout'),
         'CACHE_BACKEND': Option(),
         'CACHE_BACKEND_OPTIONS': Option({}, type='dict'),
+        # chord propagate will be True from v3.1
+        'CHORD_PROPAGATES': Option(False, type='bool'),
         'CREATE_MISSING_QUEUES': Option(True, type='bool'),
         'DEFAULT_RATE_LIMIT': Option(type='string'),
         'DISABLE_RATE_LIMITS': Option(False, type='bool'),

+ 4 - 2
celery/apps/worker.py

@@ -20,6 +20,7 @@ import warnings
 from functools import partial
 
 from billiard import current_process
+from kombu.utils.encoding import safe_str
 
 from celery import VERSION_BANNER, platforms, signals
 from celery.app.abstract import from_config
@@ -139,7 +140,7 @@ class Worker(WorkController):
 
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)
-        print('{0.hostname} ready.'.format(self))
+        print('{0} ready.'.format(safe_str(self.hostname), ))
 
     def setup_logging(self, colorize=None):
         if colorize is None and self.no_color is not None:
@@ -263,7 +264,8 @@ if not is_jython:
         _shutdown_handler, sig='SIGQUIT', how='Cold', exc=SystemTerminate,
     )
 else:
-    install_worker_term_handler = lambda *a, **kw: None
+    install_worker_term_handler = \
+        install_worker_term_hard_handler = lambda *a, **kw: None
 
 
 def on_SIGINT(worker):

+ 3 - 1
celery/backends/base.py

@@ -423,11 +423,13 @@ class KeyValueStoreBackend(BaseBackend):
         else:
             self.fallback_chord_unlock(group_id, body, result, **kwargs)
 
-    def on_chord_part_return(self, task, propagate=True):
+    def on_chord_part_return(self, task, propagate=None):
         if not self.implements_incr:
             return
         from celery import subtask
         from celery.result import GroupResult
+        if propagate is None:
+            propagate = self.app.conf.CELERY_CHORD_PROPAGATES
         gid = task.request.group
         if not gid:
             return

+ 1 - 0
celery/backends/cache.py

@@ -115,6 +115,7 @@ class CacheBackend(KeyValueStoreBackend):
 
     def on_chord_apply(self, group_id, body, result=None, **kwargs):
         self.client.set(self.get_key_for_chord(group_id), '0', time=86400)
+        self.app.GroupResult(group_id, result).save()
 
     def incr(self, key):
         return self.client.incr(key)

+ 3 - 1
celery/canvas.py

@@ -383,8 +383,10 @@ Signature.register_type(chunks)
 def _maybe_group(tasks):
     if isinstance(tasks, group):
         tasks = list(tasks.tasks)
+    elif isinstance(tasks, Signature):
+        tasks = [tasks]
     else:
-        tasks = regen(tasks if is_list(tasks) else tasks)
+        tasks = regen(tasks)
     return tasks
 
 

+ 5 - 0
celery/events/__init__.py

@@ -288,6 +288,11 @@ class EventReceiver(ConsumerMixin):
     def itercapture(self, limit=None, timeout=None, wakeup=True):
         return self.consume(limit=limit, timeout=timeout, wakeup=wakeup)
 
+    def itercapture(self, limit=None, timeout=None, wakeup=True):
+        with self.consumer(wakeup=wakeup) as consumer:
+            yield consumer
+            self.drain_events(limit=limit, timeout=timeout)
+
     def capture(self, limit=None, timeout=None, wakeup=True):
         """Open up a consumer capturing events.
 

+ 9 - 0
celery/events/state.py

@@ -450,5 +450,14 @@ class State(object):
         self.__dict__ = state
         self._mutex = threading.Lock()
 
+    def __getstate__(self):
+        d = dict(vars(self))
+        d.pop('_mutex')
+        return d
+
+    def __setstate__(self, state):
+        self.__dict__ = state
+        self._mutex = threading.Lock()
+
 
 state = State()

+ 4 - 3
celery/local.py

@@ -124,10 +124,11 @@ class Proxy(object):
         object behind the proxy at a time for performance reasons or because
         you want to pass the object into a different context.
         """
-        if not hasattr(self.__local, '__release_local__'):
-            return self.__local(*self.__args, **self.__kwargs)
+        loc = object.__getattribute__(self, '_Proxy__local')
+        if not hasattr(loc, '__release_local__'):
+            return loc(*self.__args, **self.__kwargs)
         try:
-            return getattr(self.__local, self.__name__)
+            return getattr(loc, self.__name__)
         except AttributeError:
             raise RuntimeError('no object bound to {0.__name__}'.format(self))
 

+ 2 - 1
celery/platforms.py

@@ -17,6 +17,7 @@ import signal as _signal
 import sys
 
 from billiard import current_process
+from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 
 from .local import try_import
@@ -616,7 +617,7 @@ def set_process_title(progname, info=None):
     proctitle = '[{0}]'.format(progname)
     proctitle = '{0} {1}'.format(proctitle, info) if info else proctitle
     if _setproctitle:
-        _setproctitle.setproctitle(proctitle)
+        _setproctitle.setproctitle(safe_str(proctitle))
     return proctitle
 
 

+ 5 - 4
celery/result.py

@@ -512,6 +512,8 @@ class ResultSet(ResultBase):
         result backends.
 
         """
+        if not self.results:
+            return iter([])
         backend = self.results[0].backend
         ids = [result.id for result in self.results]
         return backend.get_many(ids, timeout=timeout, interval=interval)
@@ -538,10 +540,9 @@ class ResultSet(ResultBase):
         return acc
 
     def _failed_join_report(self):
-        for res in self.results:
-            if (res.backend.is_cached(res.id) and
-                    res.state in states.PROPAGATE_STATES):
-                yield res
+        return (res for res in self.results
+                if res.backend.is_cached(res.id) and
+                res.state in states.PROPAGATE_STATES)
 
     def __len__(self):
         return len(self.results)

+ 21 - 1
celery/tests/events/test_events.py

@@ -4,6 +4,7 @@ import socket
 
 from mock import Mock
 
+from celery import Celery
 from celery import events
 from celery.tests.utils import AppCase
 
@@ -39,10 +40,29 @@ class test_Event(AppCase):
 
 class test_EventDispatcher(AppCase):
 
+    def test_redis_uses_fanout_exchange(self):
+        with Celery(set_as_current=False) as app:
+            app.connection = Mock()
+            conn = app.connection.return_value = Mock()
+            conn.transport.driver_type = 'redis'
+
+            dispatcher = app.events.Dispatcher(conn, enabled=False)
+            self.assertEqual(dispatcher.exchange.type, 'fanout')
+
+    def test_others_use_topic_exchange(self):
+        with Celery(set_as_current=False) as app:
+            app.connection = Mock()
+            conn = app.connection.return_value = Mock()
+            conn.transport.driver_type = 'amqp'
+            dispatcher = app.events.Dispatcher(conn, enabled=False)
+            self.assertEqual(dispatcher.exchange.type, 'topic')
+
     def test_send(self):
         producer = MockProducer()
         producer.connection = self.app.connection()
-        eventer = self.app.events.Dispatcher(Mock(), enabled=False,
+        connection = Mock()
+        connection.transport.driver_type = 'amqp'
+        eventer = self.app.events.Dispatcher(connection, enabled=False,
                                              buffer_while_offline=False)
         eventer.producer = producer
         eventer.enabled = True

+ 2 - 1
celery/worker/pidbox.py

@@ -4,6 +4,7 @@ import socket
 import threading
 
 from kombu.common import ignore_errors
+from kombu.utils.encoding import safe_str
 
 from celery.datastructures import AttributeDict
 from celery.utils.log import get_logger
@@ -21,7 +22,7 @@ class Pidbox(object):
         self.c = c
         self.hostname = c.hostname
         self.node = c.app.control.mailbox.Node(
-            c.hostname,
+            safe_str(c.hostname),
             handlers=control.Panel.data,
             state=AttributeDict(app=c.app, hostname=c.hostname, consumer=c),
         )

+ 8 - 0
docs/conf.py

@@ -33,10 +33,18 @@ app.conf.update(BROKER_URL="memory://",
 extensions = ['sphinx.ext.autodoc',
               'sphinx.ext.coverage',
               'sphinx.ext.pngmath',
+              'sphinx.ext.viewcode',
+              'sphinx.ext.coverage',
               'sphinx.ext.intersphinx',
               'sphinxcontrib.issuetracker',
               'celerydocs']
 
+def linkcode_resolve(domain, info):
+    if domain != 'py' or not info['module']:
+        return
+    filename = info['module'].replace('.', '/')
+    return 'http://github.com/celery/celery/tree/master/%s.py' % (filename, )
+
 html_show_sphinx = False
 
 # Add any paths that contain templates here, relative to this directory.

+ 21 - 0
docs/configuration.rst

@@ -993,6 +993,27 @@ Result backends caches ready results used by the client.
 This is the total number of results to cache before older results are evicted.
 The default is 5000.
 
+.. setting:: CELERY_CHORD_PROPAGATES
+
+CELERY_CHORD_PROPAGATES
+~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.0.14
+
+This setting defines what happens when a task part of a chord raises an
+exception:
+
+- If propagate is True the chord callback will change state to FAILURE
+  with the exception value set to a :exc:`~celery.exceptions.ChordError`
+  instance containing information about the error and the task that failed.
+
+    This is the default behavior in Celery 3.1+
+
+- If propagate is False the exception value will instead be forwarded
+  to the chord callback.
+
+    This was the default behavior before version 3.1.
+
 .. setting:: CELERY_TRACK_STARTED
 
 CELERY_TRACK_STARTED

+ 14 - 2
docs/django/first-steps-with-django.rst

@@ -77,8 +77,20 @@ of your Django project where ``manage.py`` is located and execute:
 
     $ python manage.py startapp celerytest
 
-After the new app has been created, define a task by creating
-a new file called ``celerytest/tasks.py``:
+Next you have to add the new app to ``INSTALLED_APPS`` so that your
+Django project recognizes it.  This setting is a tuple/list so just
+append ``celerytest`` as a new element at the end
+
+.. code-block:: python
+
+    INSTALLED_APPS = (
+        ...,
+        'djcelery',
+        'celerytest',
+    )
+
+After the new app has been created and added to ``INSTALLED_APPS``,
+you can define your tasks by creating a new file called ``celerytest/tasks.py``:
 
 .. code-block:: python
 

+ 128 - 0
docs/history/changelog-3.0.rst

@@ -9,6 +9,134 @@
 
 If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
+.. _version-3.0.15:
+
+3.0.15
+======
+:release-date: 2013-02-11 04:30:00 P.M UTC
+
+- Now depends on billiard 2.7.3.21 which fixed a syntax error crash.
+
+- Fixed bug with :setting:`CELERY_SEND_TASK_SENT_EVENT`.
+
+.. _version-3.0.14:
+
+3.0.14
+======
+:release-date: 2013-02-08 05:00:00 P.M UTC
+
+- Now depends on Kombu 2.5.6
+
+- Now depends on billiard 2.7.3.20
+
+- ``execv`` is now disabled by default.
+
+    It was causing too many problems for users, you can still enable
+    it using the :setting:`CELERYD_FORCE_EXECV` setting.
+
+    execv was only enabled when transports other than amqp/redis was used,
+    and it's there to prevent deadlocks caused by mutexes not being released
+    before the process forks.  Sadly it also changes the environment
+    introducing many corner case bugs that is hard to fix without adding
+    horrible hacks.  Deadlock issues are reported far less often than the
+    bugs that execv are causing, so we now disable it by default.
+
+    Work is in motion to create non-blocking versions of these transports
+    so that execv is not necessary (which is the situation with the amqp
+    and redis broker transports)
+
+- Chord exception behavior defined (Issue #1172).
+
+    From Celery 3.1 the chord callback will change state to FAILURE
+    when a task part of a chord raises an exception.
+
+    It was never documented what happens in this case,
+    and the actual behavior was very unsatisfactory, indeed
+    it will just forward the exception value to the chord callback.
+
+    For backward compatibility reasons we do not change to the new
+    behavior in a bugfix release, even if the current behavior was
+    never documented.  Instead you can enable the
+    :setting:`CELERY_CHORD_PROPAGATES` setting to get the new behavior
+    that will be default from Celery 3.1.
+
+    See more at :ref:`chord-errors`.
+
+- worker: Fixes bug with ignored and retried tasks.
+
+    The ``on_chord_part_return`` and ``Task.after_return`` callbacks,
+    nor the ``task_postrun`` signal should be called when the task was
+    retried/ignored.
+
+    Fix contributed by Vlad.
+
+- ``GroupResult.join_native`` now respects the ``propagate`` argument.
+
+- ``subtask.id`` added as an alias to ``subtask['options'].id``
+
+    .. code-block:: python
+
+        >>> s = add.s(2, 2)
+        >>> s.id = 'my-id'
+        >>> s['options']
+        {'task_id': 'my-id'}
+
+        >>> s.id
+        'my-id'
+
+- worker: Fixed error `Could not start worker processes` occurring
+  when restarting after connection failure (Issue #1118).
+
+- Adds new signal :signal:`task-retried` (Issue #1169).
+
+- `celery events --dumper` now handles connection loss.
+
+- Will now retry sending the task-sent event in case of connection failure.
+
+- amqp backend:  Now uses ``Message.requeue`` instead of republishing
+  the message after poll.
+
+- New :setting:`BROKER_HEARTBEAT_CHECKRATE` setting introduced to modify the
+  rate at which broker connection heartbeats are monitored.
+
+    The default value was also changed from 3.0 to 2.0.
+
+- :class:`celery.events.state.State` is now pickleable.
+
+    Fix contributed by Mher Movsisyan.
+
+- :class:`celery.datastructures.LRUCache` is now pickleable.
+
+    Fix contributed by Mher Movsisyan.
+
+- The stats broadcast command now includes the workers pid.
+
+    Contributed by Mher Movsisyan.
+
+- New ``conf`` remote control command to get a workers current configuration.
+
+    Contributed by Mher Movsisyan.
+
+- Adds the ability to modify the chord unlock task's countdown
+  argument (Issue #1146).
+
+    Contributed by Jun Sakai
+
+- beat: The scheduler now uses the `now()`` method of the schedule,
+  so that schedules can provide a custom way to get the current date and time.
+
+    Contributed by Raphaël Slinckx
+
+- Fixed pickling of configuration modules on Windows or when execv is used
+  (Issue #1126).
+
+- Multiprocessing logger is now configured with loglevel ``ERROR``
+  by default.
+
+    Since 3.0 the multiprocessing loggers were disabled by default
+    (only configured when the :envvar:`MP_LOG` environment variable was set).
+
+
 .. _version-3.0.13:
 
 3.0.13

+ 11 - 3
docs/userguide/canvas.rst

@@ -726,14 +726,18 @@ the return value of each task in the header.  The task id returned by
 and get the final return value (but remember to :ref:`never have a task wait
 for other tasks <task-synchronous-subtasks>`)
 
+.. _chord-errors:
+
 Error handling
 ~~~~~~~~~~~~~~
 
-.. versionadded:: 3.0.14
-
 So what happens if one of the tasks raises an exception?
 
-Errors will propagate to the callback, so the callback will not be executed
+This was not documented for some time and before version 3.1
+the exception value will be forwarded to the chord callback.
+
+
+From 3.1 errors will propagate to the callback, so the callback will not be executed
 instead the callback changes to failure state, and the error is set
 to the :exc:`~celery.exceptions.ChordError` exception:
 
@@ -751,6 +755,10 @@ to the :exc:`~celery.exceptions.ChordError` exception:
     celery.exceptions.ChordError: Dependency 97de6f3f-ea67-4517-a21c-d867c61fcb47
         raised ValueError('something something',)
 
+If you're running 3.0.14 or later you can enable the new behavior via
+the :setting:`CELERY_CHORD_PROPAGATES` setting::
+
+    CELERY_CHORD_PROPAGATES = True
 
 While the traceback may be different depending on which result backend is
 being used, you can see the error description includes the id of the task that failed

+ 2 - 2
docs/userguide/periodic-tasks.rst

@@ -78,7 +78,7 @@ Example: Run the `tasks.add` task every 30 seconds.
     from datetime import timedelta
 
     CELERYBEAT_SCHEDULE = {
-        'runs-every-30-seconds': {
+        'add-every-30-seconds': {
             'task': 'tasks.add',
             'schedule': timedelta(seconds=30),
             'args': (16, 16)
@@ -150,7 +150,7 @@ the :class:`~celery.schedules.crontab` schedule type:
 
     CELERYBEAT_SCHEDULE = {
         # Executes every Monday morning at 7:30 A.M
-        'every-monday-morning': {
+        'add-every-monday-morning': {
             'task': 'tasks.add',
             'schedule': crontab(hour=7, minute=30, day_of_week=1),
             'args': (16, 16),

+ 129 - 0
extra/bash-completion/celery.bash

@@ -0,0 +1,129 @@
+# This is a bash completion script for celery
+# Redirect it to a file, then source it or copy it to /etc/bash_completion.d
+# to get tab completion. celery must be on your PATH for this to work.
+_celery()
+{
+    local cur basep opts base kval kkey loglevels prevp in_opt controlargs
+    local pools
+    COMPREPLY=()
+    cur="${COMP_WORDS[COMP_CWORD]}"
+    prevp="${COMP_WORDS[COMP_CWORD-1]}"
+    basep="${COMP_WORDS[1]}"
+    opts="worker events beat shell multi amqp status
+          inspect control purge list migrate call result report"
+    fargs="--app= --broker= --loader= --config= --version"
+    dopts="--detach --umask= --gid= --uid= --pidfile= --logfile= --loglevel="
+    controlargs="--timeout --destination"
+    pools="processes eventlet gevent threads solo"
+    loglevels="critical error warning info debug"
+    in_opt=0
+
+    # find the current subcommand, store in basep'
+    for index in $(seq 1 $((${#COMP_WORDS[@]} - 2)))
+    do
+        basep=${COMP_WORDS[$index]}
+        if [ "${basep:0:2}" != "--" ]; then
+            break;
+        fi
+    done
+
+    if [ "${cur:0:2}" == "--" -a "$cur" != "${cur//=}" ]; then
+        in_opt=1
+        kkey="${cur%=*}"
+        kval="${cur#*=}"
+    elif [ "${prevp:0:1}" == "-" ]; then
+        in_opt=1
+        kkey="$prevp"
+        kval="$cur"
+    fi
+
+    if [ $in_opt -eq 1 ]; then
+        case "${kkey}" in
+            --uid|-u)
+                COMPREPLY=( $(compgen -u -- "$kval") )
+                return 0
+            ;;
+            --gid|-g)
+                COMPREPLY=( $(compgen -g -- "$kval") )
+                return 0
+            ;;
+            --pidfile|--logfile|-p|-f|--statedb|-S|-s|--schedule-filename)
+                COMPREPLY=( $(compgen -f -- "$kval") )
+                return 0
+            ;;
+            --workdir)
+                COMPREPLY=( $(compgen -d -- "$kval") )
+                return 0
+            ;;
+            --loglevel|-l)
+                COMPREPLY=( $(compgen -W "$loglevels" -- "$kval") )
+                return 0
+            ;;
+            --pool|-P)
+                COMPREPLY=( $(compgen -W "$pools" -- "$kval") )
+                return 0
+            ;;
+            *)
+            ;;
+        esac
+    fi
+
+    case "${basep}" in
+    worker)
+        COMPREPLY=( $(compgen -W '--concurrency= --pool= --purge --logfile=
+        --loglevel= --hostname= --beat --schedule= --scheduler= --statedb= --events
+        --time-limit= --soft-time-limit= --maxtasksperchild= --queues=
+        --include= --pidfile= --autoscale= --autoreload --no-execv $fargs' -- ${cur} ) )
+        return 0
+        ;;
+    inspect)
+        COMPREPLY=( $(compgen -W 'active active_queues ping registered report
+        reserved revoked scheduled stats --help $controlargs $fargs' -- ${cur}) )
+        return 0
+        ;;
+    control)
+        COMPREPLY=( $(compgen -W 'add_consumer autoscale cancel_consumer
+        disable_events enable_events pool_grow pool_shrink
+        rate_limit time_limit --help $controlargs $fargs' -- ${cur}) )
+        return 0
+        ;;
+    multi)
+        COMPREPLY=( $(compgen -W 'start restart stopwait stop show
+        kill names expand get help --quiet --nosplash
+        --verbose --no-color --help $fargs' -- ${cur} ) )
+        return 0
+        ;;
+    amqp)
+        COMPREPLY=( $(compgen -W 'queue.declare queue.purge exchange.delete
+        basic.publish exchange.declare queue.delete queue.bind
+        basic.get --help $fargs' -- ${cur} ))
+        return 0
+        ;;
+    list)
+        COMPREPLY=( $(compgen -W 'bindings $fargs' -- ${cur} ) )
+        return 0
+        ;;
+    shell)
+        COMPREPLY=( $(compgen -W '--ipython --bpython --python
+        --without-tasks --eventlet --gevent $fargs' -- ${cur} ) )
+        return 0
+        ;;
+    beat)
+        COMPREPLY=( $(compgen -W '--schedule= --scheduler=
+        --max-interval= $dopts $fargs' -- ${cur}  ))
+        return 0
+        ;;
+    events)
+        COMPREPLY=( $(compgen -W '--dump --camera= --freq=
+        --maxrate= $dopts $fargs' -- ${cur}))
+        return 0
+        ;;
+    *)
+        ;;
+    esac
+
+   COMPREPLY=($(compgen -W "${opts} ${fargs}" -- ${cur}))
+   return 0
+}
+complete -F _celery celery
+

+ 2 - 2
pavement.py

@@ -86,8 +86,8 @@ def flakes(options):
 
 @task
 def clean_readme(options):
-    path('README').unlink()
-    path('README.rst').unlink()
+    path('README').unlink_p()
+    path('README.rst').unlink_p()
 
 
 @task

+ 2 - 2
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
-billiard>=2.7.3.19
-kombu>=2.5.4
+billiard>=2.7.3.21
+kombu>=2.5.6

+ 2 - 2
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz
-           billiard >= 2.7.3.19
-           kombu >= 2.5.4
+           billiard >= 2.7.3.21
+           kombu >= 2.5.6