beat.py 12 KB


  1. """
  2. Periodic Task Scheduler
  3. """
  4. import time
  5. import shelve
  6. import threading
  7. import traceback
  8. import multiprocessing
  9. from datetime import datetime
  10. from UserDict import UserDict
  11. from celery import log
  12. from celery import conf
  13. from celery import platforms
  14. from celery import registry
  15. from celery.execute import send_task
  16. from celery.schedules import maybe_schedule
  17. from celery.messaging import establish_connection, TaskPublisher
  18. from celery.utils import instantiate
  19. from celery.utils.info import humanize_seconds
  20. class SchedulingError(Exception):
  21. """An error occured while scheduling a task."""
  22. class ScheduleEntry(object):
  23. """An entry in the scheduler.
  24. :param name: see :attr:`name`.
  25. :param schedule: see :attr:`schedule`.
  26. :param args: see :attr:`args`.
  27. :param kwargs: see :attr:`kwargs`.
  28. :keyword last_run_at: see :attr:`last_run_at`.
  29. :keyword total_run_count: see :attr:`total_run_count`.
  30. .. attribute:: name
  31. The task name.
  32. .. attribute:: schedule
  33. The schedule (run_every/crontab)
  34. .. attribute:: args
  35. Args to apply.
  36. .. attribute:: kwargs
  37. Keyword arguments to apply.
  38. .. attribute:: last_run_at
  39. The time and date of when this task was last run.
  40. .. attribute:: total_run_count
  41. Total number of times this periodic task has been executed.
  42. """
  43. def __init__(self, name=None, task=None, last_run_at=None,
  44. total_run_count=None, schedule=None, args=(), kwargs={},
  45. options={}, relative=False):
  46. self.name = name
  47. self.task = task
  48. self.schedule = maybe_schedule(schedule, relative)
  49. self.args = args
  50. self.kwargs = kwargs
  51. self.options = options
  52. self.last_run_at = last_run_at or datetime.now()
  53. self.total_run_count = total_run_count or 0
  54. def next(self, last_run_at=None):
  55. """Returns a new instance of the same class, but with
  56. its date and count fields updated."""
  57. last_run_at = last_run_at or datetime.now()
  58. total_run_count = self.total_run_count + 1
  59. return self.__class__(**dict(self,
  60. last_run_at=last_run_at,
  61. total_run_count=total_run_count))
  62. def update(self, other):
  63. """Update values from another entry.
  64. Does only update "editable" fields (schedule, args,
  65. kwargs, options).
  66. """
  67. self.task = other.task
  68. self.schedule = other.schedule
  69. self.args = other.args
  70. self.kwargs = other.kwargs
  71. self.options = other.options
  72. def is_due(self):
  73. """See :meth:`celery.task.base.PeriodicTask.is_due`."""
  74. return self.schedule.is_due(self.last_run_at)
  75. def __iter__(self):
  76. return vars(self).iteritems()
  77. def __repr__(self):
  78. return "<Entry: %s %s(*%s, **%s) {%s}>" % (self.name,
  79. self.task,
  80. self.args,
  81. self.kwargs,
  82. self.schedule)
  83. class Scheduler(UserDict):
  84. """Scheduler for periodic tasks.
  85. :keyword schedule: see :attr:`schedule`.
  86. :keyword logger: see :attr:`logger`.
  87. :keyword max_interval: see :attr:`max_interval`.
  88. .. attribute:: schedule
  89. The schedule dict/shelve.
  90. .. attribute:: logger
  91. The logger to use.
  92. .. attribute:: max_interval
  93. Maximum time to sleep between re-checking the schedule.
  94. """
  95. Entry = ScheduleEntry
  96. Publisher = TaskPublisher
  97. def __init__(self, schedule=None, logger=None, max_interval=None,
  98. **kwargs):
  99. UserDict.__init__(self)
  100. if schedule is None:
  101. schedule = {}
  102. self.data = schedule
  103. self.logger = logger or log.get_default_logger(name="celery.beat")
  104. self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
  105. self.setup_schedule()
  106. def maybe_due(self, entry, publisher=None):
  107. is_due, next_time_to_run = entry.is_due()
  108. if is_due:
  109. self.logger.debug("Scheduler: Sending due task %s" % entry.task)
  110. try:
  111. result = self.apply_async(entry, publisher=publisher)
  112. except Exception, exc:
  113. self.logger.error("Message Error: %s\n%s" % (exc,
  114. traceback.format_stack()))
  115. else:
  116. self.logger.debug("%s sent. id->%s" % (entry.task,
  117. result.task_id))
  118. return next_time_to_run
  119. def tick(self):
  120. """Run a tick, that is one iteration of the scheduler.
  121. Executes all due tasks.
  122. """
  123. remaining_times = []
  124. connection = establish_connection()
  125. publisher = self.Publisher(connection=connection)
  126. try:
  127. try:
  128. for entry in self.schedule.itervalues():
  129. next_time_to_run = self.maybe_due(entry, publisher)
  130. if next_time_to_run:
  131. remaining_times.append(next_time_to_run)
  132. except RuntimeError:
  133. pass
  134. finally:
  135. publisher.close()
  136. connection.close()
  137. return min(remaining_times + [self.max_interval])
  138. def reserve(self, entry):
  139. new_entry = self.schedule[entry.name] = entry.next()
  140. return new_entry
  141. def apply_async(self, entry, publisher=None, **kwargs):
  142. # Update timestamps and run counts before we actually execute,
  143. # so we have that done if an exception is raised (doesn't schedule
  144. # forever.)
  145. entry = self.reserve(entry)
  146. try:
  147. task = registry.tasks[entry.task]
  148. except KeyError:
  149. task = None
  150. try:
  151. if task:
  152. result = task.apply_async(entry.args, entry.kwargs,
  153. publisher=publisher,
  154. **entry.options)
  155. else:
  156. result = self.send_task(entry.task, entry.args, entry.kwargs,
  157. publisher=publisher,
  158. **entry.options)
  159. except Exception, exc:
  160. raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
  161. entry.name, exc))
  162. return result
  163. def send_task(self, *args, **kwargs): # pragma: no cover
  164. return send_task(*args, **kwargs)
  165. def setup_schedule(self):
  166. pass
  167. def sync(self):
  168. pass
  169. def close(self):
  170. self.sync()
  171. def add(self, **kwargs):
  172. entry = self.Entry(**kwargs)
  173. self.schedule[entry.name] = entry
  174. return entry
  175. def update_from_dict(self, dict_):
  176. self.update(dict((name, self.Entry(name, **entry))
  177. for name, entry in dict_.items()))
  178. def merge_inplace(self, b):
  179. A, B = set(self.keys()), set(b.keys())
  180. # Remove items from disk not in the schedule anymore.
  181. for key in A ^ B:
  182. self.pop(key, None)
  183. # Update and add new items in the schedule
  184. for key in B:
  185. entry = self.Entry(**dict(b[key]))
  186. if self.get(key):
  187. self[key].update(entry)
  188. else:
  189. self[key] = entry
  190. def get_schedule(self):
  191. return self.data
  192. @property
  193. def schedule(self):
  194. return self.get_schedule()
  195. @property
  196. def info(self):
  197. return ""
  198. class PersistentScheduler(Scheduler):
  199. persistence = shelve
  200. _store = None
  201. def __init__(self, *args, **kwargs):
  202. self.schedule_filename = kwargs.get("schedule_filename")
  203. Scheduler.__init__(self, *args, **kwargs)
  204. def setup_schedule(self):
  205. self._store = self.persistence.open(self.schedule_filename)
  206. self.data = self._store
  207. self.merge_inplace(conf.CELERYBEAT_SCHEDULE)
  208. self.sync()
  209. self.data = self._store
  210. def sync(self):
  211. if self._store is not None:
  212. self.logger.debug("CeleryBeat: Syncing schedule to disk...")
  213. self._store.sync()
  214. def close(self):
  215. self.sync()
  216. self._store.close()
  217. @property
  218. def info(self):
  219. return " . db -> %s" % (self.schedule_filename, )
  220. class Service(object):
  221. scheduler_cls = PersistentScheduler
  222. def __init__(self, logger=None,
  223. max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
  224. schedule=conf.CELERYBEAT_SCHEDULE,
  225. schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
  226. scheduler_cls=None):
  227. self.max_interval = max_interval
  228. self.scheduler_cls = scheduler_cls or self.scheduler_cls
  229. self.logger = logger or log.get_default_logger(name="celery.beat")
  230. self.schedule = schedule
  231. self.schedule_filename = schedule_filename
  232. self._scheduler = None
  233. self._shutdown = threading.Event()
  234. self._stopped = threading.Event()
  235. silence = self.max_interval < 60 and 10 or 1
  236. self.debug = log.SilenceRepeated(self.logger.debug,
  237. max_iterations=silence)
  238. def start(self, embedded_process=False):
  239. self.logger.info("Celerybeat: Starting...")
  240. self.logger.debug("Celerybeat: "
  241. "Ticking with max interval->%s" % (
  242. humanize_seconds(self.scheduler.max_interval)))
  243. if embedded_process:
  244. platforms.set_process_title("celerybeat")
  245. try:
  246. try:
  247. while not self._shutdown.isSet():
  248. interval = self.scheduler.tick()
  249. self.debug("Celerybeat: Waking up %s." % (
  250. humanize_seconds(interval, prefix="in ")))
  251. time.sleep(interval)
  252. except (KeyboardInterrupt, SystemExit):
  253. self._shutdown.set()
  254. finally:
  255. self.sync()
  256. def sync(self):
  257. self.scheduler.close()
  258. self._stopped.set()
  259. def stop(self, wait=False):
  260. self.logger.info("Celerybeat: Shutting down...")
  261. self._shutdown.set()
  262. wait and self._stopped.wait() # block until shutdown done.
  263. @property
  264. def scheduler(self):
  265. if self._scheduler is None:
  266. filename = self.schedule_filename
  267. self._scheduler = instantiate(self.scheduler_cls,
  268. schedule_filename=filename,
  269. logger=self.logger,
  270. max_interval=self.max_interval)
  271. self._scheduler.update_from_dict(self.schedule)
  272. return self._scheduler
  273. class _Threaded(threading.Thread):
  274. """Embedded task scheduler using threading."""
  275. def __init__(self, *args, **kwargs):
  276. super(_Threaded, self).__init__()
  277. self.service = Service(*args, **kwargs)
  278. self.setDaemon(True)
  279. self.setName("Beat")
  280. def run(self):
  281. self.service.start()
  282. def stop(self):
  283. self.service.stop(wait=True)
  284. class _Process(multiprocessing.Process):
  285. """Embedded task scheduler using multiprocessing."""
  286. def __init__(self, *args, **kwargs):
  287. super(_Process, self).__init__()
  288. self.service = Service(*args, **kwargs)
  289. self.name = "Beat"
  290. def run(self):
  291. platforms.reset_signal("SIGTERM")
  292. self.service.start(embedded_process=True)
  293. def stop(self):
  294. self.service.stop()
  295. self.terminate()
  296. def EmbeddedService(*args, **kwargs):
  297. """Return embedded clock service.
  298. :keyword thread: Run threaded instead of as a separate process.
  299. Default is ``False``.
  300. """
  301. if kwargs.pop("thread", False):
  302. # Need short max interval to be able to stop thread
  303. # in reasonable time.
  304. kwargs.setdefault("max_interval", 1)
  305. return _Threaded(*args, **kwargs)
  306. return _Process(*args, **kwargs)