Quellcode durchsuchen

[task] Raise if countdown/expires is less than INT_MIN. Closes #3078

Ask Solem vor 9 Jahren
Ursprung
Commit
5295ef8ff5
1 geänderte Dateien mit 12 neuen und 0 gelöschten Zeilen
  1. 12 0
      celery/app/amqp.py

+ 12 - 0
celery/app/amqp.py

@@ -35,6 +35,9 @@ __all__ = ['AMQP', 'Queues', 'task_message']
 
 PY3 = sys.version_info[0] == 3
 
+#: earliest date supported by time.mktime.
+INT_MIN = -2147483648
+
 # json in Python 2.7 borks if dict contains byte keys.
 JSON_NEEDS_UNICODE_KEYS = not PY3 and not try_import('simplejson')
 
@@ -313,12 +316,14 @@ class AMQP(object):
         if not isinstance(kwargs, Mapping):
             raise TypeError('task keyword arguments must be a mapping')
         if countdown:  # convert countdown to ETA
+            self._verify_seconds(countdown, 'countdown')
             now = now or self.app.now()
             timezone = timezone or self.app.timezone
             eta = maybe_make_aware(
                 now + timedelta(seconds=countdown), tz=timezone,
             )
         if isinstance(expires, numbers.Real):
+            self._verify_seconds(expires, 'expires')
             now = now or self.app.now()
             timezone = timezone or self.app.timezone
             expires = maybe_make_aware(
@@ -394,12 +399,14 @@ class AMQP(object):
         if not isinstance(kwargs, Mapping):
             raise ValueError('task keyword arguments must be a mapping')
         if countdown:  # convert countdown to ETA
+            self._verify_seconds(countdown, 'countdown')
             now = now or self.app.now()
             timezone = timezone or self.app.timezone
             eta = now + timedelta(seconds=countdown)
             if utc:
                 eta = to_utc(eta).astimezone(timezone)
         if isinstance(expires, numbers.Real):
+            self._verify_seconds(expires, 'expires')
             now = now or self.app.now()
             timezone = timezone or self.app.timezone
             expires = now + timedelta(seconds=expires)
@@ -449,6 +456,11 @@ class AMQP(object):
             } if create_sent_event else None,
         )
 
+    def _verify_seconds(self, s, what):
+        if s < INT_MIN:
+            raise ValueError('%s is out of range: %r' % (what, s))
+        return s
+
     def _create_task_sender(self):
         default_retry = self.app.conf.task_publish_retry
         default_policy = self.app.conf.task_publish_retry_policy