beat.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. """
  2. Periodic Task Scheduler
  3. """
  4. import time
  5. import shelve
  6. import sys
  7. import threading
  8. import traceback
  9. import multiprocessing
  10. from datetime import datetime
  11. from UserDict import UserDict
  12. from celery import platforms
  13. from celery import registry
  14. from celery.app import app_or_default
  15. from celery.log import SilenceRepeated
  16. from celery.schedules import maybe_schedule
  17. from celery.utils import instantiate
  18. from celery.utils.timeutils import humanize_seconds
  19. class SchedulingError(Exception):
  20. """An error occured while scheduling a task."""
  21. class ScheduleEntry(object):
  22. """An entry in the scheduler.
  23. :param name: see :attr:`name`.
  24. :param schedule: see :attr:`schedule`.
  25. :param args: see :attr:`args`.
  26. :param kwargs: see :attr:`kwargs`.
  27. :keyword last_run_at: see :attr:`last_run_at`.
  28. :keyword total_run_count: see :attr:`total_run_count`.
  29. .. attribute:: name
  30. The task name.
  31. .. attribute:: schedule
  32. The schedule (run_every/crontab)
  33. .. attribute:: args
  34. Args to apply.
  35. .. attribute:: kwargs
  36. Keyword arguments to apply.
  37. .. attribute:: last_run_at
  38. The time and date of when this task was last run.
  39. .. attribute:: total_run_count
  40. Total number of times this periodic task has been executed.
  41. """
  42. def __init__(self, name=None, task=None, last_run_at=None,
  43. total_run_count=None, schedule=None, args=(), kwargs={},
  44. options={}, relative=False):
  45. self.name = name
  46. self.task = task
  47. self.schedule = maybe_schedule(schedule, relative)
  48. self.args = args
  49. self.kwargs = kwargs
  50. self.options = options
  51. self.last_run_at = last_run_at or datetime.now()
  52. self.total_run_count = total_run_count or 0
  53. def next(self, last_run_at=None):
  54. """Returns a new instance of the same class, but with
  55. its date and count fields updated."""
  56. last_run_at = last_run_at or datetime.now()
  57. total_run_count = self.total_run_count + 1
  58. return self.__class__(**dict(self,
  59. last_run_at=last_run_at,
  60. total_run_count=total_run_count))
  61. def update(self, other):
  62. """Update values from another entry.
  63. Does only update "editable" fields (schedule, args,
  64. kwargs, options).
  65. """
  66. self.task = other.task
  67. self.schedule = other.schedule
  68. self.args = other.args
  69. self.kwargs = other.kwargs
  70. self.options = other.options
  71. def is_due(self):
  72. """See :meth:`celery.task.base.PeriodicTask.is_due`."""
  73. return self.schedule.is_due(self.last_run_at)
  74. def __iter__(self):
  75. return vars(self).iteritems()
  76. def __repr__(self):
  77. return "<Entry: %s %s(*%s, **%s) {%s}>" % (self.name,
  78. self.task,
  79. self.args,
  80. self.kwargs,
  81. self.schedule)
  82. class Scheduler(UserDict):
  83. """Scheduler for periodic tasks.
  84. :keyword schedule: see :attr:`schedule`.
  85. :keyword logger: see :attr:`logger`.
  86. :keyword max_interval: see :attr:`max_interval`.
  87. .. attribute:: schedule
  88. The schedule dict/shelve.
  89. .. attribute:: logger
  90. The logger to use.
  91. .. attribute:: max_interval
  92. Maximum time to sleep between re-checking the schedule.
  93. """
  94. Entry = ScheduleEntry
  95. _connection = None
  96. _publisher = None
  97. def __init__(self, schedule=None, logger=None, max_interval=None,
  98. app=None, Publisher=None, lazy=False, **kwargs):
  99. UserDict.__init__(self)
  100. if schedule is None:
  101. schedule = {}
  102. self.app = app_or_default(app)
  103. conf = self.app.conf
  104. self.data = schedule
  105. self.logger = logger or self.app.log.get_default_logger(
  106. name="celery.beat")
  107. self.max_interval = max_interval or conf.CELERYBEAT_MAX_LOOP_INTERVAL
  108. self.Publisher = Publisher or self.app.amqp.TaskPublisher
  109. if not lazy:
  110. self.setup_schedule()
  111. def maybe_due(self, entry, publisher=None):
  112. is_due, next_time_to_run = entry.is_due()
  113. if is_due:
  114. self.logger.debug("Scheduler: Sending due task %s" % entry.task)
  115. try:
  116. result = self.apply_async(entry, publisher=publisher)
  117. except Exception, exc:
  118. self.logger.error("Message Error: %s\n%s" % (exc,
  119. traceback.format_stack()), exc_info=sys.exc_info())
  120. else:
  121. self.logger.debug("%s sent. id->%s" % (entry.task,
  122. result.task_id))
  123. return next_time_to_run
  124. @property
  125. def connection(self):
  126. if self._connection is None:
  127. self._connection = self.app.broker_connection()
  128. return self._connection
  129. @property
  130. def publisher(self):
  131. if self._publisher is None:
  132. self._publisher = self.Publisher(connection=self.connection)
  133. return self._publisher
  134. def tick(self):
  135. """Run a tick, that is one iteration of the scheduler.
  136. Executes all due tasks.
  137. """
  138. remaining_times = []
  139. try:
  140. for entry in self.schedule.itervalues():
  141. next_time_to_run = self.maybe_due(entry, self.publisher)
  142. if next_time_to_run:
  143. remaining_times.append(next_time_to_run)
  144. except RuntimeError:
  145. pass
  146. return min(remaining_times + [self.max_interval])
  147. def reserve(self, entry):
  148. new_entry = self.schedule[entry.name] = entry.next()
  149. return new_entry
  150. def apply_async(self, entry, publisher=None, **kwargs):
  151. # Update timestamps and run counts before we actually execute,
  152. # so we have that done if an exception is raised (doesn't schedule
  153. # forever.)
  154. entry = self.reserve(entry)
  155. try:
  156. task = registry.tasks[entry.task]
  157. except KeyError:
  158. task = None
  159. try:
  160. if task:
  161. result = task.apply_async(entry.args, entry.kwargs,
  162. publisher=publisher,
  163. **entry.options)
  164. else:
  165. result = self.send_task(entry.task, entry.args, entry.kwargs,
  166. publisher=publisher,
  167. **entry.options)
  168. except Exception, exc:
  169. raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
  170. entry.name, exc))
  171. return result
  172. def send_task(self, *args, **kwargs): # pragma: no cover
  173. return self.app.send_task(*args, **kwargs)
  174. def setup_schedule(self):
  175. pass
  176. def sync(self):
  177. pass
  178. def close(self):
  179. self.sync()
  180. def add(self, **kwargs):
  181. entry = self.Entry(**kwargs)
  182. self.schedule[entry.name] = entry
  183. return entry
  184. def update_from_dict(self, dict_):
  185. self.update(dict((name, self.Entry(name, **entry))
  186. for name, entry in dict_.items()))
  187. def merge_inplace(self, b):
  188. A, B = set(self.keys()), set(b.keys())
  189. # Remove items from disk not in the schedule anymore.
  190. for key in A ^ B:
  191. self.pop(key, None)
  192. # Update and add new items in the schedule
  193. for key in B:
  194. entry = self.Entry(**dict(b[key]))
  195. if self.get(key):
  196. self[key].update(entry)
  197. else:
  198. self[key] = entry
  199. def get_schedule(self):
  200. return self.data
  201. @property
  202. def schedule(self):
  203. return self.get_schedule()
  204. @property
  205. def info(self):
  206. return ""
  207. class PersistentScheduler(Scheduler):
  208. persistence = shelve
  209. _store = None
  210. def __init__(self, *args, **kwargs):
  211. self.schedule_filename = kwargs.get("schedule_filename")
  212. Scheduler.__init__(self, *args, **kwargs)
  213. def setup_schedule(self):
  214. self._store = self.persistence.open(self.schedule_filename)
  215. self.data = self._store
  216. self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
  217. self.sync()
  218. self.data = self._store
  219. def sync(self):
  220. if self._store is not None:
  221. self.logger.debug("CeleryBeat: Syncing schedule to disk...")
  222. self._store.sync()
  223. def close(self):
  224. self.sync()
  225. self._store.close()
  226. @property
  227. def info(self):
  228. return " . db -> %s" % (self.schedule_filename, )
  229. class Service(object):
  230. scheduler_cls = PersistentScheduler
  231. def __init__(self, logger=None,
  232. max_interval=None, schedule=None, schedule_filename=None,
  233. scheduler_cls=None, app=None):
  234. self.app = app_or_default(app)
  235. self.max_interval = max_interval or \
  236. self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
  237. self.scheduler_cls = scheduler_cls or self.scheduler_cls
  238. self.logger = logger or self.app.log.get_default_logger(
  239. name="celery.beat")
  240. self.schedule = schedule or self.app.conf.CELERYBEAT_SCHEDULE
  241. self.schedule_filename = schedule_filename or \
  242. self.app.conf.CELERYBEAT_SCHEDULE_FILENAME
  243. self._scheduler = None
  244. self._shutdown = threading.Event()
  245. self._stopped = threading.Event()
  246. silence = self.max_interval < 60 and 10 or 1
  247. self.debug = SilenceRepeated(self.logger.debug,
  248. max_iterations=silence)
  249. def start(self, embedded_process=False):
  250. self.logger.info("Celerybeat: Starting...")
  251. self.logger.debug("Celerybeat: "
  252. "Ticking with max interval->%s" % (
  253. humanize_seconds(self.scheduler.max_interval)))
  254. if embedded_process:
  255. platforms.set_process_title("celerybeat")
  256. try:
  257. try:
  258. while not self._shutdown.isSet():
  259. interval = self.scheduler.tick()
  260. self.debug("Celerybeat: Waking up %s." % (
  261. humanize_seconds(interval, prefix="in ")))
  262. time.sleep(interval)
  263. except (KeyboardInterrupt, SystemExit):
  264. self._shutdown.set()
  265. finally:
  266. self.sync()
  267. def sync(self):
  268. self.scheduler.close()
  269. self._stopped.set()
  270. def stop(self, wait=False):
  271. self.logger.info("Celerybeat: Shutting down...")
  272. self._shutdown.set()
  273. wait and self._stopped.wait() # block until shutdown done.
  274. def get_scheduler(self, lazy=False):
  275. filename = self.schedule_filename
  276. scheduler = instantiate(self.scheduler_cls,
  277. app=self.app,
  278. schedule_filename=filename,
  279. logger=self.logger,
  280. max_interval=self.max_interval,
  281. lazy=lazy)
  282. if not lazy:
  283. scheduler.update_from_dict(self.schedule)
  284. return scheduler
  285. @property
  286. def scheduler(self):
  287. if self._scheduler is None:
  288. self._scheduler = self.get_scheduler()
  289. return self._scheduler
  290. class _Threaded(threading.Thread):
  291. """Embedded task scheduler using threading."""
  292. def __init__(self, *args, **kwargs):
  293. super(_Threaded, self).__init__()
  294. self.service = Service(*args, **kwargs)
  295. self.setDaemon(True)
  296. self.setName("Beat")
  297. def run(self):
  298. self.service.start()
  299. def stop(self):
  300. self.service.stop(wait=True)
  301. class _Process(multiprocessing.Process):
  302. """Embedded task scheduler using multiprocessing."""
  303. def __init__(self, *args, **kwargs):
  304. super(_Process, self).__init__()
  305. self.service = Service(*args, **kwargs)
  306. self.name = "Beat"
  307. def run(self):
  308. platforms.reset_signal("SIGTERM")
  309. self.service.start(embedded_process=True)
  310. def stop(self):
  311. self.service.stop()
  312. self.terminate()
  313. def EmbeddedService(*args, **kwargs):
  314. """Return embedded clock service.
  315. :keyword thread: Run threaded instead of as a separate process.
  316. Default is :const:`False`.
  317. """
  318. if kwargs.pop("thread", False):
  319. # Need short max interval to be able to stop thread
  320. # in reasonable time.
  321. kwargs.setdefault("max_interval", 1)
  322. return _Threaded(*args, **kwargs)
  323. return _Process(*args, **kwargs)