__init__.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task
  4. ~~~~~~~~~~~
  5. Creating tasks, subtasks, sets and chords.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from .. import current_app
  11. from ..app import app_or_default, current_task as _current_task
  12. from ..local import Proxy
  13. from ..utils import uuid
  14. from .base import BaseTask, Task, PeriodicTask # noqa
  15. from .sets import group, TaskSet, subtask # noqa
  16. from .chords import chord # noqa
  17. from .control import discard_all # noqa
# Proxy built around ``_current_task``.  It is importable as
# ``celery.task.current`` and is meant to be used from inside a task body,
# e.g. ``current.retry(exc=exc)``.
# NOTE(review): presumably resolves to the currently executing task on each
# access -- confirm against ``..local.Proxy``.
current = Proxy(_current_task)
  19. def task(*args, **kwargs):
  20. """Decorator to create a task class out of any callable.
  21. **Examples**
  22. .. code-block:: python
  23. @task
  24. def refresh_feed(url):
  25. return Feed.objects.get(url=url).refresh()
  26. With setting extra options and using retry.
  27. .. code-block:: python
  28. @task(max_retries=10)
  29. def refresh_feed(url):
  30. try:
  31. return Feed.objects.get(url=url).refresh()
  32. except socket.error, exc:
  33. refresh_feed.retry(exc=exc)
  34. Calling the resulting task:
  35. >>> refresh_feed("http://example.com/rss") # Regular
  36. <Feed: http://example.com/rss>
  37. >>> refresh_feed.delay("http://example.com/rss") # Async
  38. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  39. """
  40. kwargs.setdefault("accept_magic_kwargs", False)
  41. return app_or_default().task(*args, **kwargs)
  42. def periodic_task(*args, **options):
  43. """Decorator to create a task class out of any callable.
  44. .. admonition:: Examples
  45. .. code-block:: python
  46. @task
  47. def refresh_feed(url):
  48. return Feed.objects.get(url=url).refresh()
  49. With setting extra options and using retry.
  50. .. code-block:: python
  51. from celery.task import current
  52. @task(exchange="feeds")
  53. def refresh_feed(url):
  54. try:
  55. return Feed.objects.get(url=url).refresh()
  56. except socket.error, exc:
  57. current.retry(exc=exc)
  58. Calling the resulting task:
  59. >>> refresh_feed("http://example.com/rss") # Regular
  60. <Feed: http://example.com/rss>
  61. >>> refresh_feed.delay("http://example.com/rss") # Async
  62. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  63. """
  64. return task(**dict({"base": PeriodicTask}, **options))
# Proxy around a lookup of the task registered under the name
# "celery.backend_cleanup" in ``current_app.tasks``.  The lookup is wrapped
# in a lambda rather than evaluated inline.
# NOTE(review): presumably Proxy calls the lambda when the object is first
# used, so the registry is not touched at import time -- confirm against
# ``..local.Proxy``.
backend_cleanup = Proxy(lambda: current_app.tasks["celery.backend_cleanup"])
  66. class chain(object):
  67. def __init__(self, *tasks):
  68. self.tasks = tasks
  69. def apply_async(self, **kwargs):
  70. tasks = [task.clone(task_id=uuid(), **kwargs)
  71. for task in self.tasks]
  72. reduce(lambda a, b: a.link(b), tasks)
  73. tasks[0].apply_async()
  74. results = [task.type.AsyncResult(task.options["task_id"])
  75. for task in tasks]
  76. def update_parent(result, parent):
  77. result.parent = parent
  78. return parent
  79. reduce(update_parent, reversed(results))
  80. return results[-1]