Browse Source

Merge branch '3.0'

Conflicts:
	README.rst
	celery/__init__.py
	celery/task/trace.py
	docs/includes/introduction.txt
	docs/userguide/periodic-tasks.rst
	docs/userguide/signals.rst
	requirements/default-py3k.txt
	requirements/default.txt
	setup.cfg
Ask Solem 12 years ago
parent
commit
d8a0591091

+ 43 - 12
Changelog

@@ -24,11 +24,11 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 
 
 3.0.10
 3.0.10
 ======
 ======
-:release-date: TBA
+:release-date: 2012-09-20 05:30 P.M BST
 
 
 - Now depends on kombu 2.4.7
 - Now depends on kombu 2.4.7
 
 
-- Now depends on billiard 2.7.3.13
+- Now depends on billiard 2.7.3.14
 
 
     - Fixes crash at startup when using Django and pre-1.4 projects
     - Fixes crash at startup when using Django and pre-1.4 projects
       (setup_environ).
       (setup_environ).
@@ -39,7 +39,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
     - Billiard now installs even if the C extension cannot be built.
     - Billiard now installs even if the C extension cannot be built.
 
 
         It's still recommended to build the C extension if you are using
         It's still recommended to build the C extension if you are using
-        a transport other than rabbitmq/redis (or use force_execv for some
+        a transport other than rabbitmq/redis (or use forced execv for some
         other reason).
         other reason).
 
 
     - Pool now sets a ``current_process().index`` attribute that can be used to create
     - Pool now sets a ``current_process().index`` attribute that can be used to create
@@ -48,7 +48,9 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - Canvas: chord/group/chain no longer modifies the state when called
 - Canvas: chord/group/chain no longer modifies the state when called
 
 
     Previously calling a chord/group/chain would modify the ids of subtasks
     Previously calling a chord/group/chain would modify the ids of subtasks
-    so that::
+    so that:
+
+    .. code-block:: python
 
 
         >>> c = chord([add.s(2, 2), add.s(4, 4)], xsum.s())
         >>> c = chord([add.s(2, 2), add.s(4, 4)], xsum.s())
         >>> c()
         >>> c()
@@ -58,23 +60,37 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
     previous invocation.  This is now fixed, so that calling a subtask
     previous invocation.  This is now fixed, so that calling a subtask
     won't mutate any options.
     won't mutate any options.
 
 
-- Canvas: Chaining a chord to another task now works.
+- Canvas: Chaining a chord to another task now works (Issue #965).
 
 
 - Worker: Fixed a bug where the request stack could be corrupted if
 - Worker: Fixed a bug where the request stack could be corrupted if
   relative imports are used.
   relative imports are used.
 
 
     Problem usually manifested itself as an exception while trying to
     Problem usually manifested itself as an exception while trying to
-    send a failed task result (NoneType does not have id attribute).
+    send a failed task result (``NoneType does not have id attribute``).
 
 
     Fix contributed by Sam Cooke.
     Fix contributed by Sam Cooke.
 
 
+- Tasks can now raise :exc:`~celery.exceptions.Ignore` to skip updating states
+  or events after return.
+
+    Example:
+
+    .. code-block:: python
+
+        from celery.exceptions import Ignore
+
+        @task
+        def custom_revokes():
+            if redis.sismember('tasks.revoked', custom_revokes.request.id):
+                raise Ignore()
+
 - The worker now makes sure the request/task stacks are not modified
 - The worker now makes sure the request/task stacks are not modified
   by the initial ``Task.__call__``.
   by the initial ``Task.__call__``.
 
 
     This would previously be a problem if a custom task class defined
     This would previously be a problem if a custom task class defined
     ``__call__`` and also called ``super()``.
     ``__call__`` and also called ``super()``.
 
 
-- Because of many bugs the fast local optimization has been disabled,
+- Because of problems the fast local optimization has been disabled,
   and can only be enabled by setting the :envvar:`USE_FAST_LOCALS` attribute.
   and can only be enabled by setting the :envvar:`USE_FAST_LOCALS` attribute.
 
 
 - Worker: Now sets a default socket timeout of 5 seconds at shutdown
 - Worker: Now sets a default socket timeout of 5 seconds at shutdown
@@ -82,7 +98,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 
 
 - More fixes related to late eventlet/gevent patching.
 - More fixes related to late eventlet/gevent patching.
 
 
-- Documentation for the settings out of sync with reality:
+- Documentation for settings out of sync with reality:
 
 
     - :setting:`CELERY_TASK_PUBLISH_RETRY`
     - :setting:`CELERY_TASK_PUBLISH_RETRY`
 
 
@@ -98,9 +114,7 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 
 
     Fix contributed by Matt Long.
     Fix contributed by Matt Long.
 
 
-- Worker: Log messages when connection established and lost have been improved
-  so that they are more useful when used with the upcoming multiple broker
-  hostlist for failover that is coming in the next Kombu version.
+- Worker: Log messages when connection established and lost have been improved.
 
 
 - The repr of a crontab schedule value of '0' should be '*'  (Issue #972).
 - The repr of a crontab schedule value of '0' should be '*'  (Issue #972).
 
 
@@ -109,7 +123,24 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 
 
     Fix contributed by Alexey Zatelepin.
     Fix contributed by Alexey Zatelepin.
 
 
-- gevent: Now supports hard time limits using ``gevent.Timeout`.
+- gevent: Now supports hard time limits using ``gevent.Timeout``.
+
+- Documentation: Links to init scripts now point to the 3.0 branch instead
+  of the development branch (master).
+
+- Documentation: Fixed typo in signals user guide (Issue #986).
+
+    ``instance.app.queues`` -> ``instance.app.amqp.queues``.
+
+- Eventlet/gevent: The worker did not properly set the custom app
+  for new greenlets.
+
+- Eventlet/gevent: Fixed a bug where the worker could not recover
+  from connection loss (Issue #959).
+
+    Also, because of a suspected bug in gevent the
+    :setting:`BROKER_CONNECTION_TIMEOUT` setting has been disabled
+    when using gevent
 
 
 3.0.9
 3.0.9
 =====
 =====

+ 6 - 4
celery/_state.py

@@ -36,14 +36,16 @@ _task_stack = LocalStack()
 
 
 def set_default_app(app):
 def set_default_app(app):
     global default_app
     global default_app
-    if default_app is None:
-        default_app = app
+    default_app = app
 
 
 
 
 def get_current_app():
 def get_current_app():
     if default_app is None:
     if default_app is None:
-        # creates the default app, but we want to defer that.
-        import celery.app  # noqa
+        #: creates the global fallback app instance.
+        from celery.app import Celery, default_loader
+        set_default_app(Celery('default', loader=default_loader,
+                                          set_as_current=False,
+                                          accept_magic_kwargs=True))
     return _tls.current_app or default_app
     return _tls.current_app or default_app
 
 
 
 

+ 0 - 5
celery/app/__init__.py

@@ -36,11 +36,6 @@ app_or_default = None
 #: The 'default' loader is the default loader used by old applications.
 #: The 'default' loader is the default loader used by old applications.
 default_loader = os.environ.get('CELERY_LOADER') or 'default'
 default_loader = os.environ.get('CELERY_LOADER') or 'default'
 
 
-#: Global fallback app instance.
-set_default_app(Celery('default', loader=default_loader,
-                                  set_as_current=False,
-                                  accept_magic_kwargs=True))
-
 
 
 def bugreport():
 def bugreport():
     return current_app().bugreport()
     return current_app().bugreport()

+ 1 - 1
celery/beat.py

@@ -172,7 +172,7 @@ class Scheduler(object):
         is_due, next_time_to_run = entry.is_due()
         is_due, next_time_to_run = entry.is_due()
 
 
         if is_due:
         if is_due:
-            info('Scheduler: Sending due task %s', entry.task)
+            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
             try:
             try:
                 result = self.apply_async(entry, publisher=publisher)
                 result = self.apply_async(entry, publisher=publisher)
             except Exception as exc:
             except Exception as exc:

+ 2 - 2
celery/bin/base.py

@@ -377,8 +377,8 @@ class Command(object):
             return match.sub(lambda m: keys[m.expand(expand)], s)
             return match.sub(lambda m: keys[m.expand(expand)], s)
 
 
     def _get_default_app(self, *args, **kwargs):
     def _get_default_app(self, *args, **kwargs):
-        from celery.app import default_app
-        return default_app._get_current_object()  # omit proxy
+        from celery._state import get_current_app
+        return get_current_app()  # omit proxy
 
 
 
 
 def daemon_options(default_pidfile=None, default_logfile=None):
 def daemon_options(default_pidfile=None, default_logfile=None):

+ 4 - 0
celery/exceptions.py

@@ -25,6 +25,10 @@ class SecurityError(Exception):
     """
     """
 
 
 
 
+class Ignore(Exception):
+    """A task can raise this to ignore doing state updates."""
+
+
 class SystemTerminate(SystemExit):
 class SystemTerminate(SystemExit):
     """Signals that the worker should terminate."""
     """Signals that the worker should terminate."""
 
 

+ 1 - 0
celery/states.py

@@ -133,6 +133,7 @@ FAILURE = 'FAILURE'
 REVOKED = 'REVOKED'
 REVOKED = 'REVOKED'
 #: Task is waiting for retry.
 #: Task is waiting for retry.
 RETRY = 'RETRY'
 RETRY = 'RETRY'
+IGNORED = 'IGNORED'
 
 
 READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
 READY_STATES = frozenset([SUCCESS, FAILURE, REVOKED])
 UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])
 UNREADY_STATES = frozenset([PENDING, RECEIVED, STARTED, RETRY])

+ 4 - 1
celery/task/trace.py

@@ -29,7 +29,7 @@ from celery._state import _task_stack
 from celery.app import set_default_app
 from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.app.task import Task as BaseTask, Context
 from celery.datastructures import ExceptionInfo
 from celery.datastructures import ExceptionInfo
-from celery.exceptions import RetryTaskError
+from celery.exceptions import Ignore, RetryTaskError
 from celery.utils.serialization import get_pickleable_exception
 from celery.utils.serialization import get_pickleable_exception
 from celery.utils.log import get_logger
 from celery.utils.log import get_logger
 
 
@@ -43,6 +43,7 @@ send_success = signals.task_success.send
 success_receivers = signals.task_success.receivers
 success_receivers = signals.task_success.receivers
 STARTED = states.STARTED
 STARTED = states.STARTED
 SUCCESS = states.SUCCESS
 SUCCESS = states.SUCCESS
+IGNORED = states.IGNORED
 RETRY = states.RETRY
 RETRY = states.RETRY
 FAILURE = states.FAILURE
 FAILURE = states.FAILURE
 EXCEPTION_STATES = states.EXCEPTION_STATES
 EXCEPTION_STATES = states.EXCEPTION_STATES
@@ -222,6 +223,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 try:
                 try:
                     R = retval = fun(*args, **kwargs)
                     R = retval = fun(*args, **kwargs)
                     state = SUCCESS
                     state = SUCCESS
+                except Ignore as exc:
+                    I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                 except RetryTaskError as exc:
                 except RetryTaskError as exc:
                     I = Info(RETRY, exc)
                     I = Info(RETRY, exc)
                     state, retval = I.state, I.retval
                     state, retval = I.state, I.retval

+ 18 - 6
celery/worker/job.py

@@ -23,7 +23,7 @@ from celery import exceptions
 from celery import signals
 from celery import signals
 from celery.app import app_or_default
 from celery.app import app_or_default
 from celery.datastructures import ExceptionInfo
 from celery.datastructures import ExceptionInfo
-from celery.exceptions import TaskRevokedError
+from celery.exceptions import Ignore, TaskRevokedError
 from celery.platforms import signals as _signals
 from celery.platforms import signals as _signals
 from celery.task.trace import (
 from celery.task.trace import (
     trace_task,
     trace_task,
@@ -64,8 +64,9 @@ class Request(object):
                  'eventer', 'connection_errors',
                  'eventer', 'connection_errors',
                  'task', 'eta', 'expires',
                  'task', 'eta', 'expires',
                  'request_dict', 'acknowledged', 'success_msg',
                  'request_dict', 'acknowledged', 'success_msg',
-                 'error_msg', 'retry_msg', 'time_start', 'worker_pid',
-                 '_already_revoked', '_terminate_on_ack', '_tzlocal')
+                 'error_msg', 'retry_msg', 'ignore_msg',
+                 'time_start', 'worker_pid', '_already_revoked',
+                 '_terminate_on_ack', '_tzlocal')
 
 
     #: Format string used to log task success.
     #: Format string used to log task success.
     success_msg = """\
     success_msg = """\
@@ -82,6 +83,10 @@ class Request(object):
         Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
         Task %(name)s[%(id)s] INTERNAL ERROR: %(exc)s
     """
     """
 
 
+    ignored_msg = """\
+        Task %(name)s[%(id)s] ignored
+    """
+
     #: Format string used to log task retry.
     #: Format string used to log task retry.
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
     retry_msg = """Task %(name)s[%(id)s] retry: %(exc)s"""
 
 
@@ -383,9 +388,16 @@ class Request(object):
                          traceback=traceback)
                          traceback=traceback)
 
 
         if internal:
         if internal:
-            format = self.internal_error_msg
-            description = 'INTERNAL ERROR'
-            severity = logging.CRITICAL
+            if isinstance(einfo.exception, Ignore):
+                format = self.ignored_msg
+                description = 'ignored'
+                severity = logging.INFO
+                exc_info = None
+                self.acknowledge()
+            else:
+                format = self.internal_error_msg
+                description = 'INTERNAL ERROR'
+                severity = logging.CRITICAL
 
 
         context = {
         context = {
             'hostname': self.hostname,
             'hostname': self.hostname,

+ 2 - 10
docs/reference/celery.app.amqp.rst

@@ -39,16 +39,8 @@
     ------
     ------
 
 
     .. autoclass:: Queues
     .. autoclass:: Queues
-
-        .. automethod:: add
-
-        .. automethod:: format
-
-        .. automethod:: select_subset
-
-        .. automethod:: new_missing
-
-        .. autoattribute:: consume_from
+        :members:
+        :undoc-members:
 
 
     TaskPublisher
     TaskPublisher
     -------------
     -------------

+ 3 - 3
docs/tutorials/daemonizing.rst

@@ -22,7 +22,7 @@ This directory contains generic bash init scripts for :program:`celeryd`,
 that should run on Linux, FreeBSD, OpenBSD, and other Unix platforms.
 that should run on Linux, FreeBSD, OpenBSD, and other Unix platforms.
 
 
 .. _`extra/generic-init.d/`:
 .. _`extra/generic-init.d/`:
-    http://github.com/celery/celery/tree/master/extra/generic-init.d/
+    http://github.com/celery/celery/tree/3.0/extra/generic-init.d/
 
 
 .. _generic-initd-celeryd:
 .. _generic-initd-celeryd:
 
 
@@ -299,7 +299,7 @@ actual resulting output:
 * `extra/supervisord/`_
 * `extra/supervisord/`_
 
 
 .. _`extra/supervisord/`:
 .. _`extra/supervisord/`:
-    http://github.com/celery/celery/tree/master/extra/supervisord/
+    http://github.com/celery/celery/tree/3.0/extra/supervisord/
 .. _`supervisord`: http://supervisord.org/
 .. _`supervisord`: http://supervisord.org/
 
 
 .. _daemon-launchd:
 .. _daemon-launchd:
@@ -310,7 +310,7 @@ launchd (OS X)
 * `extra/mac/`_
 * `extra/mac/`_
 
 
 .. _`extra/mac/`:
 .. _`extra/mac/`:
-    http://github.com/celery/celery/tree/master/extra/mac/
+    http://github.com/celery/celery/tree/3.0/extra/mac/
 
 
 
 
 .. _daemon-windows:
 .. _daemon-windows:

+ 22 - 28
docs/userguide/periodic-tasks.rst

@@ -37,14 +37,32 @@ An example time zone could be `Europe/London`:
 
 
     CELERY_TIMEZONE = 'Europe/London'
     CELERY_TIMEZONE = 'Europe/London'
 
 
-.. admonition:: Changing the time zone
-
 The default scheduler (storing the schedule in the :file:`celerybeat-schedule`
 The default scheduler (storing the schedule in the :file:`celerybeat-schedule`
-file) will automatically detect that the timezone has changed, and so will
+file) will automatically detect that the time zone has changed, and so will
 reset the schedule itself, but other schedulers may not be so smart (e.g. the
 reset the schedule itself, but other schedulers may not be so smart (e.g. the
-Django database scheduler) and in that case you will have to reset the
+Django database scheduler, see below) and in that case you will have to reset the
 schedule manually.
 schedule manually.
 
 
+.. admonition:: Django Users
+
+    Celery recommends and is compatible with the new ``USE_TZ`` setting introduced
+    in Django 1.4.
+
+    For Django users the time zone specified in the ``TIME_ZONE`` setting
+    will be used, or you can specify a custom time zone for Celery alone
+    by using the :setting:`CELERY_TIMEZONE` setting.
+
+    The database scheduler will not reset when timezone related settings
+    change, so you must do this manually:
+
+    .. code-block:: bash
+
+        $ python manage.py shell
+        >>> from djcelery.models import PeriodicTask
+        >>> PeriodicTask.objects.update(last_run_at=None)
+
+.. _`pytz`: http://pypi.python.org/pypi/pytz/
+
 .. _beat-entries:
 .. _beat-entries:
 
 
 Entries
 Entries
@@ -69,7 +87,6 @@ Example: Run the `tasks.add` task every 30 seconds.
 
 
     CELERY_TIMEZONE = 'UTC'
     CELERY_TIMEZONE = 'UTC'
 
 
-
 Using a :class:`~datetime.timedelta` for the schedule means the task will
 Using a :class:`~datetime.timedelta` for the schedule means the task will
 be executed 30 seconds after `celery beat` starts, and then every 30 seconds
 be executed 30 seconds after `celery beat` starts, and then every 30 seconds
 after the last run.  A crontab like schedule also exists, see the section
 after the last run.  A crontab like schedule also exists, see the section
@@ -199,29 +216,6 @@ The syntax of these crontab expressions are very flexible.  Some examples:
 
 
 See :class:`celery.schedules.crontab` for more documentation.
 See :class:`celery.schedules.crontab` for more documentation.
 
 
-.. _beat-timezones:
-
-Timezones
-=========
-
-By default the current local timezone is used, but you can also set a specific
-timezone by enabling the :setting:`CELERY_ENABLE_UTC` setting and configuring
-the :setting:`CELERY_TIMEZONE` setting:
-
-.. code-block:: python
-
-    CELERY_ENABLE_UTC = True
-    CELERY_TIMEZONE = 'Europe/London'
-
-.. admonition:: Django Users
-
-    For Django users the timezone specified in the ``TIME_ZONE`` setting
-    will be used, but *not if the :setting:`CELERY_ENABLE_UTC` setting is
-    enabled*.
-
-    Celery is also compatible with the new ``USE_TZ`` setting introduced
-    in Django 1.4.
-
 .. _beat-starting:
 .. _beat-starting:
 
 
 Starting the Scheduler
 Starting the Scheduler

+ 1 - 1
docs/userguide/signals.rst

@@ -226,7 +226,7 @@ used to route a task to any specific worker:
     @celeryd_after_setup.connect
     @celeryd_after_setup.connect
     def setup_direct_queue(sender, instance, **kwargs):
     def setup_direct_queue(sender, instance, **kwargs):
         queue_name = '{0}.dq'.format(sender)  # sender is the hostname of the worker
         queue_name = '{0}.dq'.format(sender)  # sender is the hostname of the worker
-        instance.app.queues.select_add(queue_name)
+        instance.app.amqp.queues.select_add(queue_name)
 
 
 Provides arguments:
 Provides arguments:
 
 

+ 2 - 2
examples/eventlet/README.rst

@@ -35,13 +35,13 @@ of the response body::
     >>> urlopen.delay("http://www.google.com/").get()
     >>> urlopen.delay("http://www.google.com/").get()
     9980
     9980
 
 
-To open several URLs at once you can do:
+To open several URLs at once you can do::
 
 
     $ cd examples/eventlet
     $ cd examples/eventlet
     $ python
     $ python
     >>> from tasks import urlopen
     >>> from tasks import urlopen
     >>> from celery import group
     >>> from celery import group
-    >>> result = gruop(urlopen.s(url)
+    >>> result = group(urlopen.s(url)
     ...                     for url in LIST_OF_URLS).apply_async()
     ...                     for url in LIST_OF_URLS).apply_async()
     >>> for incoming_result in result.iter_native():
     >>> for incoming_result in result.iter_native():
     ...     print(incoming_result, )
     ...     print(incoming_result, )

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
 pytz
-billiard>=2.7.3.13
+billiard>=2.7.3.15
 kombu>=2.4.7,<3.0
 kombu>=2.4.7,<3.0

+ 1 - 1
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 
 [bdist_rpm]
 [bdist_rpm]
 requires = pytz
 requires = pytz
-           billiard>=2.7.3.13
+           billiard>=2.7.3.15
            kombu >= 2.4.7
            kombu >= 2.4.7