test_task_http.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # -*- coding: utf-8 -*-
  2. from __future__ import with_statement, generators
  3. import sys
  4. import logging
  5. import unittest
  6. from urllib import addinfourl
  7. from contextlib import contextmanager
  8. try:
  9. from cStringIO import StringIO
  10. except ImportError:
  11. from StringIO import StringIO
  12. from billiard.utils.functional import wraps
  13. from anyjson import serialize
  14. from celery.task import http
  15. from celery.tests.utils import eager_tasks
  16. @contextmanager
  17. def mock_urlopen(response_method):
  18. import urllib2
  19. urlopen = urllib2.urlopen
  20. @wraps(urlopen)
  21. def _mocked(url, *args, **kwargs):
  22. response_data, headers = response_method(url)
  23. return addinfourl(StringIO(response_data), headers, url)
  24. urllib2.urlopen = _mocked
  25. yield True
  26. urllib2.urlopen = urlopen
  27. def _response(res):
  28. return lambda r: (res, [])
  29. def success_response(value):
  30. return _response(serialize({"status": "success", "retval": value}))
  31. def fail_response(reason):
  32. return _response(serialize({"status": "failure", "reason": reason}))
  33. def unknown_response():
  34. return _response(serialize({"status": "u.u.u.u", "retval": True}))
  35. class TestEncodings(unittest.TestCase):
  36. def test_utf8dict(self):
  37. d = {u"følelser ær langé": u"ærbadægzaå寨Å",
  38. "foobar".encode("utf-8"): "xuzzybaz".encode("utf-8")}
  39. for key, value in http.utf8dict(d.items()).items():
  40. self.assertTrue(isinstance(key, str))
  41. self.assertTrue(isinstance(value, str))
  42. class TestMutableURL(unittest.TestCase):
  43. def test_url_query(self):
  44. url = http.MutableURL("http://example.com?x=10&y=20&z=Foo")
  45. self.assertEquals(url.query.get("x"), "10")
  46. self.assertEquals(url.query.get("y"), "20")
  47. self.assertEquals(url.query.get("z"), "Foo")
  48. url.query["name"] = "George"
  49. url = http.MutableURL(str(url))
  50. self.assertEquals(url.query.get("x"), "10")
  51. self.assertEquals(url.query.get("y"), "20")
  52. self.assertEquals(url.query.get("z"), "Foo")
  53. self.assertEquals(url.query.get("name"), "George")
  54. def test_url_keeps_everything(self):
  55. url = "https://e.com:808/foo/bar#zeta?x=10&y=20"
  56. url = http.MutableURL(url)
  57. self.assertEquals(str(url).split("?")[0],
  58. "https://e.com:808/foo/bar#zeta")
  59. def test___repr__(self):
  60. url = http.MutableURL("http://e.com/foo/bar")
  61. self.assertTrue(repr(url).startswith("<MutableURL: http://e.com"))
  62. def test_set_query(self):
  63. url = http.MutableURL("http://e.com/foo/bar/?x=10")
  64. url.query = {"zzz": "xxx"}
  65. url = http.MutableURL(str(url))
  66. self.assertEquals(url.query, {"zzz": "xxx"})
  67. class TestHttpDispatch(unittest.TestCase):
  68. def test_dispatch_success(self):
  69. logger = logging.getLogger("celery.unittest")
  70. with mock_urlopen(success_response(100)):
  71. d = http.HttpDispatch("http://example.com/mul", "GET", {
  72. "x": 10, "y": 10}, logger)
  73. self.assertEquals(d.dispatch(), 100)
  74. def test_dispatch_failure(self):
  75. logger = logging.getLogger("celery.unittest")
  76. with mock_urlopen(fail_response("Invalid moon alignment")):
  77. d = http.HttpDispatch("http://example.com/mul", "GET", {
  78. "x": 10, "y": 10}, logger)
  79. self.assertRaises(http.RemoteExecuteError, d.dispatch)
  80. def test_dispatch_empty_response(self):
  81. logger = logging.getLogger("celery.unittest")
  82. with mock_urlopen(_response("")):
  83. d = http.HttpDispatch("http://example.com/mul", "GET", {
  84. "x": 10, "y": 10}, logger)
  85. self.assertRaises(http.InvalidResponseError, d.dispatch)
  86. def test_dispatch_non_json(self):
  87. logger = logging.getLogger("celery.unittest")
  88. with mock_urlopen(_response("{'#{:'''")):
  89. d = http.HttpDispatch("http://example.com/mul", "GET", {
  90. "x": 10, "y": 10}, logger)
  91. self.assertRaises(http.InvalidResponseError, d.dispatch)
  92. def test_dispatch_unknown_status(self):
  93. logger = logging.getLogger("celery.unittest")
  94. with mock_urlopen(unknown_response()):
  95. d = http.HttpDispatch("http://example.com/mul", "GET", {
  96. "x": 10, "y": 10}, logger)
  97. self.assertRaises(http.UnknownStatusError, d.dispatch)
  98. def test_dispatch_POST(self):
  99. logger = logging.getLogger("celery.unittest")
  100. with mock_urlopen(success_response(100)):
  101. d = http.HttpDispatch("http://example.com/mul", "POST", {
  102. "x": 10, "y": 10}, logger)
  103. self.assertEquals(d.dispatch(), 100)
  104. class TestURL(unittest.TestCase):
  105. def test_URL_get_async(self):
  106. with eager_tasks():
  107. with mock_urlopen(success_response(100)):
  108. d = http.URL("http://example.com/mul").get_async(x=10, y=10)
  109. self.assertEquals(d.get(), 100)
  110. def test_URL_post_async(self):
  111. with eager_tasks():
  112. with mock_urlopen(success_response(100)):
  113. d = http.URL("http://example.com/mul").post_async(x=10, y=10)
  114. self.assertEquals(d.get(), 100)