decorators.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from inspect import getargspec
  2. from celery.task.base import Task, PeriodicTask
  3. def task(**options):
  4. """Make a task out of any callable.
  5. Examples:
  6. >>> @task()
  7. ... def refresh_feed(url):
  8. ... return Feed.objects.get(url=url).refresh()
  9. >>> refresh_feed("http://example.com/rss") # Regular
  10. <Feed: http://example.com/rss>
  11. >>> refresh_feed.delay("http://example.com/rss") # Async
  12. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  13. # With setting extra options and using retry.
  14. >>> @task(exchange="feeds")
  15. ... def refresh_feed(url, **kwargs):
  16. ... try:
  17. ... return Feed.objects.get(url=url).refresh()
  18. ... except socket.error, exc:
  19. ... refresh_feed.retry(args=[url], kwargs=kwargs,
  20. ... exc=exc)
  21. """
  22. def _create_task_cls(fun):
  23. base = options.pop("base", Task)
  24. cls_name = fun.__name__
  25. def run(self, *args, **kwargs):
  26. return fun(*args, **kwargs)
  27. run.__name__ = fun.__name__
  28. run.argspec = getargspec(fun)
  29. cls_dict = dict(options)
  30. cls_dict["run"] = run
  31. cls_dict["__module__"] = fun.__module__
  32. task = type(cls_name, (base, ), cls_dict)()
  33. return task
  34. return _create_task_cls
  35. def periodic_task(**options):
  36. """Task decorator to create a periodic task.
  37. **Usage**
  38. Run a task once every day:
  39. .. code-block:: python
  40. from datetime import timedelta
  41. @periodic_task(run_every=timedelta(days=1))
  42. def cronjob(**kwargs):
  43. logger = cronjob.get_logger(**kwargs)
  44. logger.warn("Task running...")
  45. """
  46. options["base"] = PeriodicTask
  47. return task(**options)