http.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import urllib2
  2. from urllib import urlencode
  3. from urlparse import urlparse
  4. from anyjson import deserialize
  5. from celery import __version__ as celery_version
  6. from celery.task.base import Task as BaseTask
  7. from celery.utils.compat import parse_qsl
  8. GET_METHODS = frozenset(["GET", "HEAD"])
  9. class InvalidResponseError(Exception):
  10. """The remote server gave an invalid response."""
  11. class RemoteExecuteError(Exception):
  12. """The remote task gave a custom error."""
  13. class UnknownStatusError(InvalidResponseError):
  14. """The remote server gave an unknown status."""
  15. def maybe_utf8(value):
  16. """Encode utf-8 value, only if the value is actually utf-8."""
  17. if isinstance(value, unicode):
  18. return value.encode("utf-8")
  19. return value
  20. def utf8dict(tup):
  21. """With a dict's items() tuple return a new dict with any utf-8
  22. keys/values encoded."""
  23. return dict((key.encode("utf-8"), maybe_utf8(value))
  24. for key, value in tup)
  25. def extract_response(raw_response):
  26. """Extract the response text from a raw JSON response."""
  27. if not raw_response:
  28. raise InvalidResponseError("Empty response")
  29. try:
  30. payload = deserialize(raw_response)
  31. except ValueError, exc:
  32. raise InvalidResponseError(str(exc))
  33. status = payload["status"]
  34. if status == "success":
  35. return payload["retval"]
  36. elif status == "failure":
  37. raise RemoteExecuteError(payload.get("reason"))
  38. else:
  39. raise UnknownStatusError(str(status))
  40. class MutableURL(object):
  41. """Object wrapping a Uniform Resource Locator.
  42. Supports editing the query parameter list.
  43. You can convert the object back to a string, the query will be
  44. properly urlencoded.
  45. Examples
  46. >>> url = URL("http://www.google.com:6580/foo/bar?x=3&y=4#foo")
  47. >>> url.query
  48. {'x': '3', 'y': '4'}
  49. >>> str(url)
  50. 'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
  51. >>> url.query["x"] = 10
  52. >>> url.query.update({"George": "Costanza"})
  53. >>> str(url)
  54. 'http://www.google.com:6580/foo/bar?y=4&x=10&George=Costanza#foo'
  55. """
  56. def __init__(self, url):
  57. self.parts = urlparse(url)
  58. self._query = dict(parse_qsl(self.parts[4]))
  59. def __str__(self):
  60. scheme, netloc, path, params, query, fragment = self.parts
  61. query = urlencode(utf8dict(self.query.items()))
  62. components = ["%s://" % scheme,
  63. "%s" % netloc,
  64. path and "%s" % path or "/",
  65. params and ";%s" % params or None,
  66. query and "?%s" % query or None,
  67. fragment and "#%s" % fragment or None]
  68. return "".join(filter(None, components))
  69. def __repr__(self):
  70. return "<%s: %s>" % (self.__class__.__name__, str(self))
  71. def _get_query(self):
  72. return self._query
  73. def _set_query(self, query):
  74. self._query = query
  75. query = property(_get_query, _set_query)
  76. class HttpDispatch(object):
  77. """Make task HTTP request and collect the task result.
  78. :param url: The URL to request.
  79. :param method: HTTP method used. Currently supported methods are ``GET``
  80. and ``POST``.
  81. :param task_kwargs: Task keyword arguments.
  82. :param logger: Logger used for user/system feedback.
  83. """
  84. user_agent = "celery/%s" % celery_version
  85. timeout = 5
  86. def __init__(self, url, method, task_kwargs, logger):
  87. self.url = url
  88. self.method = method
  89. self.task_kwargs = task_kwargs
  90. self.logger = logger
  91. def make_request(self, url, method, params):
  92. """Makes an HTTP request and returns the response."""
  93. request = urllib2.Request(url, params, headers=self.http_headers)
  94. request.headers.update(self.http_headers)
  95. response = urllib2.urlopen(request) # user catches errors.
  96. return response.read()
  97. def dispatch(self):
  98. """Dispatch callback and return result."""
  99. url = MutableURL(self.url)
  100. params = None
  101. if self.method in GET_METHODS:
  102. url.query.update(self.task_kwargs)
  103. else:
  104. params = urlencode(utf8dict(self.task_kwargs.items()))
  105. raw_response = self.make_request(str(url), self.method, params)
  106. return extract_response(raw_response)
  107. @property
  108. def http_headers(self):
  109. headers = {"Content-Type": "application/json",
  110. "User-Agent": self.user_agent}
  111. return headers
  112. class HttpDispatchTask(BaseTask):
  113. """Task dispatching to an URL.
  114. :keyword url: The URL location of the HTTP callback task.
  115. :keyword method: Method to use when dispatching the callback. Usually
  116. ``GET`` or ``POST``.
  117. :keyword \*\*kwargs: Keyword arguments to pass on to the HTTP callback.
  118. .. attribute:: url
  119. If this is set, this is used as the default URL for requests.
  120. Default is to require the user of the task to supply the url as an
  121. argument, as this attribute is intended for subclasses.
  122. .. attribute:: method
  123. If this is set, this is the default method used for requests.
  124. Default is to require the user of the task to supply the method as an
  125. argument, as this attribute is intended for subclasses.
  126. """
  127. url = None
  128. method = None
  129. def run(self, url=None, method="GET", **kwargs):
  130. url = url or self.url
  131. method = method or self.method
  132. logger = self.get_logger(**kwargs)
  133. return HttpDispatch(url, method, kwargs, logger).dispatch()
  134. class URL(MutableURL):
  135. """HTTP Callback URL
  136. Supports requesting an URL asynchronously.
  137. :param url: URL to request.
  138. :keyword dispatcher: Class used to dispatch the request.
  139. By default this is :class:`HttpDispatchTask`.
  140. """
  141. dispatcher = HttpDispatchTask
  142. def __init__(self, url, dispatcher=None):
  143. super(URL, self).__init__(url)
  144. self.dispatcher = dispatcher or self.dispatcher
  145. def get_async(self, **kwargs):
  146. return self.dispatcher.delay(str(self), "GET", **kwargs)
  147. def post_async(self, **kwargs):
  148. return self.dispatcher.delay(str(self), "POST", **kwargs)