decorators.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. """
  2. Decorators
  3. """
  4. from inspect import getargspec
  5. from billiard.utils.functional import wraps
  6. from celery.task.base import Task, PeriodicTask
  7. def task(*args, **options):
  8. """Decorator to create a task class out of any callable.
  9. Examples:
  10. .. code-block:: python
  11. @task()
  12. def refresh_feed(url):
  13. return Feed.objects.get(url=url).refresh()
  14. With setting extra options and using retry.
  15. .. code-block:: python
  16. @task(exchange="feeds")
  17. def refresh_feed(url, **kwargs):
  18. try:
  19. return Feed.objects.get(url=url).refresh()
  20. except socket.error, exc:
  21. refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
  22. Calling the resulting task:
  23. >>> refresh_feed("http://example.com/rss") # Regular
  24. <Feed: http://example.com/rss>
  25. >>> refresh_feed.delay("http://example.com/rss") # Async
  26. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  27. """
  28. def inner_create_task_cls(**options):
  29. def _create_task_cls(fun):
  30. base = options.pop("base", Task)
  31. @wraps(fun)
  32. def run(self, *args, **kwargs):
  33. return fun(*args, **kwargs)
  34. # Save the argspec for this task so we can recognize
  35. # which default task kwargs we're going to pass to it later.
  36. # (this happens in celery.utils.fun_takes_kwargs)
  37. run.argspec = getargspec(fun)
  38. cls_dict = dict(options, run=run,
  39. __module__=fun.__module__,
  40. __doc__=fun.__doc__)
  41. return type(fun.__name__, (base, ), cls_dict)()
  42. return _create_task_cls
  43. if len(args) == 1 and callable(args[0]):
  44. return inner_create_task_cls()(*args)
  45. return inner_create_task_cls(**options)
  46. def periodic_task(**options):
  47. """Task decorator to create a periodic task.
  48. Example task, scheduling a task once every day:
  49. .. code-block:: python
  50. from datetime import timedelta
  51. @periodic_task(run_every=timedelta(days=1))
  52. def cronjob(**kwargs):
  53. logger = cronjob.get_logger(**kwargs)
  54. logger.warn("Task running...")
  55. """
  56. return task(**dict({"base": PeriodicTask}, **options))