base.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.base
  4. ~~~~~~~~~~~~~~~~
  5. The task implementation has been moved to :mod:`celery.app.task`.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from .. import current_app
  11. from ..app.task import Context, TaskType, BaseTask # noqa
  12. from ..schedules import maybe_schedule
  13. from ..utils import timeutils
  14. Task = current_app.Task
  15. class PeriodicTask(Task):
  16. """A periodic task is a task that behaves like a :manpage:`cron` job.
  17. Results of periodic tasks are not stored by default.
  18. .. attribute:: run_every
  19. *REQUIRED* Defines how often the task is run (its interval),
  20. it can be a :class:`~datetime.timedelta` object, a
  21. :class:`~celery.schedules.crontab` object or an integer
  22. specifying the time in seconds.
  23. .. attribute:: relative
  24. If set to :const:`True`, run times are relative to the time when the
  25. server was started. This was the previous behaviour, periodic tasks
  26. are now scheduled by the clock.
  27. :raises NotImplementedError: if the :attr:`run_every` attribute is
  28. not defined.
  29. Example
  30. >>> from celery.task import tasks, PeriodicTask
  31. >>> from datetime import timedelta
  32. >>> class EveryThirtySecondsTask(PeriodicTask):
  33. ... run_every = timedelta(seconds=30)
  34. ...
  35. ... def run(self, **kwargs):
  36. ... logger = self.get_logger(**kwargs)
  37. ... logger.info("Execute every 30 seconds")
  38. >>> from celery.task import PeriodicTask
  39. >>> from celery.schedules import crontab
  40. >>> class EveryMondayMorningTask(PeriodicTask):
  41. ... run_every = crontab(hour=7, minute=30, day_of_week=1)
  42. ...
  43. ... def run(self, **kwargs):
  44. ... logger = self.get_logger(**kwargs)
  45. ... logger.info("Execute every Monday at 7:30AM.")
  46. >>> class EveryMorningTask(PeriodicTask):
  47. ... run_every = crontab(hours=7, minute=30)
  48. ...
  49. ... def run(self, **kwargs):
  50. ... logger = self.get_logger(**kwargs)
  51. ... logger.info("Execute every day at 7:30AM.")
  52. >>> class EveryQuarterPastTheHourTask(PeriodicTask):
  53. ... run_every = crontab(minute=15)
  54. ...
  55. ... def run(self, **kwargs):
  56. ... logger = self.get_logger(**kwargs)
  57. ... logger.info("Execute every 0:15 past the hour every day.")
  58. """
  59. abstract = True
  60. ignore_result = True
  61. type = "periodic"
  62. relative = False
  63. options = None
  64. def __init__(self):
  65. app = current_app
  66. if not hasattr(self, "run_every"):
  67. raise NotImplementedError(
  68. "Periodic tasks must have a run_every attribute")
  69. self.run_every = maybe_schedule(self.run_every, self.relative)
  70. # For backward compatibility, add the periodic task to the
  71. # configuration schedule instead.
  72. app.conf.CELERYBEAT_SCHEDULE[self.name] = {
  73. "task": self.name,
  74. "schedule": self.run_every,
  75. "args": (),
  76. "kwargs": {},
  77. "options": self.options or {},
  78. "relative": self.relative,
  79. }
  80. super(PeriodicTask, self).__init__()
  81. def timedelta_seconds(self, delta):
  82. """Convert :class:`~datetime.timedelta` to seconds.
  83. Doesn't account for negative timedeltas.
  84. """
  85. return timeutils.timedelta_seconds(delta)
  86. def is_due(self, last_run_at):
  87. """Returns tuple of two items `(is_due, next_time_to_run)`,
  88. where next time to run is in seconds.
  89. See :meth:`celery.schedules.schedule.is_due` for more information.
  90. """
  91. return self.run_every.is_due(last_run_at)
  92. def remaining_estimate(self, last_run_at):
  93. """Returns when the periodic task should run next as a timedelta."""
  94. return self.run_every.remaining_estimate(last_run_at)