decorators.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. from celery.task.base import Task
  2. from celery.registry import tasks
  3. from inspect import getargspec
  4. def task(**options):
  5. """Make a task out of any callable.
  6. :keyword autoregister: Automatically register the task in the
  7. task registry.
  8. Examples:
  9. >>> @task()
  10. ... def refresh_feed(url):
  11. ... return Feed.objects.get(url=url).refresh()
  12. >>> refresh_feed("http://example.com/rss") # Regular
  13. <Feed: http://example.com/rss>
  14. >>> refresh_feed.delay("http://example.com/rss") # Async
  15. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  16. # With setting extra options and using retry.
  17. >>> @task(exchange="feeds")
  18. ... def refresh_feed(url, **kwargs):
  19. ... try:
  20. ... return Feed.objects.get(url=url).refresh()
  21. ... except socket.error, exc:
  22. ... refresh_feed.retry(args=[url], kwargs=kwargs,
  23. ... exc=exc)
  24. """
  25. def _create_task_cls(fun):
  26. name = options.pop("name", None)
  27. autoregister = options.pop("autoregister", True)
  28. cls_name = fun.__name__
  29. def run(self, *args, **kwargs):
  30. return fun(*args, **kwargs)
  31. run.__name__ = fun.__name__
  32. run.argspec = getargspec(fun)
  33. cls_dict = dict(options)
  34. cls_dict["run"] = run
  35. cls_dict["__module__"] = fun.__module__
  36. task = type(cls_name, (Task, ), cls_dict)()
  37. autoregister and tasks.register(task)
  38. return task
  39. return _create_task_cls