Ver Fonte

[task] Raise if countdown/expires is less than INT_MIN. Closes #3078

Ask Solem há 9 anos atrás
pai
commit
b443510af3
3 ficheiros alterados com 36 adições e 1 exclusões
  1. 25 0
      Changelog
  2. 10 0
      celery/app/amqp.py
  3. 1 1
      celery/concurrency/asynpool.py

+ 25 - 0
Changelog

@@ -8,6 +8,31 @@ This document contains change notes for bugfix releases in the 3.1.x series
 (Cipater), please see :ref:`whatsnew-3.1` for an overview of what's
 new in Celery 3.1.
 
+.. _version-3.1.21:
+
+3.1.21
+======
+:release-date: TBA
+:release-by:
+
+- **Prefork pool**: Fixes 100% CPU loop on Linux epoll (Issue #1845).
+
+    Also potential fix for: Issue #2142, Issue #2606
+
+- **Worker**: Fixes crash at startup when trying to censor passwords
+  in MongoDB and Cache result backend URLs (Issue #3079, Issue #3045,
+  Issue #3049, Issue #3068, Issue #3073).
+
+    Fix contributed by Maxime Verger.
+
+- **Programs**: :program:`celery shell --ipython` now compatible with newer
+  IPython versions.
+
+- **Results**: amqp/rpc backends: Fixed deserialization of JSON exceptions
+  (Issue #2518).
+
+    Fix contributed by Allard Hoeve.
+
 .. _version-3.1.20:
 
 3.1.20

+ 10 - 0
celery/app/amqp.py

@@ -30,6 +30,9 @@ from . import routes as _routes
 
 __all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
 
+#: earliest date supported by time.mktime.
+INT_MIN = -2147483648
+
 #: Human readable queue declaration.
 QUEUE_FORMAT = """
 .> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
@@ -253,11 +256,13 @@ class TaskProducer(Producer):
         if not isinstance(task_kwargs, dict):
             raise ValueError('task kwargs must be a dictionary')
         if countdown:  # Convert countdown to ETA.
+            self._verify_seconds(countdown, 'countdown')
             now = now or self.app.now()
             eta = now + timedelta(seconds=countdown)
             if self.utc:
                 eta = to_utc(eta).astimezone(self.app.timezone)
         if isinstance(expires, numbers.Real):
+            self._verify_seconds(expires, 'expires')
             now = now or self.app.now()
             expires = now + timedelta(seconds=expires)
             if self.utc:
@@ -338,6 +343,11 @@ class TaskProducer(Producer):
         return task_id
     delay_task = publish_task   # XXX Compat
 
+    def _verify_seconds(self, s, what):
+        if s < INT_MIN:
+            raise ValueError('%s is out of range: %r' % (what, s))
+        return s
+
     @cached_property
     def event_dispatcher(self):
         # We call Dispatcher.publish with a custom producer

+ 1 - 1
celery/concurrency/asynpool.py

@@ -794,7 +794,7 @@ class AsynPool(_pool.Pool):
 
         def on_not_recovering(proc, fd, job, exc):
             error('Process inqueue damaged: %r %r: %r',
-                  proc, proc.exitcode, exc, exc_info=1))
+                  proc, proc.exitcode, exc, exc_info=1)
             if proc._is_alive():
                 proc.terminate()
             hub.remove(fd)