base.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861
  1. import sys
  2. import warnings
  3. from datetime import datetime, timedelta
  4. from billiard.serialization import pickle
  5. from celery import conf
  6. from celery.log import setup_task_logger
  7. from celery.utils import gen_unique_id, padlist, timedelta_seconds
  8. from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
  9. from celery.execute import apply_async, apply
  10. from celery.registry import tasks
  11. from celery.backends import default_backend
  12. from celery.messaging import TaskPublisher, TaskConsumer
  13. from celery.messaging import establish_connection as _establish_connection
  14. from celery.exceptions import MaxRetriesExceededError, RetryTaskError
  15. def get_current_time():
  16. return datetime.now()
  17. class TaskType(type):
  18. """Metaclass for tasks.
  19. Automatically registers the task in the task registry, except
  20. if the ``abstract`` attribute is set.
  21. If no ``name`` attribute is provided, the name is automatically
  22. set to the name of the module it was defined in, and the class name.
  23. """
  24. def __new__(cls, name, bases, attrs):
  25. super_new = super(TaskType, cls).__new__
  26. task_module = attrs["__module__"]
  27. # Abstract class, remove the abstract attribute so
  28. # any class inheriting from this won't be abstract by default.
  29. if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
  30. return super_new(cls, name, bases, attrs)
  31. # Automatically generate missing name.
  32. if not attrs.get("name"):
  33. task_module = sys.modules[task_module]
  34. task_name = ".".join([task_module.__name__, name])
  35. attrs["name"] = task_name
  36. # Because of the way import happens (recursively)
  37. # we may or may not be the first time the task tries to register
  38. # with the framework. There should only be one class for each task
  39. # name, so we always return the registered version.
  40. task_name = attrs["name"]
  41. if task_name not in tasks:
  42. task_cls = super_new(cls, name, bases, attrs)
  43. tasks.register(task_cls)
  44. return tasks[task_name].__class__
  45. class Task(object):
  46. """A celery task.
  47. All subclasses of :class:`Task` must define the :meth:`run` method,
  48. which is the actual method the ``celery`` daemon executes.
  49. The :meth:`run` method can take use of the default keyword arguments,
  50. as listed in the :meth:`run` documentation.
  51. The resulting class is callable, which if called will apply the
  52. :meth:`run` method.
  53. .. attribute:: name
  54. Name of the task.
  55. .. attribute:: abstract
  56. If ``True`` the task is an abstract base class.
  57. .. attribute:: type
  58. The type of task, currently this can be ``regular``, or ``periodic``,
  59. however if you want a periodic task, you should subclass
  60. :class:`PeriodicTask` instead.
  61. .. attribute:: routing_key
  62. Override the global default ``routing_key`` for this task.
  63. .. attribute:: exchange
  64. Override the global default ``exchange`` for this task.
  65. .. attribute:: exchange_type
  66. Override the global default exchange type for this task.
  67. .. attribute:: delivery_mode
  68. Override the global default delivery mode for this task.
  69. By default this is set to ``2`` (persistent). You can change this
  70. to ``1`` to get non-persistent behavior, which means the messages
  71. are lost if the broker is restarted.
  72. .. attribute:: mandatory
  73. Mandatory message routing. An exception will be raised if the task
  74. can't be routed to a queue.
  75. .. attribute:: immediate:
  76. Request immediate delivery. An exception will be raised if the task
  77. can't be routed to a worker immediately.
  78. .. attribute:: priority:
  79. The message priority. A number from ``0`` to ``9``, where ``0`` is the
  80. highest. Note that RabbitMQ doesn't support priorities yet.
  81. .. attribute:: max_retries
  82. Maximum number of retries before giving up.
  83. If set to ``None``, it will never stop retrying.
  84. .. attribute:: default_retry_delay
  85. Default time in seconds before a retry of the task should be
  86. executed. Default is a 1 minute delay.
  87. .. attribute:: rate_limit
  88. Set the rate limit for this task type, Examples: ``None`` (no rate
  89. limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
  90. tasks a minute), ``"100/h"`` (hundred tasks an hour)
  91. .. attribute:: ignore_result
  92. Don't store the return value of this task.
  93. .. attribute:: disable_error_emails
  94. Disable all error e-mails for this task (only applicable if
  95. ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
  96. .. attribute:: serializer
  97. The name of a serializer that has been registered with
  98. :mod:`carrot.serialization.registry`. Example: ``"json"``.
  99. .. attribute:: backend
  100. The result store backend used for this task.
  101. .. attribute:: autoregister
  102. If ``True`` the task is automatically registered in the task
  103. registry, which is the default behaviour.
  104. .. attribute:: track_started
  105. If ``True`` the task will report its status as "started"
  106. when the task is executed by a worker.
  107. The default value is ``False`` as the normal behaviour is to not
  108. report that level of granularity. Tasks are either pending, finished,
  109. or waiting to be retried. Having a "started" status can be useful for
  110. when there are long running tasks and there is a need to report which
  111. task is currently running.
  112. The global default can be overridden by the ``CELERY_TRACK_STARTED``
  113. setting.
  114. .. attribute:: acks_late
  115. If set to ``True`` messages for this task will be acknowledged
  116. **after** the task has been executed, not *just before*, which is
  117. the default behavior.
  118. Note that this means the task may be executed twice if the worker
  119. crashes in the middle of execution, which may be acceptable for some
  120. applications.
  121. The global default can be overriden by the ``CELERY_ACKS_LATE``
  122. setting.
  123. """
  124. __metaclass__ = TaskType
  125. name = None
  126. abstract = True
  127. autoregister = True
  128. type = "regular"
  129. exchange = None
  130. routing_key = None
  131. immediate = False
  132. mandatory = False
  133. priority = None
  134. ignore_result = conf.IGNORE_RESULT
  135. disable_error_emails = False
  136. max_retries = 3
  137. default_retry_delay = 3 * 60
  138. serializer = conf.TASK_SERIALIZER
  139. rate_limit = conf.DEFAULT_RATE_LIMIT
  140. backend = default_backend
  141. exchange_type = conf.DEFAULT_EXCHANGE_TYPE
  142. delivery_mode = conf.DEFAULT_DELIVERY_MODE
  143. track_started = conf.TRACK_STARTED
  144. acks_late = conf.ACKS_LATE
  145. MaxRetriesExceededError = MaxRetriesExceededError
  146. def __call__(self, *args, **kwargs):
  147. return self.run(*args, **kwargs)
  148. def run(self, *args, **kwargs):
  149. """The body of the task executed by the worker.
  150. The following standard keyword arguments are reserved and is passed
  151. by the worker if the function/method supports them:
  152. * task_id
  153. * task_name
  154. * task_retries
  155. * task_is_eager
  156. * logfile
  157. * loglevel
  158. * delivery_info
  159. Additional standard keyword arguments may be added in the future.
  160. To take these default arguments, the task can either list the ones
  161. it wants explicitly or just take an arbitrary list of keyword
  162. arguments (\*\*kwargs).
  163. """
  164. raise NotImplementedError("Tasks must define the run method.")
  165. @classmethod
  166. def get_logger(self, loglevel=None, logfile=None, **kwargs):
  167. """Get task-aware logger object.
  168. See :func:`celery.log.setup_task_logger`.
  169. """
  170. return setup_task_logger(loglevel=loglevel, logfile=logfile,
  171. task_kwargs=kwargs)
  172. @classmethod
  173. def establish_connection(self,
  174. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  175. """Establish a connection to the message broker."""
  176. return _establish_connection(connect_timeout)
  177. @classmethod
  178. def get_publisher(self, connection=None, exchange=None,
  179. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT,
  180. exchange_type=None):
  181. """Get a celery task message publisher.
  182. :rtype: :class:`celery.messaging.TaskPublisher`.
  183. Please be sure to close the AMQP connection when you're done
  184. with this object, i.e.:
  185. >>> publisher = self.get_publisher()
  186. >>> # do something with publisher
  187. >>> publisher.connection.close()
  188. """
  189. if exchange is None:
  190. exchange = self.exchange
  191. if exchange_type is None:
  192. exchange_type = self.exchange_type
  193. connection = connection or self.establish_connection(connect_timeout)
  194. return TaskPublisher(connection=connection,
  195. exchange=exchange,
  196. exchange_type=exchange_type,
  197. routing_key=self.routing_key)
  198. @classmethod
  199. def get_consumer(self, connection=None,
  200. connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  201. """Get a celery task message consumer.
  202. :rtype: :class:`celery.messaging.TaskConsumer`.
  203. Please be sure to close the AMQP connection when you're done
  204. with this object. i.e.:
  205. >>> consumer = self.get_consumer()
  206. >>> # do something with consumer
  207. >>> consumer.connection.close()
  208. """
  209. connection = connection or self.establish_connection(connect_timeout)
  210. return TaskConsumer(connection=connection, exchange=self.exchange,
  211. routing_key=self.routing_key)
  212. @classmethod
  213. def delay(self, *args, **kwargs):
  214. """Shortcut to :meth:`apply_async`, with star arguments,
  215. but doesn't support the extra options.
  216. :param \*args: positional arguments passed on to the task.
  217. :param \*\*kwargs: keyword arguments passed on to the task.
  218. :returns: :class:`celery.result.AsyncResult`
  219. """
  220. return self.apply_async(args, kwargs)
  221. @classmethod
  222. def apply_async(self, args=None, kwargs=None, **options):
  223. """Delay this task for execution by the ``celery`` daemon(s).
  224. :param args: positional arguments passed on to the task.
  225. :param kwargs: keyword arguments passed on to the task.
  226. :keyword \*\*options: Any keyword arguments to pass on to
  227. :func:`celery.execute.apply_async`.
  228. See :func:`celery.execute.apply_async` for more information.
  229. :rtype: :class:`celery.result.AsyncResult`
  230. """
  231. return apply_async(self, args, kwargs, **options)
  232. @classmethod
  233. def retry(self, args=None, kwargs=None, exc=None, throw=True, **options):
  234. """Retry the task.
  235. :param args: Positional arguments to retry with.
  236. :param kwargs: Keyword arguments to retry with.
  237. :keyword exc: Optional exception to raise instead of
  238. :exc:`MaxRestartsExceededError` when the max restart limit has
  239. been exceeded.
  240. :keyword countdown: Time in seconds to delay the retry for.
  241. :keyword eta: Explicit time and date to run the retry at (must be a
  242. :class:`datetime.datetime` instance).
  243. :keyword \*\*options: Any extra options to pass on to
  244. meth:`apply_async`. See :func:`celery.execute.apply_async`.
  245. :keyword throw: If this is ``False``, do not raise the
  246. :exc:`celery.exceptions.RetryTaskError` exception,
  247. that tells the worker to mark the task as being retried.
  248. Note that this means the task will be marked as failed
  249. if the task raises an exception, or successful if it
  250. returns.
  251. :raises celery.exceptions.RetryTaskError: To tell the worker that the
  252. task has been re-sent for retry. This always happens, unless
  253. the ``throw`` keyword argument has been explicitly set
  254. to ``False``, and is considered normal operation.
  255. Example
  256. >>> class TwitterPostStatusTask(Task):
  257. ...
  258. ... def run(self, username, password, message, **kwargs):
  259. ... twitter = Twitter(username, password)
  260. ... try:
  261. ... twitter.post_status(message)
  262. ... except twitter.FailWhale, exc:
  263. ... # Retry in 5 minutes.
  264. ... self.retry([username, password, message], kwargs,
  265. ... countdown=60 * 5, exc=exc)
  266. """
  267. delivery_info = kwargs.pop("delivery_info", {})
  268. options.setdefault("exchange", delivery_info.get("exchange"))
  269. options.setdefault("routing_key", delivery_info.get("routing_key"))
  270. options["retries"] = kwargs.pop("task_retries", 0) + 1
  271. options["task_id"] = kwargs.pop("task_id", None)
  272. options["countdown"] = options.get("countdown",
  273. self.default_retry_delay)
  274. max_exc = exc or self.MaxRetriesExceededError(
  275. "Can't retry %s[%s] args:%s kwargs:%s" % (
  276. self.name, options["task_id"], args, kwargs))
  277. max_retries = self.max_retries
  278. if max_retries is not None and options["retries"] > max_retries:
  279. raise max_exc
  280. # If task was executed eagerly using apply(),
  281. # then the retry must also be executed eagerly.
  282. if kwargs.get("task_is_eager", False):
  283. result = self.apply(args=args, kwargs=kwargs, **options)
  284. if isinstance(result, EagerResult):
  285. return result.get() # propogates exceptions.
  286. return result
  287. self.apply_async(args=args, kwargs=kwargs, **options)
  288. if throw:
  289. message = "Retry in %d seconds." % options["countdown"]
  290. raise RetryTaskError(message, exc)
  291. @classmethod
  292. def apply(self, args=None, kwargs=None, **options):
  293. """Execute this task at once, by blocking until the task
  294. has finished executing.
  295. :param args: positional arguments passed on to the task.
  296. :param kwargs: keyword arguments passed on to the task.
  297. :rtype: :class:`celery.result.EagerResult`
  298. See :func:`celery.execute.apply`.
  299. """
  300. return apply(self, args, kwargs, **options)
  301. @classmethod
  302. def AsyncResult(self, task_id):
  303. """Get AsyncResult instance for this kind of task.
  304. :param task_id: Task id to get result for.
  305. """
  306. return BaseAsyncResult(task_id, backend=self.backend)
  307. def on_retry(self, exc, task_id, args, kwargs):
  308. """Retry handler.
  309. This is run by the worker when the task is to be retried.
  310. :param exc: The exception sent to :meth:`retry`.
  311. :param task_id: Unique id of the retried task.
  312. :param args: Original arguments for the retried task.
  313. :param kwargs: Original keyword arguments for the retried task.
  314. The return value of this handler is ignored.
  315. """
  316. pass
  317. def on_failure(self, exc, task_id, args, kwargs):
  318. """Error handler.
  319. This is run by the worker when the task fails.
  320. :param exc: The exception raised by the task.
  321. :param task_id: Unique id of the failed task.
  322. :param args: Original arguments for the task that failed.
  323. :param kwargs: Original keyword arguments for the task that failed.
  324. The return value of this handler is ignored.
  325. """
  326. pass
  327. def on_success(self, retval, task_id, args, kwargs):
  328. """Success handler.
  329. Run by the worker if the task executes successfully.
  330. :param retval: The return value of the task.
  331. :param task_id: Unique id of the executed task.
  332. :param args: Original arguments for the executed task.
  333. :param kwargs: Original keyword arguments for the executed task.
  334. The return value of this handler is ignored.
  335. """
  336. pass
  337. def execute(self, wrapper, pool, loglevel, logfile):
  338. """The method the worker calls to execute the task.
  339. :param wrapper: A :class:`celery.worker.job.TaskWrapper`.
  340. :param pool: A :class:`celery.worker.pool.TaskPool` object.
  341. :param loglevel: Current loglevel.
  342. :param logfile: Name of the currently used logfile.
  343. """
  344. wrapper.execute_using_pool(pool, loglevel, logfile)
  345. def __repr__(self):
  346. """repr(task)"""
  347. try:
  348. kind = self.__class__.mro()[1].__name__
  349. except (AttributeError, IndexError):
  350. kind = "%s(Task)" % self.__class__.__name__
  351. return "<%s: %s (%s)>" % (kind, self.name, self.type)
  352. class ExecuteRemoteTask(Task):
  353. """Execute an arbitrary function or object.
  354. *Note* You probably want :func:`execute_remote` instead, which this
  355. is an internal component of.
  356. The object must be pickleable, so you can't use lambdas or functions
  357. defined in the REPL (that is the python shell, or ``ipython``).
  358. """
  359. name = "celery.execute_remote"
  360. def run(self, ser_callable, fargs, fkwargs, **kwargs):
  361. """
  362. :param ser_callable: A pickled function or callable object.
  363. :param fargs: Positional arguments to apply to the function.
  364. :param fkwargs: Keyword arguments to apply to the function.
  365. """
  366. return pickle.loads(ser_callable)(*fargs, **fkwargs)
  367. class AsynchronousMapTask(Task):
  368. """Task used internally by :func:`dmap_async` and
  369. :meth:`TaskSet.map_async`. """
  370. name = "celery.map_async"
  371. def run(self, ser_callable, args, timeout=None, **kwargs):
  372. """:see :meth:`TaskSet.dmap_async`."""
  373. return TaskSet.map(pickle.loads(ser_callable), args, timeout=timeout)
  374. class TaskSet(object):
  375. """A task containing several subtasks, making it possible
  376. to track how many, or when all of the tasks has been completed.
  377. :param task: The task class or name.
  378. Can either be a fully qualified task name, or a task class.
  379. :param args: A list of args, kwargs pairs.
  380. e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
  381. .. attribute:: task_name
  382. The name of the task.
  383. .. attribute:: arguments
  384. The arguments, as passed to the task set constructor.
  385. .. attribute:: total
  386. Total number of tasks in this task set.
  387. Example
  388. >>> from djangofeeds.tasks import RefreshFeedTask
  389. >>> taskset = TaskSet(RefreshFeedTask, args=[
  390. ... ([], {"feed_url": "http://cnn.com/rss"}),
  391. ... ([], {"feed_url": "http://bbc.com/rss"}),
  392. ... ([], {"feed_url": "http://xkcd.com/rss"})
  393. ... ])
  394. >>> taskset_result = taskset.apply_async()
  395. >>> list_of_return_values = taskset_result.join()
  396. """
  397. def __init__(self, task, args):
  398. try:
  399. task_name = task.name
  400. task_obj = task
  401. except AttributeError:
  402. task_name = task
  403. task_obj = tasks[task_name]
  404. # Get task instance
  405. task_obj = tasks[task_obj.name]
  406. self.task = task_obj
  407. self.task_name = task_name
  408. self.arguments = args
  409. self.total = len(args)
  410. def run(self, *args, **kwargs):
  411. """Deprecated alias to :meth:`apply_async`"""
  412. warnings.warn(DeprecationWarning(
  413. "TaskSet.run will be deprecated in favor of TaskSet.apply_async "
  414. "in celery v1.2.0"))
  415. return self.apply_async(*args, **kwargs)
  416. def apply_async(self, connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
  417. """Run all tasks in the taskset.
  418. :returns: A :class:`celery.result.TaskSetResult` instance.
  419. Example
  420. >>> ts = TaskSet(RefreshFeedTask, args=[
  421. ... (["http://foo.com/rss"], {}),
  422. ... (["http://bar.com/rss"], {}),
  423. ... ])
  424. >>> result = ts.apply_async()
  425. >>> result.taskset_id
  426. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  427. >>> result.subtask_ids
  428. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  429. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  430. >>> result.waiting()
  431. True
  432. >>> time.sleep(10)
  433. >>> result.ready()
  434. True
  435. >>> result.successful()
  436. True
  437. >>> result.failed()
  438. False
  439. >>> result.join()
  440. [True, True]
  441. """
  442. if conf.ALWAYS_EAGER:
  443. return self.apply()
  444. taskset_id = gen_unique_id()
  445. conn = self.task.establish_connection(connect_timeout=connect_timeout)
  446. publisher = self.task.get_publisher(connection=conn)
  447. try:
  448. subtasks = [self.apply_part(arglist, taskset_id, publisher)
  449. for arglist in self.arguments]
  450. finally:
  451. publisher.close()
  452. conn.close()
  453. return TaskSetResult(taskset_id, subtasks)
  454. def apply_part(self, arglist, taskset_id, publisher):
  455. """Apply a single part of the taskset."""
  456. args, kwargs, opts = padlist(arglist, 3, default={})
  457. return apply_async(self.task, args, kwargs,
  458. taskset_id=taskset_id, publisher=publisher, **opts)
  459. def apply(self):
  460. """Applies the taskset locally."""
  461. taskset_id = gen_unique_id()
  462. subtasks = [apply(self.task, args, kwargs)
  463. for args, kwargs in self.arguments]
  464. # This will be filled with EagerResults.
  465. return TaskSetResult(taskset_id, subtasks)
  466. @classmethod
  467. def remote_execute(cls, func, args):
  468. """Apply ``args`` to function by distributing the args to the
  469. celery server(s)."""
  470. pickled = pickle.dumps(func)
  471. arguments = [((pickled, arg, {}), {}) for arg in args]
  472. return cls(ExecuteRemoteTask, arguments)
  473. @classmethod
  474. def map(cls, func, args, timeout=None):
  475. """Distribute processing of the arguments and collect the results."""
  476. remote_task = cls.remote_execute(func, args)
  477. return remote_task.apply_async().join(timeout=timeout)
  478. @classmethod
  479. def map_async(cls, func, args, timeout=None):
  480. """Distribute processing of the arguments and collect the results
  481. asynchronously.
  482. :returns: :class:`celery.result.AsyncResult` instance.
  483. """
  484. serfunc = pickle.dumps(func)
  485. return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  486. class PeriodicTask(Task):
  487. """A periodic task is a task that behaves like a :manpage:`cron` job.
  488. Results of periodic tasks are not stored by default.
  489. .. attribute:: run_every
  490. *REQUIRED* Defines how often the task is run (its interval),
  491. it can be either a :class:`datetime.timedelta` object or an
  492. integer specifying the time in seconds.
  493. .. attribute:: relative
  494. If set to ``True``, run times are relative to the time when the
  495. server was started. This was the previous behaviour, periodic tasks
  496. are now scheduled by the clock.
  497. :raises NotImplementedError: if the :attr:`run_every` attribute is
  498. not defined.
  499. Example
  500. >>> from celery.task import tasks, PeriodicTask
  501. >>> from datetime import timedelta
  502. >>> class MyPeriodicTask(PeriodicTask):
  503. ... run_every = timedelta(seconds=30)
  504. ...
  505. ... def run(self, **kwargs):
  506. ... logger = self.get_logger(**kwargs)
  507. ... logger.info("Running MyPeriodicTask")
  508. """
  509. abstract = True
  510. ignore_result = True
  511. type = "periodic"
  512. relative = False
  513. def __init__(self):
  514. if not hasattr(self, "run_every"):
  515. raise NotImplementedError(
  516. "Periodic tasks must have a run_every attribute")
  517. # If run_every is a integer, convert it to timedelta seconds.
  518. # Operate on the original class attribute so anyone accessing
  519. # it directly gets the right value.
  520. if isinstance(self.__class__.run_every, int):
  521. self.__class__.run_every = timedelta(seconds=self.run_every)
  522. super(PeriodicTask, self).__init__()
  523. def remaining_estimate(self, last_run_at):
  524. """Returns when the periodic task should run next as a timedelta."""
  525. next_run_at = last_run_at + self.run_every
  526. if not self.relative:
  527. next_run_at = self.delta_resolution(next_run_at, self.run_every)
  528. return next_run_at - datetime.now()
  529. def timedelta_seconds(self, delta):
  530. """Convert :class:`datetime.timedelta` to seconds.
  531. Doesn't account for negative timedeltas.
  532. """
  533. return timedelta_seconds(delta)
  534. def is_due(self, last_run_at):
  535. """Returns tuple of two items ``(is_due, next_time_to_run)``,
  536. where next time to run is in seconds.
  537. e.g.
  538. * ``(True, 20)``, means the task should be run now, and the next
  539. time to run is in 20 seconds.
  540. * ``(False, 12)``, means the task should be run in 12 seconds.
  541. You can override this to decide the interval at runtime,
  542. but keep in mind the value of ``CELERYBEAT_MAX_LOOP_INTERVAL``, which
  543. decides the maximum number of seconds celerybeat can sleep between
  544. re-checking the periodic task intervals. So if you dynamically change
  545. the next run at value, and the max interval is set to 5 minutes, it
  546. will take 5 minutes for the change to take effect, so you may
  547. consider lowering the value of ``CELERYBEAT_MAX_LOOP_INTERVAL`` if
  548. responsiveness if of importance to you.
  549. """
  550. rem_delta = self.remaining_estimate(last_run_at)
  551. rem = self.timedelta_seconds(rem_delta)
  552. if rem == 0:
  553. return True, self.timedelta_seconds(self.run_every)
  554. return False, rem
  555. def delta_resolution(self, dt, delta):
  556. """Round a datetime to the resolution of a timedelta.
  557. If the timedelta is in days, the datetime will be rounded
  558. to the nearest days, if the timedelta is in hours the datetime
  559. will be rounded to the nearest hour, and so on until seconds
  560. which will just return the original datetime.
  561. >>> now = datetime.now()
  562. >>> now
  563. datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
  564. >>> delta_resolution(now, timedelta(days=2))
  565. datetime.datetime(2010, 3, 30, 0, 0)
  566. >>> delta_resolution(now, timedelta(hours=2))
  567. datetime.datetime(2010, 3, 30, 11, 0)
  568. >>> delta_resolution(now, timedelta(minutes=2))
  569. datetime.datetime(2010, 3, 30, 11, 50)
  570. >>> delta_resolution(now, timedelta(seconds=2))
  571. datetime.datetime(2010, 3, 30, 11, 50, 58, 41065)
  572. """
  573. delta = self.timedelta_seconds(delta)
  574. resolutions = ((3, lambda x: x / 86400),
  575. (4, lambda x: x / 3600),
  576. (5, lambda x: x / 60))
  577. args = dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
  578. for res, predicate in resolutions:
  579. if predicate(delta) >= 1.0:
  580. return datetime(*args[:res])
  581. return dt
  582. class ScheduledTask(PeriodicTask):
  583. """A scheduled task is a task that adds more precise scheduling to the
  584. features of a :class:`celery.task.base.PeriodicTask`.
  585. Like a :manpage:`cron` job, you can specify units of time of when you would
  586. like the task to execute. While not a full implementation of cron, it
  587. should provide a fair degree of common scheduling needs. You can specify
  588. a minute, an hour, and/or a day of the week.
  589. .. attribute:: minute
  590. An integer from 0-59 that represents the minute of an hour of when
  591. execution should occur.
  592. .. attribute:: hour
  593. An integer from 0-23 that represents the hour of a day of when
  594. execution should occur.
  595. .. attribute:: day_of_week
  596. An integer from 0-6, where Sunday = 0 and Saturday = 6, that represents
  597. the day of week that execution should occur.
  598. Example
  599. >>> from celery.task import ScheduledTask
  600. >>> class EveryMondayMorningTask(ScheduledTask):
  601. ... hour = 7
  602. ... minute = 30
  603. ... day_of_week = 1
  604. ... def run(self, **kwargs):
  605. ... logger = self.get_logger(**kwargs)
  606. ... logger.info("Execute every Monday at 7:30AM.")
  607. >>> from celery.task import ScheduledTask
  608. >>> class EveryMorningTask(ScheduledTask):
  609. ... hour = 7
  610. ... minute = 30
  611. ... def run(self, **kwargs):
  612. ... logger = self.get_logger(**kwargs)
  613. ... logger.info("Execute every day at 7:30AM.")
  614. >>> from celery.task import ScheduledTask
  615. >>> class EveryQuarterPastTheHourTask(ScheduledTask):
  616. ... minute = 15
  617. ... def run(self, **kwargs):
  618. ... logger = self.get_logger(**kwargs)
  619. ... logger.info("Execute every 0:15 past the hour every day.")
  620. """
  621. run_every = timedelta(seconds=1)
  622. hour = None # (0 - 23)
  623. minute = None # (0 - 59)
  624. day_of_week = None # (0 - 6) (Sunday=0)
  625. abstract = True
  626. def check_hour_minute(self, now):
  627. (due, when) = (False, 1)
  628. if self.hour is None and self.minute is None:
  629. (due, when) = (True, 1)
  630. if self.hour is None and self.minute == now.minute:
  631. (due, when) = (True, 1)
  632. if self.hour == now.hour and self.minute is None:
  633. (due, when) = (True, 1)
  634. if self.hour == now.hour and self.minute == now.minute:
  635. (due, when) = (True, 1)
  636. return (due, when)
  637. def is_due(self, last_run_at):
  638. n = get_current_time()
  639. last = (n - last_run_at)
  640. (due, when) = (False, 1)
  641. if last.days > 0 or last.seconds > 60:
  642. if self.day_of_week in (None, n.isoweekday()):
  643. (due, when) = self.check_hour_minute(n)
  644. return (due, when)