decorators.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. """
  2. Decorators
  3. """
  4. from inspect import getargspec
  5. from celery import registry
  6. from celery.task.base import Task, PeriodicTask
  7. from celery.utils.functional import wraps
  8. def task(*args, **options):
  9. """Decorator to create a task class out of any callable.
  10. Examples:
  11. .. code-block:: python
  12. @task()
  13. def refresh_feed(url):
  14. return Feed.objects.get(url=url).refresh()
  15. With setting extra options and using retry.
  16. .. code-block:: python
  17. @task(exchange="feeds")
  18. def refresh_feed(url, **kwargs):
  19. try:
  20. return Feed.objects.get(url=url).refresh()
  21. except socket.error, exc:
  22. refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
  23. Calling the resulting task:
  24. >>> refresh_feed("http://example.com/rss") # Regular
  25. <Feed: http://example.com/rss>
  26. >>> refresh_feed.delay("http://example.com/rss") # Async
  27. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  28. """
  29. def inner_create_task_cls(**options):
  30. def _create_task_cls(fun):
  31. base = options.pop("base", Task)
  32. @wraps(fun, assigned=("__module__", "__name__"))
  33. def run(self, *args, **kwargs):
  34. return fun(*args, **kwargs)
  35. # Save the argspec for this task so we can recognize
  36. # which default task kwargs we're going to pass to it later.
  37. # (this happens in celery.utils.fun_takes_kwargs)
  38. run.argspec = getargspec(fun)
  39. cls_dict = dict(options, run=run,
  40. __module__=fun.__module__,
  41. __doc__=fun.__doc__)
  42. T = type(fun.__name__, (base, ), cls_dict)()
  43. return registry.tasks[T.name] # global instance.
  44. return _create_task_cls
  45. if len(args) == 1 and callable(args[0]):
  46. return inner_create_task_cls()(*args)
  47. return inner_create_task_cls(**options)
  48. def periodic_task(**options):
  49. """Task decorator to create a periodic task.
  50. Example task, scheduling a task once every day:
  51. .. code-block:: python
  52. from datetime import timedelta
  53. @periodic_task(run_every=timedelta(days=1))
  54. def cronjob(**kwargs):
  55. logger = cronjob.get_logger(**kwargs)
  56. logger.warn("Task running...")
  57. """
  58. return task(**dict({"base": PeriodicTask}, **options))