base.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.base
  4. ~~~~~~~~~~~~~~~~
  5. The task implementation has been moved to :mod:`celery.app.task`.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from .. import current_app
  11. from ..app.task import Context, TaskType, BaseTask # noqa
  12. from ..schedules import maybe_schedule
  13. from ..utils import deprecated, timeutils
  14. Task = current_app.Task
  15. @deprecated("Importing TaskSet from celery.task.base",
  16. alternative="Use celery.task.TaskSet instead.",
  17. removal="2.4")
  18. def TaskSet(*args, **kwargs):
  19. from celery.task.sets import TaskSet
  20. return TaskSet(*args, **kwargs)
  21. @deprecated("Importing subtask from celery.task.base",
  22. alternative="Use celery.task.subtask instead.",
  23. removal="2.4")
  24. def subtask(*args, **kwargs):
  25. from celery.task.sets import subtask
  26. return subtask(*args, **kwargs)
  27. class PeriodicTask(Task):
  28. """A periodic task is a task that behaves like a :manpage:`cron` job.
  29. Results of periodic tasks are not stored by default.
  30. .. attribute:: run_every
  31. *REQUIRED* Defines how often the task is run (its interval),
  32. it can be a :class:`~datetime.timedelta` object, a
  33. :class:`~celery.schedules.crontab` object or an integer
  34. specifying the time in seconds.
  35. .. attribute:: relative
  36. If set to :const:`True`, run times are relative to the time when the
  37. server was started. This was the previous behaviour, periodic tasks
  38. are now scheduled by the clock.
  39. :raises NotImplementedError: if the :attr:`run_every` attribute is
  40. not defined.
  41. Example
  42. >>> from celery.task import tasks, PeriodicTask
  43. >>> from datetime import timedelta
  44. >>> class EveryThirtySecondsTask(PeriodicTask):
  45. ... run_every = timedelta(seconds=30)
  46. ...
  47. ... def run(self, **kwargs):
  48. ... logger = self.get_logger(**kwargs)
  49. ... logger.info("Execute every 30 seconds")
  50. >>> from celery.task import PeriodicTask
  51. >>> from celery.schedules import crontab
  52. >>> class EveryMondayMorningTask(PeriodicTask):
  53. ... run_every = crontab(hour=7, minute=30, day_of_week=1)
  54. ...
  55. ... def run(self, **kwargs):
  56. ... logger = self.get_logger(**kwargs)
  57. ... logger.info("Execute every Monday at 7:30AM.")
  58. >>> class EveryMorningTask(PeriodicTask):
  59. ... run_every = crontab(hours=7, minute=30)
  60. ...
  61. ... def run(self, **kwargs):
  62. ... logger = self.get_logger(**kwargs)
  63. ... logger.info("Execute every day at 7:30AM.")
  64. >>> class EveryQuarterPastTheHourTask(PeriodicTask):
  65. ... run_every = crontab(minute=15)
  66. ...
  67. ... def run(self, **kwargs):
  68. ... logger = self.get_logger(**kwargs)
  69. ... logger.info("Execute every 0:15 past the hour every day.")
  70. """
  71. abstract = True
  72. ignore_result = True
  73. type = "periodic"
  74. relative = False
  75. options = None
  76. def __init__(self):
  77. app = current_app
  78. if not hasattr(self, "run_every"):
  79. raise NotImplementedError(
  80. "Periodic tasks must have a run_every attribute")
  81. self.run_every = maybe_schedule(self.run_every, self.relative)
  82. # For backward compatibility, add the periodic task to the
  83. # configuration schedule instead.
  84. app.conf.CELERYBEAT_SCHEDULE[self.name] = {
  85. "task": self.name,
  86. "schedule": self.run_every,
  87. "args": (),
  88. "kwargs": {},
  89. "options": self.options or {},
  90. "relative": self.relative,
  91. }
  92. super(PeriodicTask, self).__init__()
  93. def timedelta_seconds(self, delta):
  94. """Convert :class:`~datetime.timedelta` to seconds.
  95. Doesn't account for negative timedeltas.
  96. """
  97. return timeutils.timedelta_seconds(delta)
  98. def is_due(self, last_run_at):
  99. """Returns tuple of two items `(is_due, next_time_to_run)`,
  100. where next time to run is in seconds.
  101. See :meth:`celery.schedules.schedule.is_due` for more information.
  102. """
  103. return self.run_every.is_due(last_run_at)
  104. def remaining_estimate(self, last_run_at):
  105. """Returns when the periodic task should run next as a timedelta."""
  106. return self.run_every.remaining_estimate(last_run_at)