test_timeutils.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from __future__ import absolute_import
  2. import pytz
  3. from datetime import datetime, timedelta
  4. from celery.utils.timeutils import (
  5. delta_resolution,
  6. humanize_seconds,
  7. maybe_iso8601,
  8. maybe_timedelta,
  9. timedelta_seconds,
  10. timezone,
  11. rate,
  12. remaining,
  13. )
  14. from celery.utils.iso8601 import parse_iso8601
  15. from celery.tests.utils import Case
  16. class test_iso8601(Case):
  17. def test_parse_with_timezone(self):
  18. d = datetime.utcnow().replace(tzinfo=pytz.utc)
  19. self.assertEqual(parse_iso8601(d.isoformat()), d)
  20. # 2013-06-07T20:12:51.775877+00:00
  21. iso = d.isoformat()
  22. iso1 = iso.replace('+00:00', '-01:00')
  23. d1 = parse_iso8601(iso1)
  24. self.assertEqual(d1.tzinfo._minutes, -60)
  25. iso2 = iso.replace('+00:00', '+01:00')
  26. d2 = parse_iso8601(iso2)
  27. self.assertEqual(d2.tzinfo._minutes, +60)
  28. class test_timeutils(Case):
  29. def test_delta_resolution(self):
  30. D = delta_resolution
  31. dt = datetime(2010, 3, 30, 11, 50, 58, 41065)
  32. deltamap = ((timedelta(days=2), datetime(2010, 3, 30, 0, 0)),
  33. (timedelta(hours=2), datetime(2010, 3, 30, 11, 0)),
  34. (timedelta(minutes=2), datetime(2010, 3, 30, 11, 50)),
  35. (timedelta(seconds=2), dt))
  36. for delta, shoulda in deltamap:
  37. self.assertEqual(D(dt, delta), shoulda)
  38. def test_timedelta_seconds(self):
  39. deltamap = ((timedelta(seconds=1), 1),
  40. (timedelta(seconds=27), 27),
  41. (timedelta(minutes=3), 3 * 60),
  42. (timedelta(hours=4), 4 * 60 * 60),
  43. (timedelta(days=3), 3 * 86400))
  44. for delta, seconds in deltamap:
  45. self.assertEqual(timedelta_seconds(delta), seconds)
  46. def test_timedelta_seconds_returns_0_on_negative_time(self):
  47. delta = timedelta(days=-2)
  48. self.assertEqual(timedelta_seconds(delta), 0)
  49. def test_humanize_seconds(self):
  50. t = ((4 * 60 * 60 * 24, '4.00 days'),
  51. (1 * 60 * 60 * 24, '1.00 day'),
  52. (4 * 60 * 60, '4.00 hours'),
  53. (1 * 60 * 60, '1.00 hour'),
  54. (4 * 60, '4.00 minutes'),
  55. (1 * 60, '1.00 minute'),
  56. (4, '4.00 seconds'),
  57. (1, '1.00 second'),
  58. (4.3567631221, '4.36 seconds'),
  59. (0, 'now'))
  60. for seconds, human in t:
  61. self.assertEqual(humanize_seconds(seconds), human)
  62. self.assertEqual(humanize_seconds(4, prefix='about '),
  63. 'about 4.00 seconds')
  64. def test_maybe_iso8601_datetime(self):
  65. now = datetime.now()
  66. self.assertIs(maybe_iso8601(now), now)
  67. def test_maybe_timedelta(self):
  68. D = maybe_timedelta
  69. for i in (30, 30.6):
  70. self.assertEqual(D(i), timedelta(seconds=i))
  71. self.assertEqual(D(timedelta(days=2)), timedelta(days=2))
  72. def test_remaining_relative(self):
  73. remaining(datetime.utcnow(), timedelta(hours=1), relative=True)
  74. class test_timezone(Case):
  75. def test_get_timezone_with_pytz(self):
  76. self.assertTrue(timezone.get_timezone('UTC'))
  77. class test_rate_limit_string(Case):
  78. def test_conversion(self):
  79. self.assertEqual(rate(999), 999)
  80. self.assertEqual(rate(7.5), 7.5)
  81. self.assertEqual(rate('2.5/s'), 2.5)
  82. self.assertEqual(rate('1456/s'), 1456)
  83. self.assertEqual(rate('100/m'),
  84. 100 / 60.0)
  85. self.assertEqual(rate('10/h'),
  86. 10 / 60.0 / 60.0)
  87. for zero in (0, None, '0', '0/m', '0/h', '0/s', '0.0/s'):
  88. self.assertEqual(rate(zero), 0)