Преглед на файлове

Merge branch 'master' into bootsteps_refactor

Conflicts:
	celery/worker/consumer.py
Ask Solem преди 12 години
родител
ревизия
8e62ecc45d

+ 2 - 1
CONTRIBUTORS.txt

@@ -118,4 +118,5 @@ Paul McMillan, 2012/07/26
 Mitar, 2012/07/28
 Adam DePue, 2012/08/22
 Thomas Meson, 2012/08/28
-Daniel Lundin, 2012/08/30 
+Daniel Lundin, 2012/08/30
+Alexey Zatelepin, 2012/09/18

+ 72 - 1
Changelog

@@ -7,7 +7,7 @@
 .. contents::
     :local:
 
-If you're looking for versions prior to 3.x you should see :ref:`history`.
+If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
 .. _version-3.1.0:
 
@@ -20,6 +20,77 @@ If you're looking for versions prior to 3.x you should see :ref:`history`.
 - `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
 - `App.control.Inspect.conf` can be used for inspecting worker configuration
 
+.. _version-3.0.11:
+
+3.0.11
+======
+:release-date: 2012-09-26 04:00 P.M UTC
+
+- [security:low] generic-init.d scripts changed permissions of /var/log & /var/run
+
+    In the daemonization tutorial the recommended directories were as follows:
+
+    .. code-block:: bash
+
+        CELERYD_LOG_FILE="/var/log/celery/%n.log"
+        CELERYD_PID_FILE="/var/run/celery/%n.pid"
+
+    But in the scripts themselves the default files were ``/var/log/celery%n.log``
+    and ``/var/run/celery%n.pid``, so if the user did not change the location
+    by configuration, the directories ``/var/log`` and ``/var/run`` would be
+    created - and worse have their permissions and owners changed.
+
+    This change means that:
+
+        - Default pid file is ``/var/run/celery/%n.pid``
+        - Default log file is ``/var/log/celery/%n.log``
+
+        - The directories are only created and have their permissions
+          changed if *no custom locations are set*.
+
+    Users can force paths to be created by calling the ``create-paths``
+    subcommand:
+
+    .. code-block:: bash
+
+        $ sudo /etc/init.d/celeryd create-paths
+
+    .. admonition:: Upgrading Celery will not update init scripts
+
+        To update the init scripts you have to re-download
+        the files from source control and update them manually.
+        You can find the init scripts for version 3.0.x at:
+
+            http://github.com/celery/celery/tree/3.0/extra/generic-init.d
+
+- Now depends on billiard 2.7.3.17
+
+- Fixes request stack protection when app is initialized more than
+  once (Issue #1003).
+
+- ETA tasks now properly works when system timezone is not the same
+  as the configured timezone (Issue #1004).
+
+- Terminating a task now works if the task has been sent to the
+  pool but not yet acknowledged by a pool process (Issue #1007).
+
+    Fix contributed by Alexey Zatelepin
+
+- Terminating a task now properly updates the state of the task to revoked,
+  and sends a ``task-revoked`` event.
+
+- :program:`celery worker` and :program:`celery beat` commands now respects
+  the :option:`--no-color` option (Issue #999).
+
+- Fixed typos in eventlet examples (Issue #1000)
+
+    Fix contributed by Bryan Bishop.
+    Congratulations on opening bug #1000!
+
+- Tasks that raise :exc:`~celery.exceptions.Ignore` are now acknowledged.
+
+- Beat: Now shows the name of the entry in ``sending due task`` logs.
+
 .. _version-3.0.10:
 
 3.0.10

+ 1 - 1
celery/concurrency/processes.py

@@ -142,4 +142,4 @@ class TaskPool(BasePool):
 
     @property
     def timers(self):
-        return {self.maintain_pool: 30.0}
+        return {self.maintain_pool: 5.0}

+ 1 - 1
celery/exceptions.py

@@ -9,7 +9,7 @@
 from __future__ import absolute_import
 
 from billiard.exceptions import (  # noqa
-    SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError,
+    SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError, Terminated,
 )
 
 UNREGISTERED_FMT = """\

+ 4 - 4
celery/tests/worker/test_control.py

@@ -351,17 +351,17 @@ class test_ControlPanel(Case):
     def test_revoke_terminate(self):
         request = Mock()
         request.id = tid = uuid()
-        state.active_requests.add(request)
+        state.reserved_requests.add(request)
         try:
             r = control.revoke(Mock(), tid, terminate=True)
             self.assertIn(tid, revoked)
             self.assertTrue(request.terminate.call_count)
-            self.assertIn('terminated', r['ok'])
+            self.assertIn('terminating', r['ok'])
             # unknown task id only revokes
             r = control.revoke(Mock(), uuid(), terminate=True)
-            self.assertIn('revoked', r['ok'])
+            self.assertIn('not found', r['ok'])
         finally:
-            state.active_requests.discard(request)
+            state.reserved_requests.discard(request)
 
     def test_autoscale(self):
         self.panel.state.consumer = Mock()

+ 6 - 10
celery/utils/timeutils.py

@@ -90,13 +90,6 @@ class LocalTimezone(tzinfo):
         return tt.tm_isdst > 0
 
 
-def _get_local_timezone():
-    global _local_timezone
-    if _local_timezone is None:
-        _local_timezone = LocalTimezone()
-    return _local_timezone
-
-
 class _Zone(object):
 
     def tz_or_local(self, tzinfo=None):
@@ -109,10 +102,13 @@ class _Zone(object):
             dt = make_aware(dt, orig or self.utc)
         return localize(dt, self.tz_or_local(local))
 
+    def to_system(self, dt):
+        return localize(dt, self.local)
+
     def to_local_fallback(self, dt, *args, **kwargs):
         if is_naive(dt):
-            return make_aware(dt, _get_local_timezone())
-        return localize(dt, _get_local_timezone())
+            return make_aware(dt, self.local)
+        return localize(dt, self.local)
 
     def get_timezone(self, zone):
         if isinstance(zone, basestring):
@@ -121,7 +117,7 @@ class _Zone(object):
 
     @cached_property
     def local(self):
-        return _get_local_timezone()
+        return LocalTimezone()
 
     @cached_property
     def utc(self):

+ 7 - 5
celery/worker/control.py

@@ -39,17 +39,19 @@ class Panel(UserDict):
 def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""
     revoked.add(task_id)
-    action = 'revoked'
     if terminate:
         signum = _signals.signum(signal or 'TERM')
-        for request in state.active_requests:
+        for request in state.reserved_requests:
             if request.id == task_id:
-                action = 'terminated ({0})'.format(signum)
+                logger.info('Terminating %s (%s)', task_id, signum)
                 request.terminate(panel.consumer.pool, signal=signum)
                 break
+        else:
+            return {'ok': 'terminate: task {0} not found'.format(task_id)}
+        return {'ok': 'terminating {0} ({1})'.format(task_id, signal)}
 
-    logger.info('Task %s %s.', task_id, action)
-    return {'ok': 'task {0} {1}'.format(task_id, action)}
+    logger.info('Revoking task %s', task_id)
+    return {'ok': 'revoking task {0}'.format(task_id)}
 
 
 @Panel.register

+ 24 - 20
celery/worker/job.py

@@ -34,7 +34,7 @@ from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.serialization import get_pickled_exception
 from celery.utils.text import truncate
-from celery.utils.timeutils import maybe_iso8601, timezone
+from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
 
 from . import state
 
@@ -45,9 +45,8 @@ _does_debug = logger.isEnabledFor(logging.DEBUG)
 _does_info = logger.isEnabledFor(logging.INFO)
 
 # Localize
-tz_to_local = timezone.to_local
-tz_or_local = timezone.tz_or_local
 tz_utc = timezone.utc
+tz_or_local = timezone.tz_or_local
 send_revoked = signals.task_revoked.send
 
 task_accepted = state.task_accepted
@@ -64,7 +63,7 @@ class Request(object):
                  'eventer', 'connection_errors',
                  'task', 'eta', 'expires',
                  'request_dict', 'acknowledged', 'success_msg',
-                 'error_msg', 'retry_msg', 'ignore_msg',
+                 'error_msg', 'retry_msg', 'ignore_msg', 'utc',
                  'time_start', 'worker_pid', '_already_revoked',
                  '_terminate_on_ack', '_tzlocal')
 
@@ -108,7 +107,7 @@ class Request(object):
             self.kwargs = kwdict(self.kwargs)
         eta = body.get('eta')
         expires = body.get('expires')
-        utc = body.get('utc', False)
+        utc = self.utc = body.get('utc', False)
         self.on_ack = on_ack
         self.hostname = hostname or socket.gethostname()
         self.eventer = eventer
@@ -121,14 +120,15 @@ class Request(object):
         # timezone means the message is timezone-aware, and the only timezone
         # supported at this point is UTC.
         if eta is not None:
-            tz = tz_utc if utc else self.tzlocal
-            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
+            self.eta = maybe_iso8601(eta)
+            if utc:
+                self.eta = maybe_make_aware(self.eta, self.tzlocal)
         else:
             self.eta = None
         if expires is not None:
-            tz = tz_utc if utc else self.tzlocal
-            self.expires = tz_to_local(maybe_iso8601(expires),
-                                       self.tzlocal, tz)
+            self.expires = maybe_iso8601(expires)
+            if utc:
+                self.expires = maybe_make_aware(self.expires, self.tzlocal)
         else:
             self.expires = None
 
@@ -241,9 +241,11 @@ class Request(object):
 
     def maybe_expire(self):
         """If expired, mark the task as revoked."""
-        if self.expires and datetime.now(self.tzlocal) > self.expires:
-            revoked_tasks.add(self.id)
-            return True
+        if self.expires:
+            now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
+            if now > self.expires:
+                revoked_tasks.add(self.id)
+                return True
 
     def terminate(self, pool, signal=None):
         if self.time_start:
@@ -355,19 +357,21 @@ class Request(object):
         task_ready(self)
 
         if not exc_info.internal:
+            exc = exc_info.exception
 
-            if isinstance(exc_info.exception, exceptions.RetryTaskError):
+            if isinstance(exc, exceptions.RetryTaskError):
                 return self.on_retry(exc_info)
 
-            # This is a special case as the process would not have had
+            # These are special cases where the process would not have had
             # time to write the result.
-            if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
-                    self.store_errors:
-                self.task.backend.mark_as_failure(self.id, exc_info.exception)
+            if self.store_errors:
+                if isinstance(exc, exceptions.WorkerLostError):
+                    self.task.backend.mark_as_failure(self.id, exc)
+                elif isinstance(exc, exceptions.Terminated):
+                    self._announce_revoked('terminated', True, str(exc), False)
             # (acks_late) acknowledge after result stored.
             if self.task.acks_late:
                 self.acknowledge()
-
         self._log_error(exc_info)
 
     def _log_error(self, einfo):
@@ -456,7 +460,7 @@ class Request(object):
     @property
     def tzlocal(self):
         if self._tzlocal is None:
-            self._tzlocal = tz_or_local(self.app.conf.CELERY_TIMEZONE)
+            self._tzlocal = self.app.conf.CELERY_TIMEZONE
         return self._tzlocal
 
     @property

+ 75 - 37
extra/generic-init.d/celerybeat

@@ -21,8 +21,8 @@
 # abnormally in the absence of a valid process ID.
 #set -e
 
-DEFAULT_PID_FILE="/var/run/celerybeat.pid"
-DEFAULT_LOG_FILE="/var/log/celerybeat.log"
+DEFAULT_PID_FILE="/var/run/celery/beat.pid"
+DEFAULT_LOG_FILE="/var/log/celery/beat.log"
 DEFAULT_LOG_LEVEL="INFO"
 DEFAULT_CELERYBEAT="celery beat"
 
@@ -37,9 +37,17 @@ if test -f /etc/default/celerybeat; then
 fi
 
 CELERYBEAT=${CELERYBEAT:-$DEFAULT_CELERYBEAT}
-CELERYBEAT_PID_FILE=${CELERYBEAT_PID_FILE:-${CELERYBEAT_PIDFILE:-$DEFAULT_PID_FILE}}
-CELERYBEAT_LOG_FILE=${CELERYBEAT_LOG_FILE:-${CELERYBEAT_LOGFILE:-$DEFAULT_LOG_FILE}}
 CELERYBEAT_LOG_LEVEL=${CELERYBEAT_LOG_LEVEL:-${CELERYBEAT_LOGLEVEL:-$DEFAULT_LOG_LEVEL}}
+CELERY_CREATE_RUNDIR=0
+CELERY_CREATE_LOGDIR=0
+if [ -z "$CELERYBEAT_PID_FILE" ]; then
+    CELERYBEAT_PID_FILE="$DEFAULT_PID_FILE"
+    CELERY_CREATE_RUNDIR=1
+fi
+if [ -z "$CELERYBEAT_LOG_FILE" ]; then
+    CELERYBEAT_LOG_FILE="$DEFAULT_LOG_FILE"
+    CELERY_CREATE_LOGDIR=1
+fi
 
 export CELERY_LOADER
 
@@ -51,21 +59,13 @@ fi
 
 CELERYBEAT_LOG_DIR=`dirname $CELERYBEAT_LOG_FILE`
 CELERYBEAT_PID_DIR=`dirname $CELERYBEAT_PID_FILE`
-if [ ! -d "$CELERYBEAT_LOG_DIR" ]; then
-    mkdir -p $CELERYBEAT_LOG_DIR
-fi
-if [ ! -d "$CELERYBEAT_PID_DIR" ]; then
-    mkdir -p $CELERYBEAT_PID_DIR
-fi
 
 # Extra start-stop-daemon options, like user/group.
 if [ -n "$CELERYBEAT_USER" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --uid $CELERYBEAT_USER"
-    chown "$CELERYBEAT_USER" $CELERYBEAT_LOG_DIR $CELERYBEAT_PID_DIR
 fi
 if [ -n "$CELERYBEAT_GROUP" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --gid $CELERYBEAT_GROUP"
-    chgrp "$CELERYBEAT_GROUP" $CELERYBEAT_LOG_DIR $CELERYBEAT_PID_DIR
 fi
 
 CELERYBEAT_CHDIR=${CELERYBEAT_CHDIR:-$CELERYD_CHDIR}
@@ -79,21 +79,51 @@ export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
 check_dev_null() {
     if [ ! -c /dev/null ]; then
         echo "/dev/null is not a character device!"
-        exit 1
+        exit 75  # EX_TEMPFAIL
     fi
 }
 
-ensure_dir() {
-    if [ -d "$1" ]; then
+maybe_die() {
+    if [ $? -ne 0 ]; then
+        echo "Exiting: $*"
+        exit 77  # EX_NOPERM
+    fi
+}
+
+create_default_dir() {
+    if [ ! -d "$1" ]; then
+        echo "- Creating default directory: '$1'"
         mkdir -p "$1"
-        chown $CELERYBEAT_USER:$CELERYBEAT_GROUP "$1"
+        maybe_die "Couldn't create directory $1"
+        echo "- Changing permissions of '$1' to 02755"
         chmod 02755 "$1"
+        maybe_die "Couldn't change permissions for $1"
+        if [ -n "$CELERYBEAT_USER" ]; then
+            echo "- Changing owner of '$1' to '$CELERYBEAT_USER'"
+            chown "$CELERYBEAT_USER" "$1"
+            maybe_die "Couldn't change owner of $1"
+        fi
+        if [ -n "$CELERYBEAT_GROUP" ]; then
+            echo "- Changing group of '$1' to '$CELERYBEAT_GROUP'"
+            chgrp "$CELERYBEAT_GROUP" "$1"
+            maybe_die "Couldn't change group of $1"
+        fi
     fi
 }
 
 check_paths() {
-    ensure_dir "$(dirname $CELERYBEAT_PID_FILE)"
-    ensure_dir "$(dirname $CELERYBEAT_LOG_FILE)"
+    if [ $CELERY_CREATE_LOGDIR -eq 1 ]; then
+        create_default_dir "$CELERYBEAT_LOG_DIR"
+    fi
+    if [ $CELERY_CREATE_RUNDIR -eq 1 ]; then
+        create_default_dir "$CELERYBEAT_PID_DIR"
+    fi
+}
+
+
+create_paths () {
+    create_default_dir "$CELERYBEAT_LOG_DIR"
+    create_default_dir "$CELERYBEAT_PID_DIR"
 }
 
 
@@ -142,29 +172,37 @@ start_beat () {
 
 
 case "$1" in
-  start)
-    check_dev_null
-    check_paths
-    start_beat
+    start)
+        check_dev_null
+        check_paths
+        start_beat
     ;;
-  stop)
-    check_paths
-    stop_beat
+    stop)
+        check_paths
+        stop_beat
     ;;
-  reload|force-reload)
-    echo "Use start+stop"
+    reload|force-reload)
+        echo "Use start+stop"
     ;;
-  restart)
-    echo "Restarting celery periodic task scheduler"
-    check_paths
-    stop_beat
-    check_dev_null
-    start_beat
+    restart)
+        echo "Restarting celery periodic task scheduler"
+        check_paths
+        stop_beat
+        check_dev_null
+        start_beat
+    ;;
+    create-paths)
+        check_dev_null
+        create_paths
+    ;;
+    check-paths)
+        check_dev_null
+        check_paths
+    ;;
+    *)
+        echo "Usage: /etc/init.d/celerybeat {start|stop|restart|create-paths}"
+        exit 64  # EX_USAGE
     ;;
-
-  *)
-    echo "Usage: /etc/init.d/celerybeat {start|stop|restart}"
-    exit 1
 esac
 
 exit 0

+ 60 - 22
extra/generic-init.d/celeryd

@@ -20,8 +20,8 @@
 
 #set -e
 
-DEFAULT_PID_FILE="/var/run/celeryd@%n.pid"
-DEFAULT_LOG_FILE="/var/log/celeryd@%n.log"
+DEFAULT_PID_FILE="/var/run/celery/%n.pid"
+DEFAULT_LOG_FILE="/var/log/celery/%n.log"
 DEFAULT_LOG_LEVEL="INFO"
 DEFAULT_NODES="celery"
 DEFAULT_CELERYD="-m celery worker --detach"
@@ -35,8 +35,17 @@ if [ -f "/etc/default/celeryd" ]; then
     . /etc/default/celeryd
 fi
 
-CELERYD_PID_FILE=${CELERYD_PID_FILE:-${CELERYD_PIDFILE:-$DEFAULT_PID_FILE}}
-CELERYD_LOG_FILE=${CELERYD_LOG_FILE:-${CELERYD_LOGFILE:-$DEFAULT_LOG_FILE}}
+CELERY_CREATE_RUNDIR=0
+CELERY_CREATE_LOGDIR=0
+if [ -z "$CELERYD_PID_FILE" ]; then
+    CELERYD_PID_FILE="$DEFAULT_PID_FILE"
+    CELERY_CREATE_RUNDIR=1
+fi
+if [ -z "$CELERYD_LOG_FILE" ]; then
+    CELERYD_LOG_FILE="$DEFAULT_LOG_FILE"
+    CELERY_CREATE_LOGDIR=1
+fi
+
 CELERYD_LOG_LEVEL=${CELERYD_LOG_LEVEL:-${CELERYD_LOGLEVEL:-$DEFAULT_LOG_LEVEL}}
 CELERYD_MULTI=${CELERYD_MULTI:-"celery multi"}
 CELERYD=${CELERYD:-$DEFAULT_CELERYD}
@@ -51,21 +60,13 @@ fi
 
 CELERYD_LOG_DIR=`dirname $CELERYD_LOG_FILE`
 CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE`
-if [ ! -d "$CELERYD_LOG_DIR" ]; then
-    mkdir -p $CELERYD_LOG_DIR
-fi
-if [ ! -d "$CELERYD_PID_DIR" ]; then
-    mkdir -p $CELERYD_PID_DIR
-fi
 
 # Extra start-stop-daemon options, like user/group.
 if [ -n "$CELERYD_USER" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --uid=$CELERYD_USER"
-    chown "$CELERYD_USER" $CELERYD_LOG_DIR $CELERYD_PID_DIR
 fi
 if [ -n "$CELERYD_GROUP" ]; then
     DAEMON_OPTS="$DAEMON_OPTS --gid=$CELERYD_GROUP"
-    chgrp "$CELERYD_GROUP" $CELERYD_LOG_DIR $CELERYD_PID_DIR
 fi
 
 if [ -n "$CELERYD_CHDIR" ]; then
@@ -76,21 +77,52 @@ fi
 check_dev_null() {
     if [ ! -c /dev/null ]; then
         echo "/dev/null is not a character device!"
-        exit 1
+        exit 75  # EX_TEMPFAIL
     fi
 }
 
-ensure_dir() {
-    if [ -d "$1" ]; then
+
+maybe_die() {
+    if [ $? -ne 0 ]; then
+        echo "Exiting: $* (errno $?)"
+        exit 77  # EX_NOPERM
+    fi
+}
+
+create_default_dir() {
+    if [ ! -d "$1" ]; then
+        echo "- Creating default directory: '$1'"
         mkdir -p "$1"
-        chown $CELERYD_USER:$CELERYD_GROUP "$1"
+        maybe_die "Couldn't create directory $1"
+        echo "- Changing permissions of '$1' to 02755"
         chmod 02755 "$1"
+        maybe_die "Couldn't change permissions for $1"
+        if [ -n "$CELERYD_USER" ]; then
+            echo "- Changing owner of '$1' to '$CELERYD_USER'"
+            chown "$CELERYD_USER" "$1"
+            maybe_die "Couldn't change owner of $1"
+        fi
+        if [ -n "$CELERYD_GROUP" ]; then
+            echo "- Changing group of '$1' to '$CELERYD_GROUP'"
+            chgrp "$CELERYD_GROUP" "$1"
+            maybe_die "Couldn't change group of $1"
+        fi
     fi
 }
 
+
 check_paths() {
-    ensure_dir "$(dirname $CELERYD_PID_FILE)"
-    ensure_dir "$(dirname $CELERYD_LOG_FILE)"
+    if [ $CELERY_CREATE_LOGDIR -eq 1 ]; then
+        create_default_dir "$CELERYD_LOG_DIR"
+    fi
+    if [ $CELERY_CREATE_RUNDIR -eq 1 ]; then
+        create_default_dir "$CELERYD_PID_DIR"
+    fi
+}
+
+create_paths() {
+    create_default_dir "$CELERYD_LOG_DIR"
+    create_default_dir "$CELERYD_PID_DIR"
 }
 
 export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
@@ -148,16 +180,22 @@ case "$1" in
         check_paths
         restart_workers
     ;;
-
     try-restart)
         check_dev_null
         check_paths
         restart_workers
     ;;
-
+    create-paths)
+        check_dev_null
+        create_paths
+    ;;
+    check-paths)
+        check_dev_null
+        check_paths
+    ;;
     *)
-        echo "Usage: /etc/init.d/celeryd {start|stop|restart|try-restart|kill}"
-        exit 1
+        echo "Usage: /etc/init.d/celeryd {start|stop|restart|kill|create_paths}"
+        exit 64  # EX_USAGE
     ;;
 esac
 

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
-billiard>=2.7.3.15
+billiard>=2.7.3.17
 kombu>=2.4.7,<3.0

+ 1 - 1
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz
-           billiard>=2.7.3.15
+           billiard>=2.7.3.17
            kombu >= 2.4.7