__init__.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task
  4. ===========
  5. Creating tasks and subtasks
  6. """
  7. from __future__ import absolute_import
  8. import warnings
  9. from ..app import app_or_default
  10. from ..exceptions import CDeprecationWarning
  11. from .base import Task, PeriodicTask
  12. from .sets import TaskSet, subtask
  13. from .chords import chord
  14. from .control import discard_all
  15. __all__ = ["Task", "TaskSet", "PeriodicTask", "subtask",
  16. "discard_all", "chord", "group"]
  17. group = TaskSet
  18. def task(*args, **kwargs):
  19. """Decorator to create a task class out of any callable.
  20. **Examples**
  21. .. code-block:: python
  22. @task
  23. def refresh_feed(url):
  24. return Feed.objects.get(url=url).refresh()
  25. With setting extra options and using retry.
  26. .. code-block:: python
  27. @task(max_retries=10)
  28. def refresh_feed(url):
  29. try:
  30. return Feed.objects.get(url=url).refresh()
  31. except socket.error, exc:
  32. refresh_feed.retry(exc=exc)
  33. Calling the resulting task:
  34. >>> refresh_feed("http://example.com/rss") # Regular
  35. <Feed: http://example.com/rss>
  36. >>> refresh_feed.delay("http://example.com/rss") # Async
  37. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  38. """
  39. kwargs.setdefault("accept_magic_kwargs", False)
  40. return app_or_default().task(*args, **kwargs)
  41. def periodic_task(*args, **options):
  42. """Decorator to create a task class out of any callable.
  43. .. admonition:: Examples
  44. .. code-block:: python
  45. @task
  46. def refresh_feed(url):
  47. return Feed.objects.get(url=url).refresh()
  48. With setting extra options and using retry.
  49. .. code-block:: python
  50. @task(exchange="feeds")
  51. def refresh_feed(url, **kwargs):
  52. try:
  53. return Feed.objects.get(url=url).refresh()
  54. except socket.error, exc:
  55. refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
  56. Calling the resulting task:
  57. >>> refresh_feed("http://example.com/rss") # Regular
  58. <Feed: http://example.com/rss>
  59. >>> refresh_feed.delay("http://example.com/rss") # Async
  60. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  61. """
  62. return task(**dict({"base": PeriodicTask}, **options))
  63. @task(name="celery.backend_cleanup")
  64. def backend_cleanup():
  65. backend_cleanup.backend.cleanup()
  66. class PingTask(Task): # ✞
  67. name = "celery.ping"
  68. def run(self, **kwargs):
  69. return "pong"
  70. def ping(): # ✞
  71. """Deprecated and scheduled for removal in Celery 2.3.
  72. Please use :meth:`celery.task.control.ping` instead.
  73. """
  74. warnings.warn(CDeprecationWarning(
  75. "The ping task has been deprecated and will be removed in Celery "
  76. "v2.3. Please use inspect.ping instead."))
  77. return PingTask.apply_async().get()