test_task_http.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. # -*- coding: utf-8 -*-
  2. from __future__ import generators
  3. import logging
  4. import unittest2 as unittest
  5. from urllib import addinfourl
  6. try:
  7. from contextlib import contextmanager
  8. except ImportError:
  9. from celery.tests.utils import fallback_contextmanager as contextmanager
  10. try:
  11. from cStringIO import StringIO
  12. except ImportError:
  13. from StringIO import StringIO
  14. from billiard.utils.functional import wraps
  15. from anyjson import serialize
  16. from celery.task import http
  17. from celery.tests.utils import eager_tasks, execute_context
  18. @contextmanager
  19. def mock_urlopen(response_method):
  20. import urllib2
  21. urlopen = urllib2.urlopen
  22. @wraps(urlopen)
  23. def _mocked(url, *args, **kwargs):
  24. response_data, headers = response_method(url)
  25. return addinfourl(StringIO(response_data), headers, url)
  26. urllib2.urlopen = _mocked
  27. yield True
  28. urllib2.urlopen = urlopen
  29. def _response(res):
  30. return lambda r: (res, [])
  31. def success_response(value):
  32. return _response(serialize({"status": "success", "retval": value}))
  33. def fail_response(reason):
  34. return _response(serialize({"status": "failure", "reason": reason}))
  35. def unknown_response():
  36. return _response(serialize({"status": "u.u.u.u", "retval": True}))
  37. class TestEncodings(unittest.TestCase):
  38. def test_utf8dict(self):
  39. d = {u"følelser ær langé": u"ærbadægzaå寨Å",
  40. "foobar".encode("utf-8"): "xuzzybaz".encode("utf-8")}
  41. for key, value in http.utf8dict(d.items()).items():
  42. self.assertIsInstance(key, str)
  43. self.assertIsInstance(value, str)
  44. class TestMutableURL(unittest.TestCase):
  45. def test_url_query(self):
  46. url = http.MutableURL("http://example.com?x=10&y=20&z=Foo")
  47. self.assertDictContainsSubset({"x": "10",
  48. "y": "20",
  49. "z": "Foo"}, url.query)
  50. url.query["name"] = "George"
  51. url = http.MutableURL(str(url))
  52. self.assertDictContainsSubset({"x": "10",
  53. "y": "20",
  54. "z": "Foo",
  55. "name": "George"}, url.query)
  56. def test_url_keeps_everything(self):
  57. url = "https://e.com:808/foo/bar#zeta?x=10&y=20"
  58. url = http.MutableURL(url)
  59. self.assertEqual(str(url).split("?")[0],
  60. "https://e.com:808/foo/bar#zeta")
  61. def test___repr__(self):
  62. url = http.MutableURL("http://e.com/foo/bar")
  63. self.assertTrue(repr(url).startswith("<MutableURL: http://e.com"))
  64. def test_set_query(self):
  65. url = http.MutableURL("http://e.com/foo/bar/?x=10")
  66. url.query = {"zzz": "xxx"}
  67. url = http.MutableURL(str(url))
  68. self.assertEqual(url.query, {"zzz": "xxx"})
  69. class TestHttpDispatch(unittest.TestCase):
  70. def test_dispatch_success(self):
  71. logger = logging.getLogger("celery.unittest")
  72. def with_mock_urlopen(_val):
  73. d = http.HttpDispatch("http://example.com/mul", "GET", {
  74. "x": 10, "y": 10}, logger)
  75. self.assertEqual(d.dispatch(), 100)
  76. context = mock_urlopen(success_response(100))
  77. execute_context(context, with_mock_urlopen)
  78. def test_dispatch_failure(self):
  79. logger = logging.getLogger("celery.unittest")
  80. def with_mock_urlopen(_val):
  81. d = http.HttpDispatch("http://example.com/mul", "GET", {
  82. "x": 10, "y": 10}, logger)
  83. self.assertRaises(http.RemoteExecuteError, d.dispatch)
  84. context = mock_urlopen(fail_response("Invalid moon alignment"))
  85. execute_context(context, with_mock_urlopen)
  86. def test_dispatch_empty_response(self):
  87. logger = logging.getLogger("celery.unittest")
  88. def with_mock_urlopen(_val):
  89. d = http.HttpDispatch("http://example.com/mul", "GET", {
  90. "x": 10, "y": 10}, logger)
  91. self.assertRaises(http.InvalidResponseError, d.dispatch)
  92. context = mock_urlopen(_response(""))
  93. execute_context(context, with_mock_urlopen)
  94. def test_dispatch_non_json(self):
  95. logger = logging.getLogger("celery.unittest")
  96. def with_mock_urlopen(_val):
  97. d = http.HttpDispatch("http://example.com/mul", "GET", {
  98. "x": 10, "y": 10}, logger)
  99. self.assertRaises(http.InvalidResponseError, d.dispatch)
  100. context = mock_urlopen(_response("{'#{:'''"))
  101. execute_context(context, with_mock_urlopen)
  102. def test_dispatch_unknown_status(self):
  103. logger = logging.getLogger("celery.unittest")
  104. def with_mock_urlopen(_val):
  105. d = http.HttpDispatch("http://example.com/mul", "GET", {
  106. "x": 10, "y": 10}, logger)
  107. self.assertRaises(http.UnknownStatusError, d.dispatch)
  108. context = mock_urlopen(unknown_response())
  109. execute_context(context, with_mock_urlopen)
  110. def test_dispatch_POST(self):
  111. logger = logging.getLogger("celery.unittest")
  112. def with_mock_urlopen(_val):
  113. d = http.HttpDispatch("http://example.com/mul", "POST", {
  114. "x": 10, "y": 10}, logger)
  115. self.assertEqual(d.dispatch(), 100)
  116. context = mock_urlopen(success_response(100))
  117. execute_context(context, with_mock_urlopen)
  118. class TestURL(unittest.TestCase):
  119. def test_URL_get_async(self):
  120. def with_eager_tasks(_val):
  121. def with_mock_urlopen(_val):
  122. d = http.URL("http://example.com/mul").get_async(x=10, y=10)
  123. self.assertEqual(d.get(), 100)
  124. context = mock_urlopen(success_response(100))
  125. execute_context(context, with_mock_urlopen)
  126. execute_context(eager_tasks(), with_eager_tasks)
  127. def test_URL_post_async(self):
  128. def with_eager_tasks(_val):
  129. def with_mock_urlopen(_val):
  130. d = http.URL("http://example.com/mul").post_async(x=10, y=10)
  131. self.assertEqual(d.get(), 100)
  132. context = mock_urlopen(success_response(100))
  133. execute_context(context, with_mock_urlopen)
  134. execute_context(eager_tasks(), with_eager_tasks)