Browse Source

Merge branch 'httprefactor'

Ask Solem 15 years ago
parent
commit
b1f7c7995f

+ 2 - 1
celery/task/__init__.py

@@ -10,9 +10,10 @@ from celery.registry import tasks
 from celery.task.base import Task, TaskSet, PeriodicTask, ExecuteRemoteTask
 from celery.task.control import discard_all
 from celery.task.builtins import PingTask
+from celery.task.http import HttpDispatchTask
 
 __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
-           "dmap", "dmap_async", "execute_remote", "ping"]
+           "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask"]
 
 
 def dmap(fun, args, timeout=None):

+ 212 - 0
celery/task/http.py

@@ -0,0 +1,212 @@
+import urllib2
+import warnings
+try:
+    from urlparse import parse_qsl
+except ImportError:
+    from cgi import parse_qsl
+from urllib import urlencode
+from urlparse import urlparse
+
+from anyjson import serialize, deserialize
+from billiard.utils.functional import wraps
+
+from celery import __version__ as celery_version
+from celery.task.base import Task as BaseTask
+
+
+class InvalidResponseError(Exception):
+    """Raised when the response from the remote server is not valid."""
+
+
+class RemoteExecuteError(Exception):
+    """Raised when the remote task reports a custom error."""
+
+
+class UnknownStatusError(InvalidResponseError):
+    """Raised when the remote server replies with an unknown status."""
+
+
+def maybe_utf8(value):
+    """Return ``value`` encoded as utf-8 if it is a ``unicode`` object.
+
+    Any other kind of value is returned untouched.
+
+    """
+    if not isinstance(value, unicode):
+        return value
+    return value.encode("utf-8")
+
+
+def utf8dict(tup):
+    """Build a new dict from ``(key, value)`` pairs with utf-8 encoded text.
+
+    :param tup: Iterable of ``(key, value)`` pairs, e.g. the result of
+        a dict's ``items()``.
+
+    :returns: A new dict where every key has been encoded using
+        ``key.encode("utf-8")`` and every value has been passed through
+        :func:`maybe_utf8`.
+
+    """
+    # This is a plain module-level function taking the items as its only
+    # argument, which is how it is called elsewhere in this module.
+    return dict((key.encode("utf-8"), maybe_utf8(value))
+                    for key, value in tup)
+
+
+class MutableURL(object):
+    """A URL whose query parameters can be edited.
+
+    The URL is parsed when the object is created and the query string is
+    kept as a dict in :attr:`query`.  Converting the object back to a string
+    reassembles the URL with the query urlencoded.
+
+    Examples
+
+        >>> url = MutableURL("http://www.google.com:6580/foo/bar?x=3&y=4#foo")
+        >>> url.query
+        {'x': '3', 'y': '4'}
+        >>> str(url)
+        'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
+        >>> url.query["x"] = 10
+        >>> url.query.update({"George": "Constanza"})
+        >>> str(url)
+        'http://www.google.com:6580/foo/bar?y=4&x=10&George=Constanza#foo'
+
+    """
+    def __init__(self, url):
+        parsed = urlparse(url)
+        self.url = parsed
+        self._query = dict(parse_qsl(parsed.query))
+
+    def __str__(self):
+        parts = self.url
+        encoded_query = urlencode(utf8dict(self.query.items()))
+
+        # Scheme and host are always emitted; an empty path becomes "/".
+        pieces = ["%s://" % parts.scheme, "%s" % parts.netloc]
+        if parts.path:
+            pieces.append("%s" % parts.path)
+        else:
+            pieces.append("/")
+
+        # The remaining components are only emitted when non-empty.
+        if parts.params:
+            pieces.append(";%s" % parts.params)
+        if encoded_query:
+            pieces.append("?%s" % encoded_query)
+        if parts.fragment:
+            pieces.append("#%s" % parts.fragment)
+        return "".join(pieces)
+
+    def __repr__(self):
+        return "<%s %s>" % (self.__class__.__name__, str(self))
+
+    def _get_query(self):
+        """Return the dict of query parameters."""
+        return self._query
+
+    def _set_query(self, query):
+        """Replace the dict of query parameters."""
+        self._query = query
+
+    query = property(_get_query, _set_query)
+
+
+class HttpDispatch(object):
+    """Make task HTTP request and collect the task result.
+
+    :param url: The URL to request.
+    :param method: HTTP method used. Currently supported methods are ``GET``
+        and ``POST``.
+    :param task_kwargs: Task keyword arguments.
+    :param logger: Logger used for user/system feedback.
+
+    """
+    user_agent = "celery/%s" % celery_version
+    # NOTE(review): ``timeout`` is not read anywhere in this class.
+    timeout = 5
+
+    def __init__(self, url, method, task_kwargs, logger):
+        self.url = url
+        self.method = method
+        self.task_kwargs = task_kwargs
+        self.logger = logger
+
+    def make_request(self, url, method, params):
+        """Makes an HTTP request and returns the response body.
+
+        :param url: The complete URL to open.
+        :param method: Not used by this implementation; the request is
+            built from ``url`` and ``params`` only.
+        :param params: Request body data, or ``None`` for no body.
+
+        """
+        # The headers are given to the Request constructor, so they do not
+        # have to be applied to the request a second time.
+        request = urllib2.Request(url, params, headers=self.http_headers)
+        response = urllib2.urlopen(request) # user catches errors.
+        return response.read()
+
+    def _dispatch_raw(self):
+        """Dispatches the callback and returns the raw response text.
+
+        For ``GET`` the task keyword arguments are merged into the URL
+        query, for ``POST`` they are urlencoded and sent as request data.
+
+        """
+        url = MutableURL(self.url)
+        params = None
+        if self.method == "GET":
+            url.query.update(self.task_kwargs)
+        elif self.method == "POST":
+            params = urlencode(utf8dict(self.task_kwargs.items()))
+        return self.make_request(str(url), self.method, params)
+
+    def execute(self):
+        """Deprecated alias to :meth:`dispatch`.
+
+        Emits a :exc:`DeprecationWarning` and returns the result
+        of :meth:`dispatch`.
+
+        """
+        # Adjacent string literals keep the source indentation out of the
+        # warning message.
+        warnings.warn(DeprecationWarning(
+            "execute() has been deprecated and is scheduled for removal in "
+            "celery v1.2, please use dispatch() instead."))
+        return self.dispatch()
+
+    def dispatch(self):
+        """Dispatch callback and return result.
+
+        :returns: The ``retval`` field of the response payload.
+
+        :raises InvalidResponseError: If the response is empty or can't
+            be deserialized.
+        :raises RemoteExecuteError: If the payload status is ``failure``.
+        :raises UnknownStatusError: If the payload status is neither
+            ``success`` nor ``failure``.
+
+        """
+        response = self._dispatch_raw()
+        if not response:
+            raise InvalidResponseError("Empty response")
+        try:
+            payload = deserialize(response)
+        except ValueError, exc:
+            raise InvalidResponseError(str(exc))
+
+        status = payload["status"]
+        if status == "success":
+            return payload["retval"]
+        elif status == "failure":
+            raise RemoteExecuteError(payload.get("reason"))
+        else:
+            raise UnknownStatusError(str(status))
+
+    @property
+    def http_headers(self):
+        """Dict of HTTP headers sent with every request."""
+        # NOTE(review): the POST body built by _dispatch_raw() is urlencoded,
+        # while the Content-Type sent here says JSON -- confirm this is
+        # what the receiving side expects.
+        headers = {"Content-Type": "application/json",
+                   "User-Agent": self.user_agent}
+        return headers
+
+
+class HttpDispatchTask(BaseTask):
+    """Task dispatching to an URL.
+
+    :keyword url: The URL location of the HTTP callback task.
+    :keyword method: Method to use when dispatching the callback. Usually
+        ``GET`` or ``POST``.
+    :keyword \*\*kwargs: Keyword arguments to pass on to the HTTP callback.
+
+    .. attribute:: url
+
+        If this is set, this is used as the default URL for requests.
+        Default is to require the user of the task to supply the url as an
+        argument, as this attribute is intended for subclasses.
+
+    .. attribute:: method
+
+        If this is set, this is the default method used for requests.
+        If neither the argument nor this attribute is set, ``GET`` is used.
+        This attribute is intended for subclasses.
+
+    """
+
+    url = None
+    method = None
+
+    def run(self, url=None, method=None, **kwargs):
+        """Dispatch the HTTP callback and return its result.
+
+        :keyword url: URL to request, defaults to :attr:`url`.
+        :keyword method: HTTP method, defaults to :attr:`method` and
+            then to ``GET``.
+        :keyword \*\*kwargs: Passed on to the HTTP callback.
+
+        """
+        url = url or self.url
+        # The argument wins over the class attribute; ``GET`` is the last
+        # resort so calls that omit the method keep behaving as before.
+        method = method or self.method or "GET"
+        logger = self.get_logger(**kwargs)
+        return HttpDispatch(url, method, kwargs, logger).dispatch()
+
+
+class URL(MutableURL):
+    """HTTP Callback URL
+
+    A :class:`MutableURL` that can also be requested asynchronously by
+    applying a dispatcher task.
+
+    :param url: URL to request.
+    :keyword dispatcher: Class used to dispatch the request.
+        By default this is :class:`HttpDispatchTask`.
+
+    """
+    dispatcher = HttpDispatchTask
+
+    def __init__(self, url, dispatcher=None):
+        super(URL, self).__init__(url)
+        self.dispatcher = dispatcher or self.dispatcher
+
+    def get_async(self, **kwargs):
+        """Apply the dispatcher with method ``GET`` and ``kwargs``."""
+        target = str(self)
+        return self.dispatcher.delay(target, "GET", **kwargs)
+
+    def post_async(self, **kwargs):
+        """Apply the dispatcher with method ``POST`` and ``kwargs``."""
+        target = str(self)
+        return self.dispatcher.delay(target, "POST", **kwargs)
+
+

+ 14 - 155
celery/task/rest.py

@@ -1,160 +1,19 @@
-import urllib2
-try:
-    from urlparse import parse_qsl
-except ImportError:
-    from cgi import parse_qsl
-from urllib import urlencode
-from urlparse import urlparse
+from celery.task.http import (InvalidResponseError, RemoteExecuteError,
+                              UnknownStatusError)
+from celery.task.http import URL
+from celery.task.http import HttpDispatch as RESTProxy
+from celery.task.http import HttpDispatchTask as RESTProxyTask
 
-from anyjson import serialize, deserialize
+import warnings
+warnings.warn(DeprecationWarning(
+"""celery.task.rest has been deprecated and is scheduled for removal in
+v1.2. Please use celery.task.http instead.
 
-from celery import __version__ as celery_version
-from celery.task.base import Task as BaseTask
+The following objects has been renamed:
 
+    celery.task.rest.RESTProxy -> celery.task.http.HttpDispatch
+    celery.task.rest.RESTProxyTask -> celery.task.http.HttpDispatchTask
 
-class InvalidResponseError(Exception):
-    """The remote server gave an invalid response."""
+Other objects have the same name, just moved to the celery.task.http module.
 
-
-class RemoteExecuteError(Exception):
-    """The remote task gave a custom error."""
-
-
-class UnknownStatusError(InvalidResponseError):
-    """The remote server gave an unknown status."""
-
-
-class URL(object):
-    """Object wrapping a Uniform Resource Locator.
-
-    Supports editing the query parameter list.
-    You can convert the object back to a string, the query will be
-    properly urlencoded.
-
-    Examples
-
-        >>> url = URL("http://www.google.com:6580/foo/bar?x=3&y=4#foo")
-        >>> url.query
-        {'x': '3', 'y': '4'}
-        >>> str(url)
-        'http://www.google.com:6580/foo/bar?y=4&x=3#foo'
-        >>> url.query["x"] = 10
-        >>> url.query.update({"George": "Constanza"})
-        >>> str(url)
-        'http://www.google.com:6580/foo/bar?y=4&x=10&George=Constanza#foo'
-
-    """
-
-    def __init__(self, url):
-        self.url = urlparse(url)
-        self._query = dict(parse_qsl(self.url.query))
-
-    def _utf8dict(self, tuple_):
-
-        def value_encode(val):
-            if isinstance(val, unicode):
-                return val.encode("utf-8")
-            return val
-
-        return dict((key.encode("utf-8"), value_encode(value))
-                        for key, value in tuple_)
-
-    def __str__(self):
-        u = self.url
-        query = urlencode(self._utf8dict(self.query.items()))
-        components = ["%s://" % u.scheme,
-                      "%s" % u.netloc,
-                      u.path and "%s" % u.path or "/",
-                      u.params and ";%s" % u.params or None,
-                      query and "?%s" % query or None,
-                      u.fragment and "#%s" % u.fragment or None]
-        return "".join(filter(None, components))
-
-    def __repr__(self):
-        return "<%s %s>" % (self.__class__.__name__, str(self))
-
-    def _get_query(self):
-        return self._query
-
-    def _set_query(self, query):
-        self._query = query
-
-    query = property(_get_query, _set_query)
-
-
-class RESTProxy(object):
-    user_agent = "celery/%s" % celery_version
-    timeout = 5
-
-    def __init__(self, url, task_kwargs, logger):
-        self.url = url
-        self.task_kwargs = task_kwargs
-        self.logger = logger
-
-    def _create_request(self):
-        url = URL(self.url)
-        url.query.update(self.task_kwargs)
-        req = urllib2.Request(str(url))
-        req.headers.update(self.http_headers)
-        return req
-
-    def _make_request(self):
-        request = self._create_request()
-        opener = urllib2.build_opener()
-        response = opener.open(request)
-        return response.read()
-
-    def execute(self):
-        response = self._make_request()
-        if not response:
-            raise InvalidResponseError("Empty response")
-        try:
-            payload = deserialize(response)
-        except ValueError, exc:
-            raise InvalidResponseError(str(exc))
-
-        # {"status": "success", "retval": 300}
-        # {"status": "failure": "reason": "Invalid moon alignment."}
-        status = payload["status"]
-        if status == "success":
-            return payload["retval"]
-        elif status == "failure":
-            raise RemoteExecuteError(payload["reason"])
-        else:
-            raise UnknownStatusError(str(status))
-
-    @property
-    def http_headers(self):
-        headers = {"Content-Type": "application/json",
-                   "User-Agent": self.user_agent}
-        return headers
-
-
-class RESTProxyTask(BaseTask):
-    name = "celery.task.rest.RESTProxyTask"
-    user_agent = "celery %s" % celery_version
-
-    def run(self, url, **kwargs):
-        logger = self.get_logger(**kwargs)
-        proxy = RESTProxy(url, kwargs, logger)
-        return proxy.execute()
-
-
-def task_response(fun, *args, **kwargs):
-    import sys
-    try:
-        sys.stderr.write("executing %s\n" % fun)
-        retval = fun(*args, **kwargs)
-        sys.stderr.write("got: %s\n" % retval)
-    except Exception, exc:
-        response = {"status": "failure", "reason": str(exc)}
-    else:
-        response = {"status": "success", "retval": retval}
-
-    return serialize(response)
-
-
-class Task(BaseTask):
-
-    def __call__(self, *args, **kwargs):
-        return task_response(self.run, *args, **kwargs)
+"""))

+ 37 - 0
celery/views.py

@@ -70,3 +70,40 @@ def task_status(request, task_id):
 
     return HttpResponse(JSON_dump({"task": response_data}),
             mimetype="application/json")
+
+
+
+def task_webhook(fun):
+    """Decorator turning a function into a task webhook.
+
+    If an exception is raised within the function, the decorated
+    function catches this and returns an error JSON response, otherwise
+    it returns the result as a JSON response.
+
+    :param fun: The function to decorate.
+
+    :returns: A function taking the same arguments as ``fun`` and returning
+        an :class:`HttpResponse` with mimetype ``application/json``.
+
+    Example:
+
+        @task_webhook
+        def add(request):
+            x = int(request.GET["x"])
+            y = int(request.GET["y"])
+            return x + y
+
+        >>> response = add(request)
+        >>> response.content
+        '{"status": "success", "retval": 100}'
+
+    """
+    # Imported here so the decorator does not depend on a module-level
+    # import of ``wraps``.
+    # NOTE(review): no such import is visible for this module -- confirm.
+    from billiard.utils.functional import wraps
+
+    @wraps(fun)
+    def _inner(*args, **kwargs):
+        try:
+            retval = fun(*args, **kwargs)
+        except Exception, exc:
+            response = {"status": "failure", "reason": str(exc)}
+        else:
+            response = {"status": "success", "retval": retval}
+
+        return HttpResponse(JSON_dump(response), mimetype="application/json")
+
+    return _inner

+ 7 - 0
celery/worker/job.py

@@ -38,6 +38,10 @@ celeryd at %%(hostname)s.
 """ % {"EMAIL_SIGNATURE_SEP": EMAIL_SIGNATURE_SEP}
 
 
+class InvalidTaskError(Exception):
+    """Raised when a task carries invalid data or is badly constructed."""
+
+
 class AlreadyExecutedError(Exception):
     """Tasks can only be executed once, as they might change
     world-wide state."""
@@ -214,6 +218,9 @@ class TaskWrapper(object):
         kwargs = message_data["kwargs"]
         retries = message_data.get("retries", 0)
 
+        if not hasattr(kwargs, "items"):
+            raise InvalidTaskError("Task kwargs must be a dictionary.")
+
         # Convert any unicode keys in the keyword arguments to ascii.
         kwargs = dict((key.encode("utf-8"), value)
                         for key, value in kwargs.items())

+ 7 - 2
celery/worker/listener.py

@@ -7,7 +7,7 @@ from dateutil.parser import parse as parse_iso8601
 from celery import conf
 from celery import signals
 from celery.utils import retry_over_time
-from celery.worker.job import TaskWrapper
+from celery.worker.job import TaskWrapper, InvalidTaskError
 from celery.worker.revoke import revoked
 from celery.worker.control import ControlDispatch
 from celery.worker.heartbeat import Heart
@@ -134,7 +134,12 @@ class CarrotListener(object):
                                                 logger=self.logger,
                                                 eventer=self.event_dispatcher)
             except NotRegistered, exc:
-                self.logger.error("Unknown task ignored: %s" % (exc))
+                self.logger.error("Unknown task ignored: %s: %s" % (
+                        str(exc), message_data))
+                message.ack()
+            except InvalidTaskError, exc:
+                self.logger.error("Invalid task ignored: %s: %s" % (
+                        str(exc), message_data))
                 message.ack()
             else:
                 self.on_task(task, eta=message_data.get("eta"))

+ 46 - 18
docs/userguide/remote-tasks.rst

@@ -1,31 +1,37 @@
 ==============
- Remote Tasks 
+ Remote Tasks
 ==============
 
-.. module:: celery.task.rest
+.. module:: celery.task.http
 
 Executing tasks on a remote web server
 --------------------------------------
 
 If you need to call into another language, framework or similar, you can
-do so by using HTTP tasks.
+do so by using HTTP callback tasks.
 
-The HTTP tasks (or REST task) uses a simple REST+JSON scheme to take arguments
-and return results, the scheme to call a task is::
+HTTP callback tasks take their arguments as GET/POST parameters and return
+their results as a simple JSON response. The scheme to call a task is::
 
     GET http://example.com/mytask/?arg1=a&arg2=b&arg3=c
 
+or using POST::
+
+    POST http://example.com/mytask
+
+Whether to use GET or POST is up to you and your requirements.
+
 The web page should then return a response in the following format
 if the execution was successful::
 
     {"status": "success", "retval": ....}
 
-or in the following format if there was an error::
+or if there was an error::
 
     {"status": "failure", "reason": "Invalid moon alignment."}
 
 
-With this information we can define a simple task in Django:
+With this information you could define a simple task in Django:
 
 .. code-block:: python
 
@@ -40,21 +46,43 @@ With this information we can define a simple task in Django:
         response = {"status": "success", "retval": result}
         return HttpResponse(serialize(response), mimetype="application/json")
 
-I'm sure you'll be able to port this scheme to any language/framework.
-New examples and libraries are very welcome!
 
-To execute the task you use :class:`RESTProxyTask`:
+or in Ruby on Rails:
+
+.. code-block:: ruby
+
+    def multiply
+        @x = params[:x].to_i
+        @y = params[:y].to_i
+
+        @status = {:status => "success", :retval => @x * @y}
+
+        render :json => @status
+    end
+
+You can easily port this scheme to any language or framework.
+New examples and libraries are very welcome.
+
+To execute the task you use the :class:`URL` class:
+
+    >>> from celery.task.http import URL
+    >>> res = URL("http://example.com/multiply").get_async(x=10, y=10)
+
+
+:class:`URL` is a shortcut to the :class:`HttpDispatchTask`. You can subclass this to extend the
+functionality.
 
-    >>> from celery.task import RESTProxyTask
-    >>> res = RESTProxyTask.delay("http://example.com/multiply", x=10, y=10)
+    >>> from celery.task.http import HttpDispatchTask
+    >>> res = HttpDispatchTask.delay(url="http://example.com/multiply", method="GET", x=10, y=10)
     >>> res.get()
     100
 
-In your ``celeryd.log`` file you should see the task being processed::
+The output of celeryd (or the logfile if you've enabled it) should show the task being processed::
 
-    [INFO/MainProcess] Task celery.task.rest.RESTProxyTask
-        [f2cc8efc-2a14-40cd-85ad-f1c77c94beeb] processed: 100
+    [INFO/MainProcess] Task celery.task.http.HttpDispatchTask
+            [f2cc8efc-2a14-40cd-85ad-f1c77c94beeb] processed: 100
 
-Since applying tasks can also simply be done via the web and the
-``celery.views.apply`` view, executing tasks from other languages should be a
-no-brainer.
+Since applying tasks can be done via HTTP using the
+``celery.views.apply`` view, executing tasks from other languages is easy.
+For an example service exposing tasks via HTTP you should have a look at
+``examples/celery_http_gateway``.

+ 29 - 0
examples/httpexample/README.rst

@@ -0,0 +1,29 @@
+This example is a simple Django HTTP service exposing a single task
+that multiplies two numbers.
+
+The multiply HTTP callback task is in ``views.py``, mapped to a URL using
+``urls.py``.
+
+There are no models, so to start it, run::
+
+    $ python manage.py runserver
+
+To execute the task you could use curl::
+
+    $ curl "http://localhost:8000/multiply/?x=10&y=10"
+
+which then gives the expected JSON response::
+
+    {"status": "success", "retval": 100}
+
+
+To execute this http callback task asynchronously you could fire up
+a python shell with a properly configured celery and do:
+
+    >>> from celery.task.http import URL
+    >>> res = URL("http://localhost:8000/multiply").get_async(x=10, y=10)
+    >>> res.wait()
+    100
+
+
+That's all!

+ 0 - 0
examples/httpexample/__init__.py


+ 11 - 0
examples/httpexample/manage.py

@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# Command-line entry point for the httpexample Django project.
+from django.core.management import execute_manager
+try:
+    import settings # Assumed to be in the same directory.
+except ImportError:
+    # Without a settings module there is nothing to run: explain the
+    # problem on stderr and exit with status 1.
+    import sys
+    sys.stderr.write("Error: Can't find the file 'settings.py' in the directory containing %r. It appears you've customized things.\nYou'll have to run django-admin.py, passing it your settings module.\n(If the file settings.py does indeed exist, it's causing an ImportError somehow.)\n" % __file__)
+    sys.exit(1)
+
+if __name__ == "__main__":
+    # Only runs when executed as a script; the imported settings module is
+    # handed to Django's execute_manager.
+    execute_manager(settings)

+ 79 - 0
examples/httpexample/settings.py

@@ -0,0 +1,79 @@
+# Django settings for httpexample project.
+#
+# All database settings are left empty and only the listed contrib apps
+# are installed.
+
+# NOTE(review): debug mode is switched on here; presumably intended for
+# running the example locally only.
+DEBUG = True
+TEMPLATE_DEBUG = DEBUG
+
+ADMINS = (
+    # ('Your Name', 'your_email@domain.com'),
+)
+
+MANAGERS = ADMINS
+
+DATABASE_ENGINE = ''           # 'postgresql_psycopg2', 'postgresql', 'mysql', 'sqlite3' or 'oracle'.
+DATABASE_NAME = ''             # Or path to database file if using sqlite3.
+DATABASE_USER = ''             # Not used with sqlite3.
+DATABASE_PASSWORD = ''         # Not used with sqlite3.
+DATABASE_HOST = ''             # Set to empty string for localhost. Not used with sqlite3.
+DATABASE_PORT = ''             # Set to empty string for default. Not used with sqlite3.
+
+# Local time zone for this installation. Choices can be found here:
+# http://en.wikipedia.org/wiki/List_of_tz_zones_by_name
+# although not all choices may be available on all operating systems.
+# If running in a Windows environment this must be set to the same as your
+# system time zone.
+TIME_ZONE = 'America/Chicago'
+
+# Language code for this installation. All choices can be found here:
+# http://www.i18nguy.com/unicode/language-identifiers.html
+LANGUAGE_CODE = 'en-us'
+
+SITE_ID = 1
+
+# If you set this to False, Django will make some optimizations so as not
+# to load the internationalization machinery.
+USE_I18N = True
+
+# Absolute path to the directory that holds media.
+# Example: "/home/media/media.lawrence.com/"
+MEDIA_ROOT = ''
+
+# URL that handles the media served from MEDIA_ROOT. Make sure to use a
+# trailing slash if there is a path component (optional in other cases).
+# Examples: "http://media.lawrence.com", "http://example.com/media/"
+MEDIA_URL = ''
+
+# URL prefix for admin media -- CSS, JavaScript and images. Make sure to use a
+# trailing slash.
+# Examples: "http://foo.com/media/", "/media/".
+ADMIN_MEDIA_PREFIX = '/media/'
+
+# Make this unique, and don't share it with anybody.
+# NOTE(review): this value is published together with the example, so it
+# should be replaced before the settings are reused anywhere else.
+SECRET_KEY = 'p^@q$@nal#-0+w@v_3bcj2ug(zbh5_m2on8^kkn&!e!b=a@o__'
+
+# List of callables that know how to import templates from various sources.
+TEMPLATE_LOADERS = (
+    'django.template.loaders.filesystem.load_template_source',
+    'django.template.loaders.app_directories.load_template_source',
+#     'django.template.loaders.eggs.load_template_source',
+)
+
+MIDDLEWARE_CLASSES = (
+    'django.middleware.common.CommonMiddleware',
+    'django.contrib.sessions.middleware.SessionMiddleware',
+    'django.contrib.auth.middleware.AuthenticationMiddleware',
+)
+
+# The URLconf module of this example project.
+ROOT_URLCONF = 'httpexample.urls'
+
+TEMPLATE_DIRS = (
+    # Put strings here, like "/home/html/django_templates" or "C:/www/django/templates".
+    # Always use forward slashes, even on Windows.
+    # Don't forget to use absolute paths, not relative paths.
+)
+
+INSTALLED_APPS = (
+    'django.contrib.auth',
+    'django.contrib.contenttypes',
+    'django.contrib.sessions',
+    'django.contrib.sites',
+)

+ 10 - 0
examples/httpexample/urls.py

@@ -0,0 +1,10 @@
+# URLconf for the httpexample project: routes requests whose path starts
+# with ``multiply/`` to the ``multiply`` view.
+# NOTE(review): the star import presumably provides more than ``patterns``
+# and ``url`` (e.g. default error handlers looked up on the URLconf) --
+# confirm before narrowing it.
+from django.conf.urls.defaults import *
+import views
+
+# Uncomment the next two lines to enable the admin:
+# from django.contrib import admin
+# admin.autodiscover()
+
+urlpatterns = patterns('',
+        # The pattern requires the trailing slash after ``multiply``.
+        url(r'^multiply/', views.multiply, name="multiply"),
+)

+ 12 - 0
examples/httpexample/views.py

@@ -0,0 +1,12 @@
+from django.http import HttpResponse
+
+from anyjson import serialize
+
+
+def multiply(request):
+    """HTTP callback task multiplying the GET parameters ``x`` and ``y``.
+
+    Both parameters are converted to integers, and the product is returned
+    as a JSON response with status ``success``.
+
+    """
+    params = request.GET
+    x = int(params["x"])
+    y = int(params["y"])
+
+    payload = serialize({"status": "success", "retval": x * y})
+    return HttpResponse(payload, mimetype="application/json")