http.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.http
  4. ~~~~~~~~~~~~~~~~
  5. Webhook task implementation.
  6. """
  7. from __future__ import absolute_import
  8. import anyjson
  9. import sys
  10. try:
  11. from urllib.parse import parse_qsl, urlencode, urlparse # Py3
  12. except ImportError: # pragma: no cover
  13. from urllib import urlencode # noqa
  14. from urlparse import urlparse, parse_qsl # noqa
  15. from celery import __version__ as celery_version
  16. from celery.five import items, reraise
  17. from celery.utils.log import get_task_logger
  18. from .base import Task as BaseTask
  19. GET_METHODS = frozenset(['GET', 'HEAD'])
  20. logger = get_task_logger(__name__)
  21. if sys.version_info[0] == 3: # pragma: no cover
  22. from urllib.request import Request, urlopen
  23. def utf8dict(tup):
  24. if not isinstance(tup, dict):
  25. return dict(tup)
  26. return tup
  27. else:
  28. from urllib2 import Request, urlopen # noqa
  29. def maybe_utf8(value): # noqa
  30. """Encode to utf-8, only if the value is Unicode."""
  31. if isinstance(value, unicode):
  32. return value.encode('utf-8')
  33. return value
  34. def utf8dict(tup): # noqa
  35. """With a dict's items() tuple return a new dict with any utf-8
  36. keys/values encoded."""
  37. return dict((key.encode('utf-8'), maybe_utf8(value))
  38. for key, value in tup)
  39. class InvalidResponseError(Exception):
  40. """The remote server gave an invalid response."""
  41. class RemoteExecuteError(Exception):
  42. """The remote task gave a custom error."""
  43. class UnknownStatusError(InvalidResponseError):
  44. """The remote server gave an unknown status."""
  45. def extract_response(raw_response, loads=anyjson.loads):
  46. """Extract the response text from a raw JSON response."""
  47. if not raw_response:
  48. raise InvalidResponseError('Empty response')
  49. try:
  50. payload = loads(raw_response)
  51. except ValueError as exc:
  52. reraise(InvalidResponseError, InvalidResponseError(
  53. str(exc)), sys.exc_info()[2])
  54. status = payload['status']
  55. if status == 'success':
  56. return payload['retval']
  57. elif status == 'failure':
  58. raise RemoteExecuteError(payload.get('reason'))
  59. else:
  60. raise UnknownStatusError(str(status))
  61. class MutableURL(object):
  62. """Object wrapping a Uniform Resource Locator.
  63. Supports editing the query parameter list.
  64. You can convert the object back to a string, the query will be
  65. properly urlencoded.
  66. Examples
  67. >>> url = URL('http://www.google.com:6580/foo/bar?x=3&y=4#foo')
  68. >>> url.query
  69. {'x': '3', 'y': '4'}
  70. >>> str(url)
  71. 'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
  72. >>> url.query['x'] = 10
  73. >>> url.query.update({'George': 'Costanza'})
  74. >>> str(url)
  75. 'http://www.google.com:6580/foo/bar?y=4&x=10&George=Costanza#foo'
  76. """
  77. def __init__(self, url):
  78. self.parts = urlparse(url)
  79. self.query = dict(parse_qsl(self.parts[4]))
  80. def __str__(self):
  81. scheme, netloc, path, params, query, fragment = self.parts
  82. query = urlencode(utf8dict(items(self.query)))
  83. components = [scheme + '://', netloc, path or '/',
  84. ';{0}'.format(params) if params else '',
  85. '?{0}'.format(query) if query else '',
  86. '#{0}'.format(fragment) if fragment else '']
  87. return ''.join(c for c in components if c)
  88. def __repr__(self):
  89. return '<{0}: {1}>'.format(type(self).__name__, self)
  90. class HttpDispatch(object):
  91. """Make task HTTP request and collect the task result.
  92. :param url: The URL to request.
  93. :param method: HTTP method used. Currently supported methods are `GET`
  94. and `POST`.
  95. :param task_kwargs: Task keyword arguments.
  96. :param logger: Logger used for user/system feedback.
  97. """
  98. user_agent = 'celery/{version}'.format(version=celery_version)
  99. timeout = 5
  100. def __init__(self, url, method, task_kwargs, **kwargs):
  101. self.url = url
  102. self.method = method
  103. self.task_kwargs = task_kwargs
  104. self.logger = kwargs.get('logger') or logger
  105. def make_request(self, url, method, params):
  106. """Makes an HTTP request and returns the response."""
  107. request = Request(url, params)
  108. for key, val in items(self.http_headers):
  109. request.add_header(key, val)
  110. response = urlopen(request) # user catches errors.
  111. return response.read()
  112. def dispatch(self):
  113. """Dispatch callback and return result."""
  114. url = MutableURL(self.url)
  115. params = None
  116. if self.method in GET_METHODS:
  117. url.query.update(self.task_kwargs)
  118. else:
  119. params = urlencode(utf8dict(items(self.task_kwargs)))
  120. raw_response = self.make_request(str(url), self.method, params)
  121. return extract_response(raw_response)
  122. @property
  123. def http_headers(self):
  124. headers = {'User-Agent': self.user_agent}
  125. return headers
  126. class HttpDispatchTask(BaseTask):
  127. """Task dispatching to an URL.
  128. :keyword url: The URL location of the HTTP callback task.
  129. :keyword method: Method to use when dispatching the callback. Usually
  130. `GET` or `POST`.
  131. :keyword \*\*kwargs: Keyword arguments to pass on to the HTTP callback.
  132. .. attribute:: url
  133. If this is set, this is used as the default URL for requests.
  134. Default is to require the user of the task to supply the url as an
  135. argument, as this attribute is intended for subclasses.
  136. .. attribute:: method
  137. If this is set, this is the default method used for requests.
  138. Default is to require the user of the task to supply the method as an
  139. argument, as this attribute is intended for subclasses.
  140. """
  141. url = None
  142. method = None
  143. accept_magic_kwargs = False
  144. def run(self, url=None, method='GET', **kwargs):
  145. url = url or self.url
  146. method = method or self.method
  147. return HttpDispatch(url, method, kwargs).dispatch()
  148. class URL(MutableURL):
  149. """HTTP Callback URL
  150. Supports requesting an URL asynchronously.
  151. :param url: URL to request.
  152. :keyword dispatcher: Class used to dispatch the request.
  153. By default this is :class:`HttpDispatchTask`.
  154. """
  155. dispatcher = HttpDispatchTask
  156. def __init__(self, url, dispatcher=None):
  157. super(URL, self).__init__(url)
  158. self.dispatcher = dispatcher or self.dispatcher
  159. def get_async(self, **kwargs):
  160. return self.dispatcher.delay(str(self), 'GET', **kwargs)
  161. def post_async(self, **kwargs):
  162. return self.dispatcher.delay(str(self), 'POST', **kwargs)