rest.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import urllib2
  2. try:
  3. from urlparse import parse_qsl
  4. except ImportError:
  5. from cgi import parse_qsl
  6. from urllib import urlencode
  7. from urlparse import urlparse
  8. from anyjson import serialize, deserialize
  9. from celery import __version__ as celery_version
  10. from celery.registry import tasks
  11. from celery.task.base import Task as BaseTask
  12. class InvalidResponseError(Exception):
  13. """The remote server gave an invalid response."""
  14. class RemoteExecuteError(Exception):
  15. """The remote task gave a custom error."""
  16. class UnknownStatusError(InvalidResponseError):
  17. """The remote server gave an unknown status."""
  18. class URL(object):
  19. """Object wrapping a Uniform Resource Locator.
  20. Supports editing the query parameter list.
  21. You can convert the object back to a string, the query will be
  22. properly urlencoded.
  23. Examples
  24. >>> url = URL("http://www.google.com:6580/foo/bar?x=3&y=4#foo")
  25. >>> url.query
  26. {'x': '3', 'y': '4'}
  27. >>> str(url)
  28. 'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
  29. >>> url.query["x"] = 10
  30. >>> url.query.update({"George": "Constanza"})
  31. >>> str(url)
  32. 'http://www.google.com:6580/foo/bar?y=4&x=10&George=Constanza#foo'
  33. """
  34. def __init__(self, url):
  35. self.url = urlparse(url)
  36. self._query = dict(parse_qsl(self.url.query))
  37. def _utf8dict(self, tuple_):
  38. def value_encode(val):
  39. if isinstance(val, unicode):
  40. return val.encode("utf-8")
  41. return val
  42. return dict((key.encode("utf-8"), value_encode(value))
  43. for key, value in tuple_)
  44. def __str__(self):
  45. u = self.url
  46. query = urlencode(self._utf8dict(self.query.items()))
  47. components = ["%s://" % u.scheme,
  48. "%s" % u.netloc,
  49. u.path and "%s" % u.path or "/",
  50. u.params and ";%s" % u.params or None,
  51. query and "?%s" % query or None,
  52. u.fragment and "#%s" % u.fragment or None]
  53. return "".join(filter(None, components))
  54. def __repr__(self):
  55. return "<%s %s>" % (self.__class__.__name__, str(self))
  56. def _get_query(self):
  57. return self._query
  58. def _set_query(self, query):
  59. self._query = query
  60. query = property(_get_query, _set_query)
  61. class RESTProxy(object):
  62. user_agent = "celery/%s" % celery_version
  63. timeout = 5
  64. def __init__(self, url, task_kwargs, logger):
  65. self.url = url
  66. self.task_kwargs = task_kwargs
  67. self.logger = logger
  68. def _create_request(self):
  69. url = URL(self.url)
  70. url.query.update(self.task_kwargs)
  71. req = urllib2.Request(str(url))
  72. req.headers.update(self.http_headers)
  73. return req
  74. def _make_request(self):
  75. request = self._create_request()
  76. opener = urllib2.build_opener()
  77. response = opener.open(request)
  78. return response.read()
  79. def execute(self):
  80. response = self._make_request()
  81. if not response:
  82. raise InvalidResponseError("Empty response")
  83. try:
  84. payload = deserialize(response)
  85. except ValueError, exc:
  86. raise InvalidResponseError(str(exc))
  87. # {"status": "success", "retval": 300}
  88. # {"status": "failure": "reason": "Invalid moon alignment."}
  89. status = payload["status"]
  90. if status == "success":
  91. return payload["retval"]
  92. elif status == "failure":
  93. raise RemoteExecuteError(payload["reason"])
  94. else:
  95. raise UnknownStatusError(str(status))
  96. @property
  97. def http_headers(self):
  98. headers = {"Content-Type": "application/json",
  99. "User-Agent": self.user_agent}
  100. return headers
  101. class RESTProxyTask(BaseTask):
  102. name = "celery.task.rest.RESTProxyTask"
  103. user_agent = "celery %s" % celery_version
  104. def run(self, url, **kwargs):
  105. logger = self.get_logger(**kwargs)
  106. proxy = RESTProxy(url, kwargs, logger)
  107. return proxy.execute()
  108. def task_response(fun, *args, **kwargs):
  109. import sys
  110. try:
  111. sys.stderr.write("executing %s\n" % fun)
  112. retval = fun(*args, **kwargs)
  113. sys.stderr.write("got: %s\n" % retval)
  114. except Exception, exc:
  115. response = {"status": "failure", "reason": str(exc)}
  116. else:
  117. response = {"status": "success", "retval": retval}
  118. return serialize(response)
  119. class Task(BaseTask):
  120. def __call__(self, *args, **kwargs):
  121. return task_response(self.run, *args, **kwargs)