Quellcode durchsuchen

Merge branch '3.0'

Conflicts:
	Changelog
	celery/worker/consumer.py
	celery/worker/control.py
	requirements/default-py3k.txt
	requirements/default.txt
	setup.cfg
Ask Solem vor 12 Jahren
Ursprung
Commit
6a3b1ff6f5

+ 36 - 8
Changelog

@@ -9,6 +9,17 @@
 
 If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
+.. _version-3.1.0:
+
+3.1.0
+=====
+:state: DEVEL
+:branch: master
+
+- `celery inspect stats` now contains worker pid
+- `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
+- `App.control.Inspect.conf` can be used for inspecting worker configuration
+
 .. _version-3.0.11:
 
 3.0.11
@@ -51,16 +62,33 @@ If you're looking for versions prior to 3.0.x you should go to :ref:`history`.
 
             http://github.com/celery/celery/tree/3.0/extra/generic-init.d
 
-.. _version-3.1.0:
+- Now depends on billiard 2.7.3.17
 
-3.1.0
-=====
-:state: DEVEL
-:branch: master
+- Fixes request stack protection when app is initialized more than
+  once (Issue #1003).
 
-- `celery inspect stats` now contains worker pid
-- `Task.apply_async` now supports timeout and soft_timeout arguments (Issue #802)
-- `App.control.Inspect.conf` can be used for inspecting worker configuration
+- ETA tasks now properly works when system timezone is not the same
+  as the configured timezone (Issue #1004).
+
+- Terminating a task now works if the task has been sent to the
+  pool but not yet acknowledged by a pool process (Issue #1007).
+
+    Fix contributed by Alexey Zatelepin
+
+- Terminating a task now properly updates the state of the task to revoked,
+  and sends a ``task-revoked`` event.
+
+- :program:`celery worker` and :program:`celery beat` commands now respects
+  the :option:`--no-color` option (Issue #999).
+
+- Fixed typos in eventlet examples (Issue #1000)
+
+    Fix contributed by Bryan Bishop.
+    Congratulations on opening bug #1000!
+
+- Tasks that raise :exc:`~celery.exceptions.Ignore` are now acknowledged.
+
+- Beat: Now shows the name of the entry in ``sending due task`` logs.
 
 .. _version-3.0.10:
 

+ 1 - 1
celery/concurrency/processes.py

@@ -142,4 +142,4 @@ class TaskPool(BasePool):
 
     @property
     def timers(self):
-        return {self.maintain_pool: 30.0}
+        return {self.maintain_pool: 5.0}

+ 1 - 1
celery/exceptions.py

@@ -9,7 +9,7 @@
 from __future__ import absolute_import
 
 from billiard.exceptions import (  # noqa
-    SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError,
+    SoftTimeLimitExceeded, TimeLimitExceeded, WorkerLostError, Terminated,
 )
 
 UNREGISTERED_FMT = """\

+ 4 - 4
celery/tests/worker/test_control.py

@@ -351,17 +351,17 @@ class test_ControlPanel(Case):
     def test_revoke_terminate(self):
         request = Mock()
         request.id = tid = uuid()
-        state.active_requests.add(request)
+        state.reserved_requests.add(request)
         try:
             r = control.revoke(Mock(), tid, terminate=True)
             self.assertIn(tid, revoked)
             self.assertTrue(request.terminate.call_count)
-            self.assertIn('terminated', r['ok'])
+            self.assertIn('terminating', r['ok'])
             # unknown task id only revokes
             r = control.revoke(Mock(), uuid(), terminate=True)
-            self.assertIn('revoked', r['ok'])
+            self.assertIn('not found', r['ok'])
         finally:
-            state.active_requests.discard(request)
+            state.reserved_requests.discard(request)
 
     def test_autoscale(self):
         self.panel.state.consumer = Mock()

+ 6 - 10
celery/utils/timeutils.py

@@ -90,13 +90,6 @@ class LocalTimezone(tzinfo):
         return tt.tm_isdst > 0
 
 
-def _get_local_timezone():
-    global _local_timezone
-    if _local_timezone is None:
-        _local_timezone = LocalTimezone()
-    return _local_timezone
-
-
 class _Zone(object):
 
     def tz_or_local(self, tzinfo=None):
@@ -109,10 +102,13 @@ class _Zone(object):
             dt = make_aware(dt, orig or self.utc)
         return localize(dt, self.tz_or_local(local))
 
+    def to_system(self, dt):
+        return localize(dt, self.local)
+
     def to_local_fallback(self, dt, *args, **kwargs):
         if is_naive(dt):
-            return make_aware(dt, _get_local_timezone())
-        return localize(dt, _get_local_timezone())
+            return make_aware(dt, self.local)
+        return localize(dt, self.local)
 
     def get_timezone(self, zone):
         if isinstance(zone, basestring):
@@ -121,7 +117,7 @@ class _Zone(object):
 
     @cached_property
     def local(self):
-        return _get_local_timezone()
+        return LocalTimezone()
 
     @cached_property
     def utc(self):

+ 6 - 4
celery/worker/consumer.py

@@ -30,7 +30,7 @@ from celery.utils import text
 from celery.utils import timer2
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
-from celery.utils.timeutils import humanize_seconds
+from celery.utils.timeutils import humanize_seconds, timezone
 
 from . import state
 from .bootsteps import StartStopComponent, RUN, CLOSE
@@ -393,16 +393,18 @@ class Consumer(object):
                     expires=task.expires and task.expires.isoformat())
 
         if task.eta:
+            eta = timezone.to_system(task.eta) if task.utc else task.eta
             try:
-                eta = timer2.to_timestamp(task.eta)
+                eta = timer2.to_timestamp(eta)
             except OverflowError as exc:
                 error("Couldn't convert eta %s to timestamp: %r. Task: %r",
                       task.eta, exc, task.info(safe=True), exc_info=True)
                 task.acknowledge()
             else:
                 self.qos.increment_eventually()
-                self.timer.apply_at(eta, self.apply_eta_task, (task, ),
-                                    priority=6)
+                self.timer.apply_at(
+                    eta, self.apply_eta_task, (task, ), priority=6,
+                )
         else:
             task_reserved(task)
             self._quick_put(task)

+ 7 - 5
celery/worker/control.py

@@ -39,17 +39,19 @@ class Panel(UserDict):
 def revoke(panel, task_id, terminate=False, signal=None, **kwargs):
     """Revoke task by task id."""
     revoked.add(task_id)
-    action = 'revoked'
     if terminate:
         signum = _signals.signum(signal or 'TERM')
-        for request in state.active_requests:
+        for request in state.reserved_requests:
             if request.id == task_id:
-                action = 'terminated ({0})'.format(signum)
+                logger.info('Terminating %s (%s)', task_id, signum)
                 request.terminate(panel.consumer.pool, signal=signum)
                 break
+        else:
+            return {'ok': 'terminate: task {0} not found'.format(task_id)}
+        return {'ok': 'terminating {0} ({1})'.format(task_id, signal)}
 
-    logger.info('Task %s %s.', task_id, action)
-    return {'ok': 'task {0} {1}'.format(task_id, action)}
+    logger.info('Revoking task %s', task_id)
+    return {'ok': 'revoking task {0}'.format(task_id)}
 
 
 @Panel.register

+ 24 - 20
celery/worker/job.py

@@ -34,7 +34,7 @@ from celery.utils.functional import noop
 from celery.utils.log import get_logger
 from celery.utils.serialization import get_pickled_exception
 from celery.utils.text import truncate
-from celery.utils.timeutils import maybe_iso8601, timezone
+from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
 
 from . import state
 
@@ -45,9 +45,8 @@ _does_debug = logger.isEnabledFor(logging.DEBUG)
 _does_info = logger.isEnabledFor(logging.INFO)
 
 # Localize
-tz_to_local = timezone.to_local
-tz_or_local = timezone.tz_or_local
 tz_utc = timezone.utc
+tz_or_local = timezone.tz_or_local
 send_revoked = signals.task_revoked.send
 
 task_accepted = state.task_accepted
@@ -64,7 +63,7 @@ class Request(object):
                  'eventer', 'connection_errors',
                  'task', 'eta', 'expires',
                  'request_dict', 'acknowledged', 'success_msg',
-                 'error_msg', 'retry_msg', 'ignore_msg',
+                 'error_msg', 'retry_msg', 'ignore_msg', 'utc',
                  'time_start', 'worker_pid', '_already_revoked',
                  '_terminate_on_ack', '_tzlocal')
 
@@ -108,7 +107,7 @@ class Request(object):
             self.kwargs = kwdict(self.kwargs)
         eta = body.get('eta')
         expires = body.get('expires')
-        utc = body.get('utc', False)
+        utc = self.utc = body.get('utc', False)
         self.on_ack = on_ack
         self.hostname = hostname or socket.gethostname()
         self.eventer = eventer
@@ -121,14 +120,15 @@ class Request(object):
         # timezone means the message is timezone-aware, and the only timezone
         # supported at this point is UTC.
         if eta is not None:
-            tz = tz_utc if utc else self.tzlocal
-            self.eta = tz_to_local(maybe_iso8601(eta), self.tzlocal, tz)
+            self.eta = maybe_iso8601(eta)
+            if utc:
+                self.eta = maybe_make_aware(self.eta, self.tzlocal)
         else:
             self.eta = None
         if expires is not None:
-            tz = tz_utc if utc else self.tzlocal
-            self.expires = tz_to_local(maybe_iso8601(expires),
-                                       self.tzlocal, tz)
+            self.expires = maybe_iso8601(expires)
+            if utc:
+                self.expires = maybe_make_aware(self.expires, self.tzlocal)
         else:
             self.expires = None
 
@@ -241,9 +241,11 @@ class Request(object):
 
     def maybe_expire(self):
         """If expired, mark the task as revoked."""
-        if self.expires and datetime.now(self.tzlocal) > self.expires:
-            revoked_tasks.add(self.id)
-            return True
+        if self.expires:
+            now = datetime.now(tz_or_local(self.tzlocal) if self.utc else None)
+            if now > self.expires:
+                revoked_tasks.add(self.id)
+                return True
 
     def terminate(self, pool, signal=None):
         if self.time_start:
@@ -355,19 +357,21 @@ class Request(object):
         task_ready(self)
 
         if not exc_info.internal:
+            exc = exc_info.exception
 
-            if isinstance(exc_info.exception, exceptions.RetryTaskError):
+            if isinstance(exc, exceptions.RetryTaskError):
                 return self.on_retry(exc_info)
 
-            # This is a special case as the process would not have had
+            # These are special cases where the process would not have had
             # time to write the result.
-            if isinstance(exc_info.exception, exceptions.WorkerLostError) and \
-                    self.store_errors:
-                self.task.backend.mark_as_failure(self.id, exc_info.exception)
+            if self.store_errors:
+                if isinstance(exc, exceptions.WorkerLostError):
+                    self.task.backend.mark_as_failure(self.id, exc)
+                elif isinstance(exc, exceptions.Terminated):
+                    self._announce_revoked('terminated', True, str(exc), False)
             # (acks_late) acknowledge after result stored.
             if self.task.acks_late:
                 self.acknowledge()
-
         self._log_error(exc_info)
 
     def _log_error(self, einfo):
@@ -456,7 +460,7 @@ class Request(object):
     @property
     def tzlocal(self):
         if self._tzlocal is None:
-            self._tzlocal = tz_or_local(self.app.conf.CELERY_TIMEZONE)
+            self._tzlocal = self.app.conf.CELERY_TIMEZONE
         return self._tzlocal
 
     @property

+ 1 - 1
requirements/default.txt

@@ -1,3 +1,3 @@
 pytz
-billiard>=2.7.3.15
+billiard>=2.7.3.17
 kombu>=2.4.7,<3.0

+ 1 - 1
setup.cfg

@@ -15,5 +15,5 @@ upload-dir = docs/.build/html
 
 [bdist_rpm]
 requires = pytz
-           billiard>=2.7.3.15
+           billiard>=2.7.3.17
            kombu >= 2.4.7