# test_task_http.py (6.3 KB)
  1. # -*- coding: utf-8 -*-
  2. from __future__ import generators
  3. import logging
  4. import unittest2 as unittest
  5. from urllib import addinfourl
  6. try:
  7. from contextlib import contextmanager
  8. except ImportError:
  9. from celery.tests.utils import fallback_contextmanager as contextmanager
  10. from StringIO import StringIO
  11. from anyjson import serialize
  12. from celery.task import http
  13. from celery.utils.functional import wraps
  14. from celery.tests.utils import eager_tasks, execute_context
  15. @contextmanager
  16. def mock_urlopen(response_method):
  17. import urllib2
  18. urlopen = urllib2.urlopen
  19. @wraps(urlopen)
  20. def _mocked(url, *args, **kwargs):
  21. response_data, headers = response_method(url)
  22. return addinfourl(StringIO(response_data), headers, url)
  23. urllib2.urlopen = _mocked
  24. yield True
  25. urllib2.urlopen = urlopen
  26. def _response(res):
  27. return lambda r: (res, [])
  28. def success_response(value):
  29. return _response(serialize({"status": "success", "retval": value}))
  30. def fail_response(reason):
  31. return _response(serialize({"status": "failure", "reason": reason}))
  32. def unknown_response():
  33. return _response(serialize({"status": "u.u.u.u", "retval": True}))
  34. class TestEncodings(unittest.TestCase):
  35. def test_utf8dict(self):
  36. d = {u"følelser ær langé": u"ærbadægzaååÆØÅ",
  37. "foobar".encode("utf-8"): "xuzzybaz".encode("utf-8")}
  38. for key, value in http.utf8dict(d.items()).items():
  39. self.assertIsInstance(key, str)
  40. self.assertIsInstance(value, str)
  41. class TestMutableURL(unittest.TestCase):
  42. def test_url_query(self):
  43. url = http.MutableURL("http://example.com?x=10&y=20&z=Foo")
  44. self.assertDictContainsSubset({"x": "10",
  45. "y": "20",
  46. "z": "Foo"}, url.query)
  47. url.query["name"] = "George"
  48. url = http.MutableURL(str(url))
  49. self.assertDictContainsSubset({"x": "10",
  50. "y": "20",
  51. "z": "Foo",
  52. "name": "George"}, url.query)
  53. def test_url_keeps_everything(self):
  54. url = "https://e.com:808/foo/bar#zeta?x=10&y=20"
  55. url = http.MutableURL(url)
  56. self.assertEqual(str(url).split("?")[0],
  57. "https://e.com:808/foo/bar#zeta")
  58. def test___repr__(self):
  59. url = http.MutableURL("http://e.com/foo/bar")
  60. self.assertTrue(repr(url).startswith("<MutableURL: http://e.com"))
  61. def test_set_query(self):
  62. url = http.MutableURL("http://e.com/foo/bar/?x=10")
  63. url.query = {"zzz": "xxx"}
  64. url = http.MutableURL(str(url))
  65. self.assertEqual(url.query, {"zzz": "xxx"})
  66. class TestHttpDispatch(unittest.TestCase):
  67. def test_dispatch_success(self):
  68. logger = logging.getLogger("celery.unittest")
  69. def with_mock_urlopen(_val):
  70. d = http.HttpDispatch("http://example.com/mul", "GET", {
  71. "x": 10, "y": 10}, logger)
  72. self.assertEqual(d.dispatch(), 100)
  73. context = mock_urlopen(success_response(100))
  74. execute_context(context, with_mock_urlopen)
  75. def test_dispatch_failure(self):
  76. logger = logging.getLogger("celery.unittest")
  77. def with_mock_urlopen(_val):
  78. d = http.HttpDispatch("http://example.com/mul", "GET", {
  79. "x": 10, "y": 10}, logger)
  80. self.assertRaises(http.RemoteExecuteError, d.dispatch)
  81. context = mock_urlopen(fail_response("Invalid moon alignment"))
  82. execute_context(context, with_mock_urlopen)
  83. def test_dispatch_empty_response(self):
  84. logger = logging.getLogger("celery.unittest")
  85. def with_mock_urlopen(_val):
  86. d = http.HttpDispatch("http://example.com/mul", "GET", {
  87. "x": 10, "y": 10}, logger)
  88. self.assertRaises(http.InvalidResponseError, d.dispatch)
  89. context = mock_urlopen(_response(""))
  90. execute_context(context, with_mock_urlopen)
  91. def test_dispatch_non_json(self):
  92. logger = logging.getLogger("celery.unittest")
  93. def with_mock_urlopen(_val):
  94. d = http.HttpDispatch("http://example.com/mul", "GET", {
  95. "x": 10, "y": 10}, logger)
  96. self.assertRaises(http.InvalidResponseError, d.dispatch)
  97. context = mock_urlopen(_response("{'#{:'''"))
  98. execute_context(context, with_mock_urlopen)
  99. def test_dispatch_unknown_status(self):
  100. logger = logging.getLogger("celery.unittest")
  101. def with_mock_urlopen(_val):
  102. d = http.HttpDispatch("http://example.com/mul", "GET", {
  103. "x": 10, "y": 10}, logger)
  104. self.assertRaises(http.UnknownStatusError, d.dispatch)
  105. context = mock_urlopen(unknown_response())
  106. execute_context(context, with_mock_urlopen)
  107. def test_dispatch_POST(self):
  108. logger = logging.getLogger("celery.unittest")
  109. def with_mock_urlopen(_val):
  110. d = http.HttpDispatch("http://example.com/mul", "POST", {
  111. "x": 10, "y": 10}, logger)
  112. self.assertEqual(d.dispatch(), 100)
  113. context = mock_urlopen(success_response(100))
  114. execute_context(context, with_mock_urlopen)
  115. class TestURL(unittest.TestCase):
  116. def test_URL_get_async(self):
  117. http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = True
  118. try:
  119. def with_mock_urlopen(_val):
  120. d = http.URL("http://example.com/mul").get_async(x=10, y=10)
  121. self.assertEqual(d.get(), 100)
  122. context = mock_urlopen(success_response(100))
  123. execute_context(context, with_mock_urlopen)
  124. finally:
  125. http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = False
  126. def test_URL_post_async(self):
  127. http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = True
  128. try:
  129. def with_mock_urlopen(_val):
  130. d = http.URL("http://example.com/mul").post_async(x=10, y=10)
  131. self.assertEqual(d.get(), 100)
  132. context = mock_urlopen(success_response(100))
  133. execute_context(context, with_mock_urlopen)
  134. finally:
  135. http.HttpDispatchTask.app.conf.CELERY_ALWAYS_EAGER = False