Browse Source

Cleaned up celery.task.http some more, e.g. it's now easier to override the
make_request method, to implement other http client libraries, also a new
decorator has been added for Django applications: celery.views.task_webhook, which turns any function
into a webhook task.

Ask Solem 15 years ago
parent
commit
5e9d7bf3a4

+ 1 - 1
celery/task/__init__.py

@@ -13,7 +13,7 @@ from celery.task.builtins import PingTask
 from celery.task.http import HttpDispatchTask
 
 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
-           "dmap", "dmap_async", "execute_remote", "ping"]
+           "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask"]
 
 
 def dmap(fun, args, timeout=None):

+ 93 - 45
celery/task/http.py

@@ -1,4 +1,5 @@
 import urllib2
+import warnings
 try:
     from urlparse import parse_qsl
 except ImportError:
@@ -25,7 +26,21 @@ class UnknownStatusError(InvalidResponseError):
     """The remote server gave an unknown status."""
 
 
-class URL(object):
+def maybe_utf8(value):
+    """Encode utf-8 value, only if the value is actually utf-8."""
+    if isinstance(value, unicode):
+        return value.encode("utf-8")
+    return value
+
+
+def utf8dict(self, tup):
+    """With a dict's items() tuple return a new dict with any utf-8
+    keys/values encoded."""
+    return dict((key.encode("utf-8"), maybe_utf8(value))
+                    for key, value in tup)
+
+
+class MutableURL(object):
     """Object wrapping a Uniform Resource Locator.
 
     Supports editing the query parameter list.
@@ -45,24 +60,14 @@ class URL(object):
         'http://www.google.com:6580/foo/bar?y=4&x=10&George=Constanza#foo'
 
     """
-
     def __init__(self, url):
         self.url = urlparse(url)
         self._query = dict(parse_qsl(self.url.query))
 
-    def _utf8dict(self, tuple_):
-
-        def value_encode(val):
-            if isinstance(val, unicode):
-                return val.encode("utf-8")
-            return val
-
-        return dict((key.encode("utf-8"), value_encode(value))
-                        for key, value in tuple_)
 
     def __str__(self):
         u = self.url
-        query = urlencode(self._utf8dict(self.query.items()))
+        query = urlencode(utf8dict(self.query.items()))
         components = ["%s://" % u.scheme,
                       "%s" % u.netloc,
                       u.path and "%s" % u.path or "/",
@@ -80,16 +85,20 @@ class URL(object):
     def _set_query(self, query):
         self._query = query
 
-    def get_async(self, **kwargs):
-        return HttpDispatchTask.delay(str(self), "GET", **kwargs)
-
-    def post_async(self, **kwargs):
-        return HttpDispatchTask.delay(str(self), "POST", **kwargs)
 
     query = property(_get_query, _set_query)
 
 
 class HttpDispatch(object):
+    """Make task HTTP request and collect the task result.
+
+    :param url: The URL to request.
+    :param method: HTTP method used. Currently supported methods are ``GET``
+        and``POST``.
+    :param task_kwargs: Task keyword arguments.
+    :param logger: Logger used for user/system feedback.
+
+    """
     user_agent = "celery/%s" % celery_version
     timeout = 5
 
@@ -99,22 +108,31 @@ class HttpDispatch(object):
         self.task_kwargs = task_kwargs
         self.logger = logger
 
-    def _create_request(self):
-        url = URL(self.url)
-        url.query.update(self.task_kwargs)
-        print("URL: %s" % str(url))
-        req = urllib2.Request(str(url))
-        req.headers.update(self.http_headers)
-        return req
-
-    def _make_request(self):
-        request = self._create_request()
-        opener = urllib2.build_opener()
-        response = opener.open(request)
+    def make_request(self, url, method, params):
+        """Makes an HTTP request and returns the response."""
+        request = urllib2.Request(url, params, headers=self.http_headers)
+        request.headers.update(self.http_headers)
+        response = urllib2.urlopen(request) # user catches errors.
         return response.read()
 
+    def _dispatch_raw(self):
+        """Dispatches the callback and returns the raw response text."""
+        url = MutableURL(self.url)
+        params = None
+        if self.method == "GET":
+            url.query.update(self.task_kwargs)
+        elif self.method == "POST":
+            params = urlencode(utf8dict(self.task_kwargs.items()))
+        return self.make_request(str(url), self.method, params)
+
     def execute(self):
-        response = self._make_request()
+        warnings.warn(DeprecationWarning(
+            "execute() has been deprecated and is scheduled for removal in \
+            celery v1.2, please use dispatch() instead."))
+
+    def dispatch(self):
+        """Dispatch callback and return result."""
+        response = self._dispatch()
         if not response:
             raise InvalidResponseError("Empty response")
         try:
@@ -122,8 +140,6 @@ class HttpDispatch(object):
         except ValueError, exc:
             raise InvalidResponseError(str(exc))
 
-        # {"status": "success", "retval": 300}
-        # {"status": "failure": "reason": "Invalid moon alignment."}
         status = payload["status"]
         if status == "success":
             return payload["retval"]
@@ -140,25 +156,57 @@ class HttpDispatch(object):
 
 
 class HttpDispatchTask(BaseTask):
+    """Task dispatching to an URL.
 
-    def run(self, url, method="GET", **kwargs):
+    :keyword url: The URL location of the HTTP callback task.
+    :keyword method: Method to use when dispatching the callback. Usually
+        ``GET`` or ``POST``.
+    :keyword \*\*kwargs: Keyword arguments to pass on to the HTTP callback.
+
+    .. attribute:: url
+
+        If this is set, this is used as the default URL for requests.
+        Default is to require the user of the task to supply the url as an
+        argument, as this attribute is intended for subclasses.
+
+    .. attribute:: method
+
+        If this is set, this is the default method used for requests.
+        Default is to require the user of the task to supply the method as an
+        argument, as this attribute is intended for subclasses.
+
+    """
+
+    url = None
+    method = None
+
+    def run(self, url=None, method="GET", **kwargs):
+        url = url or self.url
+        method = method or self.method
         logger = self.get_logger(**kwargs)
         return HttpDispatch(url, method, kwargs, logger).execute()
 
 
-def http_task_response(fun, *args, **kwargs):
-    try:
-        retval = fun(*args, **kwargs)
-    except Exception, exc:
-        response = {"status": "failure", "reason": str(exc)}
-    else:
-        response = {"status": "success", "retval": retval}
+class URL(MutableURL):
+    """HTTP Callback URL
 
-    return serialize(response)
+    Supports requesting an URL asynchronously.
 
+    :param url: URL to request.
+    :keyword dispatcher: Class used to dispatch the request.
+        By default this is :class:`HttpDispatchTask`.
+
+    """
+    dispatcher = HttpDispatchTask
+
+    def __init__(self, url, dispatcher=None):
+        super(URL, self).__init__(url)
+        self.dispatcher = dispatcher or self.dispatcher
+
+    def get_async(self, **kwargs):
+        return self.dispatcher.delay(str(self), "GET", **kwargs)
+
+    def post_async(self, **kwargs):
+        return self.dispatcher.delay(str(self), "POST", **kwargs)
 
-class Task(BaseTask):
-    abstract = True
 
-    def __call__(self, *args, **kwargs):
-        return http_task_response(self.run, *args, **kwargs)

+ 1 - 3
celery/task/rest.py

@@ -1,9 +1,8 @@
 from celery.task.http import (InvalidResponseError, RemoteExecuteError,
                               UnknownStatusError)
+from celery.task.http import URL
 from celery.task.http import HttpDispatch as RESTProxy
 from celery.task.http import HttpDispatchTask as RESTProxyTask
-from celery.task.http import http_task_response as task_response
-from celery.task.http import URL, Task
 
 import warnings
 warnings.warn(DeprecationWarning(
@@ -14,7 +13,6 @@ The following objects has been renamed:
 
     celery.task.rest.RESTProxy -> celery.task.http.HttpDispatch
     celery.task.rest.RESTProxyTask -> celery.task.http.HttpDispatchTask
-    celery.task.rest.task_response -> celery.task.http.http_task_response
 
 Other objects have the same name, just moved to the celery.task.http module.
 

+ 38 - 0
celery/views.py

@@ -2,6 +2,7 @@
 from django.http import HttpResponse, Http404
 
 from anyjson import serialize as JSON_dump
+from billiard.utils.functional import wraps
 
 from celery.utils import get_full_cls_name
 from celery.result import AsyncResult
@@ -54,3 +55,40 @@ def task_status(request, task_id):
 
     return HttpResponse(JSON_dump({"task": response_data}),
             mimetype="application/json")
+
+
+
+def task_webhook(fun):
+    """Decorator turning a function into a task webhook.
+
+    If an exception is raised within the function, the decorated
+    function catches this and returns an error JSON response, otherwise
+    it returns the result as a JSON response.
+
+
+    Example:
+
+        @task_webhook
+        def add(request):
+            x = int(request.GET["x"])
+            y = int(request.GET["y"])
+            return x + y
+
+        >>> response = add(request)
+        >>> response.content
+        '{"status": "success", "retval": 100}'
+
+    """
+
+    @wraps(fun)
+    def _inner(*args, **kwargs):
+        try:
+            retval = fun(*args, **kwargs)
+        except Exception, exc:
+            response = {"status": "failure", "reason": str(exc)}
+        else:
+            response = {"status": "success", "retval": retval}
+
+        return HttpResponse(JSON_dump(response), mimetype="application/json")
+
+    return _inner

+ 5 - 5
docs/userguide/remote-tasks.rst

@@ -68,19 +68,19 @@ To execute the task you use the :class:`URL` class:
     >>> from celery.task.http import URL
     >>> res = URL("http://example.com/multiply").get_async(x=10, y=10)
 
+
+:class:`URL` is a shortcut to the :class:`HttpDispatchTask`. You can subclass this to extend the
+functionality.
+
     >>> from celery.task.http import HttpDispatchTask
     >>> res = HttpDispatchTask.delay(url="http://example.com/multiply", method="GET", x=10, y=10)
     >>> res.get()
     100
 
-which uses the :class:`HttpDispatchTask` in the background; This class you can use
-to subclass and extend the functionality.
-
-
 The output of celeryd (or the logfile if you've enabled it) should show the task being processed::
 
     [INFO/MainProcess] Task celery.task.http.HttpDispatchTask
-        [f2cc8efc-2a14-40cd-85ad-f1c77c94beeb] processed: 100
+            [f2cc8efc-2a14-40cd-85ad-f1c77c94beeb] processed: 100
 
 Since applying tasks can be done via HTTP using the
 ``celery.views.apply`` view, executing tasks from other languages is easy.

+ 29 - 0
examples/httpexample/README.rst

@@ -0,0 +1,29 @@
+This example is a simple Django HTTP service exposing a single task
+multiplying two numbers:
+
+The multiply http callback task is in ``views.py``, mapped to an url using
+``urls.py``.
+
+There's no models, so to start it do::
+
+    $ python manage.py runserver
+
+To execute the task you could use curl::
+
+    $ curl http://localhost:8000/multiply?x=10&y=10
+
+which then gives the expected JSON response::
+
+    {"status": "success": "retval": 100}
+
+
+To execute this http callback task asynchronously you could fire up
+a python shell with a properly configured celery and do:
+
+    >>> from celery.task.http import URL
+    >>> res = URL("http://localhost:8000/multiply").get_async(x=10, y=10)
+    >>> res.wait()
+    100
+
+
+That's all!