# test_task_http.py (6.2 KB)
# -*- coding: utf-8 -*-
from __future__ import generators

import logging
import sys
import unittest
from urllib import addinfourl
try:
    from contextlib import contextmanager
except ImportError:
    from testunits.utils import fallback_contextmanager as contextmanager
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

from anyjson import serialize
from billiard.utils.functional import wraps

from celery.task import http
from testunits.utils import eager_tasks, execute_context


  19. @contextmanager
  20. def mock_urlopen(response_method):
  21. import urllib2
  22. urlopen = urllib2.urlopen
  23. @wraps(urlopen)
  24. def _mocked(url, *args, **kwargs):
  25. response_data, headers = response_method(url)
  26. return addinfourl(StringIO(response_data), headers, url)
  27. urllib2.urlopen = _mocked
  28. yield True
  29. urllib2.urlopen = urlopen
  30. def _response(res):
  31. return lambda r: (res, [])
  32. def success_response(value):
  33. return _response(serialize({"status": "success", "retval": value}))
  34. def fail_response(reason):
  35. return _response(serialize({"status": "failure", "reason": reason}))
  36. def unknown_response():
  37. return _response(serialize({"status": "u.u.u.u", "retval": True}))
  38. class TestEncodings(unittest.TestCase):
  39. def test_utf8dict(self):
  40. d = {u"følelser ær langé": u"ærbadægzaååÆØÅ",
  41. "foobar".encode("utf-8"): "xuzzybaz".encode("utf-8")}
  42. for key, value in http.utf8dict(d.items()).items():
  43. self.assertTrue(isinstance(key, str))
  44. self.assertTrue(isinstance(value, str))
  45. class TestMutableURL(unittest.TestCase):
  46. def test_url_query(self):
  47. url = http.MutableURL("http://example.com?x=10&y=20&z=Foo")
  48. self.assertEquals(url.query.get("x"), "10")
  49. self.assertEquals(url.query.get("y"), "20")
  50. self.assertEquals(url.query.get("z"), "Foo")
  51. url.query["name"] = "George"
  52. url = http.MutableURL(str(url))
  53. self.assertEquals(url.query.get("x"), "10")
  54. self.assertEquals(url.query.get("y"), "20")
  55. self.assertEquals(url.query.get("z"), "Foo")
  56. self.assertEquals(url.query.get("name"), "George")
  57. def test_url_keeps_everything(self):
  58. url = "https://e.com:808/foo/bar#zeta?x=10&y=20"
  59. url = http.MutableURL(url)
  60. self.assertEquals(str(url).split("?")[0],
  61. "https://e.com:808/foo/bar#zeta")
  62. def test___repr__(self):
  63. url = http.MutableURL("http://e.com/foo/bar")
  64. self.assertTrue(repr(url).startswith("<MutableURL: http://e.com"))
  65. def test_set_query(self):
  66. url = http.MutableURL("http://e.com/foo/bar/?x=10")
  67. url.query = {"zzz": "xxx"}
  68. url = http.MutableURL(str(url))
  69. self.assertEquals(url.query, {"zzz": "xxx"})
  70. class TestHttpDispatch(unittest.TestCase):
  71. def test_dispatch_success(self):
  72. logger = logging.getLogger("celery.unittest")
  73. def with_mock_urlopen(_val):
  74. d = http.HttpDispatch("http://example.com/mul", "GET", {
  75. "x": 10, "y": 10}, logger)
  76. self.assertEquals(d.dispatch(), 100)
  77. context = mock_urlopen(success_response(100))
  78. execute_context(context, with_mock_urlopen)
  79. def test_dispatch_failure(self):
  80. logger = logging.getLogger("celery.unittest")
  81. def with_mock_urlopen(_val):
  82. d = http.HttpDispatch("http://example.com/mul", "GET", {
  83. "x": 10, "y": 10}, logger)
  84. self.assertRaises(http.RemoteExecuteError, d.dispatch)
  85. context = mock_urlopen(fail_response("Invalid moon alignment"))
  86. execute_context(context, with_mock_urlopen)
  87. def test_dispatch_empty_response(self):
  88. logger = logging.getLogger("celery.unittest")
  89. def with_mock_urlopen(_val):
  90. d = http.HttpDispatch("http://example.com/mul", "GET", {
  91. "x": 10, "y": 10}, logger)
  92. self.assertRaises(http.InvalidResponseError, d.dispatch)
  93. context = mock_urlopen(_response(""))
  94. execute_context(context, with_mock_urlopen)
  95. def test_dispatch_non_json(self):
  96. logger = logging.getLogger("celery.unittest")
  97. def with_mock_urlopen(_val):
  98. d = http.HttpDispatch("http://example.com/mul", "GET", {
  99. "x": 10, "y": 10}, logger)
  100. self.assertRaises(http.InvalidResponseError, d.dispatch)
  101. context = mock_urlopen(_response("{'#{:'''"))
  102. execute_context(context, with_mock_urlopen)
  103. def test_dispatch_unknown_status(self):
  104. logger = logging.getLogger("celery.unittest")
  105. def with_mock_urlopen(_val):
  106. d = http.HttpDispatch("http://example.com/mul", "GET", {
  107. "x": 10, "y": 10}, logger)
  108. self.assertRaises(http.UnknownStatusError, d.dispatch)
  109. context = mock_urlopen(unknown_response())
  110. execute_context(context, with_mock_urlopen)
  111. def test_dispatch_POST(self):
  112. logger = logging.getLogger("celery.unittest")
  113. def with_mock_urlopen(_val):
  114. d = http.HttpDispatch("http://example.com/mul", "POST", {
  115. "x": 10, "y": 10}, logger)
  116. self.assertEquals(d.dispatch(), 100)
  117. context = mock_urlopen(success_response(100))
  118. execute_context(context, with_mock_urlopen)
  119. class TestURL(unittest.TestCase):
  120. def test_URL_get_async(self):
  121. def with_eager_tasks(_val):
  122. def with_mock_urlopen(_val):
  123. d = http.URL("http://example.com/mul").get_async(x=10, y=10)
  124. self.assertEquals(d.get(), 100)
  125. context = mock_urlopen(success_response(100))
  126. execute_context(context, with_mock_urlopen)
  127. execute_context(eager_tasks(), with_eager_tasks)
  128. def test_URL_post_async(self):
  129. def with_eager_tasks(_val):
  130. def with_mock_urlopen(_val):
  131. d = http.URL("http://example.com/mul").post_async(x=10, y=10)
  132. self.assertEquals(d.get(), 100)
  133. context = mock_urlopen(success_response(100))
  134. execute_context(context, with_mock_urlopen)
  135. execute_context(eager_tasks(), with_eager_tasks)