base.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879
  1. import sys
  2. import threading
  3. import warnings
  4. from celery.app import app_or_default
  5. from celery.datastructures import ExceptionInfo
  6. from celery.exceptions import MaxRetriesExceededError, RetryTaskError
  7. from celery.execute.trace import TaskTrace
  8. from celery.registry import tasks
  9. from celery.result import EagerResult
  10. from celery.schedules import maybe_schedule
  11. from celery.utils import mattrgetter, gen_unique_id, fun_takes_kwargs
  12. from celery.utils.timeutils import timedelta_seconds
  13. from celery.task import sets
# Aliases for the task-set helpers defined in :mod:`celery.task.sets`, so
# they can be referenced from this module (``BaseTask.subtask`` uses
# ``subtask`` below).
TaskSet = sets.TaskSet
subtask = sets.subtask

# Message passed to ``PendingDeprecationWarning`` by ``PeriodicTask.__init__``.
PERIODIC_DEPRECATION_TEXT = """\
Periodic task classes has been deprecated and will be removed
in celery v3.0.
Please use the CELERYBEAT_SCHEDULE setting instead:
CELERYBEAT_SCHEDULE = {
name: dict(task=task_name, schedule=run_every,
args=(), kwargs={}, options={}, relative=False)
}
"""

# Getter for the per-task message options.  ``BaseTask.apply_async`` calls it
# on the task and feeds the result to ``dict()`` to seed the publish options.
# NOTE(review): presumably returns a mapping of attribute name -> value;
# confirm against ``celery.utils.mattrgetter``.
extract_exec_options = mattrgetter("queue", "routing_key",
                                   "exchange", "immediate",
                                   "mandatory", "priority",
                                   "serializer", "delivery_mode")

# Values a ``Context`` is reset to by ``Context.clear()``.
_default_context = {"logfile": None,
                    "loglevel": None,
                    "id": None,
                    "args": None,
                    "kwargs": None,
                    "retries": 0,
                    "is_eager": False,
                    "delivery_info": None}
  37. def _unpickle_task(name):
  38. return tasks[name]
  39. class Context(threading.local):
  40. def update(self, d, **kwargs):
  41. self.__dict__.update(d, **kwargs)
  42. def clear(self):
  43. self.__dict__.clear()
  44. self.update(_default_context)
  45. def get(self, key, default=None):
  46. return self.__dict__.get(key, default)
  47. class TaskType(type):
  48. """Metaclass for tasks.
  49. Automatically registers the task in the task registry, except
  50. if the ``abstract`` attribute is set.
  51. If no ``name`` attribute is provided, the name is automatically
  52. set to the name of the module it was defined in, and the class name.
  53. """
  54. def __new__(cls, name, bases, attrs):
  55. super_new = super(TaskType, cls).__new__
  56. task_module = attrs["__module__"]
  57. # Abstract class, remove the abstract attribute so
  58. # any class inheriting from this won't be abstract by default.
  59. if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
  60. return super_new(cls, name, bases, attrs)
  61. # Automatically generate missing name.
  62. if not attrs.get("name"):
  63. task_name = ".".join([sys.modules[task_module].__name__, name])
  64. attrs["name"] = task_name
  65. # Because of the way import happens (recursively)
  66. # we may or may not be the first time the task tries to register
  67. # with the framework. There should only be one class for each task
  68. # name, so we always return the registered version.
  69. task_name = attrs["name"]
  70. if task_name not in tasks:
  71. task_cls = super_new(cls, name, bases, attrs)
  72. if task_module == "__main__" and task_cls.app.main:
  73. task_name = task_cls.name = ".".join([task_cls.app.main,
  74. name])
  75. tasks.register(task_cls)
  76. task = tasks[task_name].__class__
  77. return task
class BaseTask(object):
    """A celery task.

    All subclasses of :class:`Task` must define the :meth:`run` method,
    which is the actual method the ``celery`` daemon executes.

    The :meth:`run` method can make use of the default keyword arguments,
    as listed in the :meth:`run` documentation.

    The resulting class is callable, which if called will apply the
    :meth:`run` method.

    .. attribute:: app

        The application instance associated with this task class.

    .. attribute:: name

        Name of the task.

    .. attribute:: abstract

        If :const:`True` the task is an abstract base class.

    .. attribute:: type

        The type of task, currently unused.

    .. attribute:: queue

        Select a destination queue for this task.  The queue needs to exist
        in :setting:`CELERY_QUEUES`.  The ``routing_key``, ``exchange`` and
        ``exchange_type`` attributes will be ignored if this is set.

    .. attribute:: routing_key

        Override the global default ``routing_key`` for this task.

    .. attribute:: exchange

        Override the global default ``exchange`` for this task.

    .. attribute:: exchange_type

        Override the global default exchange type for this task.

    .. attribute:: delivery_mode

        Override the global default delivery mode for this task.
        By default this is set to ``2`` (persistent).  You can change this
        to ``1`` to get non-persistent behavior, which means the messages
        are lost if the broker is restarted.

    .. attribute:: mandatory

        Mandatory message routing.  An exception will be raised if the task
        can't be routed to a queue.

    .. attribute:: immediate

        Request immediate delivery.  An exception will be raised if the task
        can't be routed to a worker immediately.

    .. attribute:: priority

        The message priority.  A number from ``0`` to ``9``, where ``0``
        is the highest.  Note that RabbitMQ doesn't support priorities yet.

    .. attribute:: max_retries

        Maximum number of retries before giving up.
        If set to :const:`None`, it will never stop retrying.

    .. attribute:: default_retry_delay

        Default time in seconds before a retry of the task should be
        executed.  Default is a 3 minute delay.

    .. attribute:: rate_limit

        Set the rate limit for this task type.  Examples: :const:`None` (no
        rate limit), ``"100/s"`` (hundred tasks a second), ``"100/m"``
        (hundred tasks a minute), ``"100/h"`` (hundred tasks an hour).

    .. attribute:: ignore_result

        Don't store the return value of this task.

    .. attribute:: store_errors_even_if_ignored

        If true, errors will be stored even if the task is configured
        to ignore results.

    .. attribute:: send_error_emails

        If true, an e-mail will be sent to the admins whenever
        a task of this type raises an exception.

    .. attribute:: error_whitelist

        List of exception types to send error e-mails for.

    .. attribute:: serializer

        The name of a serializer that has been registered with
        :mod:`carrot.serialization.registry`.  Example: ``"json"``.

    .. attribute:: backend

        The result store backend used for this task.

    .. attribute:: autoregister

        If :const:`True` the task is automatically registered in the task
        registry, which is the default behaviour.

    .. attribute:: track_started

        If :const:`True` the task will report its status as "started"
        when the task is executed by a worker.
        The default value is ``False`` as the normal behaviour is to not
        report that level of granularity.  Tasks are either pending,
        finished, or waiting to be retried.

        Having a "started" status can be useful for when there are long
        running tasks and there is a need to report which task is
        currently running.

        The global default can be overridden with the
        :setting:`CELERY_TRACK_STARTED` setting.

    .. attribute:: acks_late

        If set to :const:`True` messages for this task will be acknowledged
        **after** the task has been executed, not *just before*, which is
        the default behavior.

        Note that this means the task may be executed twice if the worker
        crashes in the middle of execution, which may be acceptable for some
        applications.

        The global default can be overridden by the
        :setting:`CELERY_ACKS_LATE` setting.

    .. attribute:: expires

        Default task expiry time in seconds or a :class:`~datetime.datetime`.

    """
    # Python 2 metaclass hook: TaskType handles naming and registration.
    __metaclass__ = TaskType

    app = None
    name = None
    # TaskType pops this attribute, so subclasses are concrete by default.
    abstract = True
    autoregister = True
    type = "regular"
    # When true, :meth:`apply` injects the "magic" keyword arguments
    # (task_id, task_name, ...) that the run method accepts.
    accept_magic_kwargs = True
    # Thread-local execution context; a single Context instance is defined
    # here on the base class.
    request = Context()

    # Routing and message options (collected by ``extract_exec_options``).
    queue = None
    routing_key = None
    exchange = None
    exchange_type = None
    delivery_mode = None
    immediate = False
    mandatory = False
    priority = None

    # Result and error-reporting options.
    ignore_result = False
    store_errors_even_if_ignored = False
    send_error_emails = False
    error_whitelist = ()
    disable_error_emails = False                            # FIXME

    # Retry policy, read by :meth:`retry`.
    max_retries = 3
    default_retry_delay = 3 * 60

    serializer = "pickle"
    rate_limit = None
    backend = None
    track_started = False
    acks_late = False
    expires = None

    # Exposed on the class so :meth:`retry` can raise it through ``self``.
    MaxRetriesExceededError = MaxRetriesExceededError
  199. def __call__(self, *args, **kwargs):
  200. return self.run(*args, **kwargs)
  201. def __reduce__(self):
  202. return (_unpickle_task, (self.name, ), None)
  203. def run(self, *args, **kwargs):
  204. """The body of the task executed by the worker.
  205. The following standard keyword arguments are reserved and is
  206. automatically passed by the worker if the function/method
  207. supports them:
  208. * task_id
  209. * task_name
  210. * task_retries
  211. * task_is_eager
  212. * logfile
  213. * loglevel
  214. * delivery_info
  215. Additional standard keyword arguments may be added in the future.
  216. To take these default arguments, the task can either list the ones
  217. it wants explicitly or just take an arbitrary list of keyword
  218. arguments (\*\*kwargs).
  219. """
  220. raise NotImplementedError("Tasks must define the run method.")
  221. @classmethod
  222. def get_logger(self, loglevel=None, logfile=None, **kwargs):
  223. """Get task-aware logger object.
  224. See :func:`celery.log.setup_task_logger`.
  225. """
  226. if loglevel is None:
  227. loglevel = self.request.loglevel
  228. if logfile is None:
  229. logfile = self.request.logfile
  230. return self.app.log.setup_task_logger(loglevel=loglevel,
  231. logfile=logfile,
  232. task_kwargs=self.request.kwargs)
  233. @classmethod
  234. def establish_connection(self, connect_timeout=None):
  235. """Establish a connection to the message broker."""
  236. return self.app.broker_connection(connect_timeout=connect_timeout)
  237. @classmethod
  238. def get_publisher(self, connection=None, exchange=None,
  239. connect_timeout=None, exchange_type=None):
  240. """Get a celery task message publisher.
  241. :rtype :class:`~celery.app.amqp.TaskPublisher`:
  242. Please be sure to close the AMQP connection when you're done
  243. with this object, i.e.:
  244. >>> publisher = self.get_publisher()
  245. >>> # do something with publisher
  246. >>> publisher.connection.close()
  247. """
  248. if exchange is None:
  249. exchange = self.exchange
  250. if exchange_type is None:
  251. exchange_type = self.exchange_type
  252. connection = connection or self.establish_connection(connect_timeout)
  253. return self.app.amqp.TaskPublisher(connection=connection,
  254. exchange=exchange,
  255. exchange_type=exchange_type,
  256. routing_key=self.routing_key)
  257. @classmethod
  258. def get_consumer(self, connection=None, connect_timeout=None):
  259. """Get a celery task message consumer.
  260. :rtype :class:`~celery.app.amqp.TaskConsumer`:
  261. Please be sure to close the AMQP connection when you're done
  262. with this object. i.e.:
  263. >>> consumer = self.get_consumer()
  264. >>> # do something with consumer
  265. >>> consumer.connection.close()
  266. """
  267. connection = connection or self.establish_connection(connect_timeout)
  268. return self.app.amqp.TaskConsumer(connection=connection,
  269. exchange=self.exchange,
  270. routing_key=self.routing_key)
  271. @classmethod
  272. def delay(self, *args, **kwargs):
  273. """Shortcut to :meth:`apply_async`, with star arguments,
  274. but doesn't support the extra options.
  275. :param \*args: positional arguments passed on to the task.
  276. :param \*\*kwargs: keyword arguments passed on to the task.
  277. :returns :class:`celery.result.AsyncResult`:
  278. """
  279. return self.apply_async(args, kwargs)
  280. @classmethod
  281. def apply_async(self, args=None, kwargs=None, countdown=None,
  282. eta=None, task_id=None, publisher=None, connection=None,
  283. connect_timeout=None, router=None, expires=None, queues=None,
  284. **options):
  285. """Run a task asynchronously by the celery daemon(s).
  286. :keyword args: The positional arguments to pass on to the
  287. task (a :class:`list` or :class:`tuple`).
  288. :keyword kwargs: The keyword arguments to pass on to the
  289. task (a :class:`dict`)
  290. :keyword countdown: Number of seconds into the future that the
  291. task should execute. Defaults to immediate delivery (Do not
  292. confuse that with the ``immediate`` setting, they are
  293. unrelated).
  294. :keyword eta: A :class:`~datetime.datetime` object that describes
  295. the absolute time and date of when the task should execute.
  296. May not be specified if ``countdown`` is also supplied. (Do
  297. not confuse this with the ``immediate`` setting, they are
  298. unrelated).
  299. :keyword expires: Either a :class:`int`, describing the number of
  300. seconds, or a :class:`~datetime.datetime` object that
  301. describes the absolute time and date of when the task should
  302. expire. The task will not be executed after the
  303. expiration time.
  304. :keyword connection: Re-use existing broker connection instead
  305. of establishing a new one. The ``connect_timeout`` argument
  306. is not respected if this is set.
  307. :keyword connect_timeout: The timeout in seconds, before we give
  308. up on establishing a connection to the AMQP server.
  309. :keyword routing_key: The routing key used to route the task to a
  310. worker server. Defaults to the tasks
  311. :attr:`routing_key` attribute.
  312. :keyword exchange: The named exchange to send the task to.
  313. Defaults to the tasks :attr:`exchange` attribute.
  314. :keyword exchange_type: The exchange type to initalize the
  315. exchange if not already declared. Defaults to the tasks
  316. :attr:`exchange_type` attribute.
  317. :keyword immediate: Request immediate delivery. Will raise an
  318. exception if the task cannot be routed to a worker
  319. immediately. (Do not confuse this parameter with
  320. the ``countdown`` and ``eta`` settings, as they are
  321. unrelated). Defaults to the tasks :attr:`immediate` attribute.
  322. :keyword mandatory: Mandatory routing. Raises an exception if
  323. there's no running workers able to take on this task.
  324. Defaults to the tasks :attr:`mandatory` attribute.
  325. :keyword priority: The task priority, a number between 0 and 9.
  326. Defaults to the tasks :attr:`priority` attribute.
  327. :keyword serializer: A string identifying the default
  328. serialization method to use. Defaults to the
  329. ``CELERY_TASK_SERIALIZER`` setting. Can be ``pickle``,
  330. ``json``, ``yaml``, or any custom serialization method
  331. that has been registered with
  332. :mod:`carrot.serialization.registry`. Defaults to the tasks
  333. :attr:`serializer` attribute.
  334. **Note**: If the ``CELERY_ALWAYS_EAGER`` setting is set, it will
  335. be replaced by a local :func:`apply` call instead.
  336. """
  337. router = self.app.amqp.Router(queues)
  338. if self.app.conf.CELERY_ALWAYS_EAGER:
  339. return self.apply(args, kwargs, task_id=task_id)
  340. options = dict(extract_exec_options(self), **options)
  341. options = router.route(options, self.name, args, kwargs)
  342. exchange = options.get("exchange")
  343. exchange_type = options.get("exchange_type")
  344. expires = expires or self.expires
  345. publish = publisher or self.get_publisher(connection,
  346. exchange=exchange,
  347. exchange_type=exchange_type)
  348. try:
  349. task_id = publish.delay_task(self.name, args, kwargs,
  350. task_id=task_id,
  351. countdown=countdown,
  352. eta=eta, expires=expires,
  353. **options)
  354. finally:
  355. publisher or publish.close()
  356. if not connection:
  357. # close automatically created connection
  358. publish.connection.close()
  359. return self.AsyncResult(task_id)
  360. @classmethod
  361. def retry(self, args=None, kwargs=None, exc=None, throw=True,
  362. **options):
  363. """Retry the task.
  364. :param args: Positional arguments to retry with.
  365. :param kwargs: Keyword arguments to retry with.
  366. :keyword exc: Optional exception to raise instead of
  367. :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
  368. restart limit has been exceeded.
  369. :keyword countdown: Time in seconds to delay the retry for.
  370. :keyword eta: Explicit time and date to run the retry at
  371. (must be a :class:`~datetime.datetime` instance).
  372. :keyword \*\*options: Any extra options to pass on to
  373. meth:`apply_async`. See :func:`celery.execute.apply_async`.
  374. :keyword throw: If this is ``False``, do not raise the
  375. :exc:`~celery.exceptions.RetryTaskError` exception,
  376. that tells the worker to mark the task as being retried.
  377. Note that this means the task will be marked as failed
  378. if the task raises an exception, or successful if it
  379. returns.
  380. :raises celery.exceptions.RetryTaskError: To tell the worker that
  381. the task has been re-sent for retry. This always happens,
  382. unless the ``throw`` keyword argument has been explicitly set
  383. to ``False``, and is considered normal operation.
  384. Example
  385. >>> class TwitterPostStatusTask(Task):
  386. ...
  387. ... def run(self, username, password, message, **kwargs):
  388. ... twitter = Twitter(username, password)
  389. ... try:
  390. ... twitter.post_status(message)
  391. ... except twitter.FailWhale, exc:
  392. ... # Retry in 5 minutes.
  393. ... self.retry([username, password, message],
  394. ... kwargs,
  395. ... countdown=60 * 5, exc=exc)
  396. """
  397. request = self.request
  398. if args is None:
  399. args = request.args
  400. if kwargs is None:
  401. kwargs = request.kwargs
  402. delivery_info = request.delivery_info
  403. options.setdefault("exchange", delivery_info.get("exchange"))
  404. options.setdefault("routing_key", delivery_info.get("routing_key"))
  405. options["retries"] = request.retries + 1
  406. options["task_id"] = kwargs.pop("task_id", None)
  407. options["countdown"] = options.get("countdown",
  408. self.default_retry_delay)
  409. max_exc = exc or self.MaxRetriesExceededError(
  410. "Can't retry %s[%s] args:%s kwargs:%s" % (
  411. self.name, options["task_id"], args, kwargs))
  412. max_retries = self.max_retries
  413. if max_retries is not None and options["retries"] > max_retries:
  414. raise max_exc
  415. # If task was executed eagerly using apply(),
  416. # then the retry must also be executed eagerly.
  417. if request.is_eager:
  418. result = self.apply(args=args, kwargs=kwargs, **options)
  419. if isinstance(result, EagerResult):
  420. return result.get() # propogates exceptions.
  421. return result
  422. self.apply_async(args=args, kwargs=kwargs, **options)
  423. if throw:
  424. message = "Retry in %d seconds." % options["countdown"]
  425. raise RetryTaskError(message, exc)
  426. @classmethod
  427. def apply(self, args=None, kwargs=None, **options):
  428. """Execute this task locally, by blocking until the task
  429. returns.
  430. :param args: positional arguments passed on to the task.
  431. :param kwargs: keyword arguments passed on to the task.
  432. :keyword throw: Re-raise task exceptions. Defaults to
  433. the :setting:`CELERY_EAGER_PROPAGATES_EXCEPTIONS` setting.
  434. :rtype :class:`celery.result.EagerResult`:
  435. See :func:`celery.execute.apply`.
  436. """
  437. args = args or []
  438. kwargs = kwargs or {}
  439. task_id = options.get("task_id") or gen_unique_id()
  440. retries = options.get("retries", 0)
  441. throw = self.app.either("CELERY_EAGER_PROPAGATES_EXCEPTIONS",
  442. options.pop("throw", None))
  443. # Make sure we get the task instance, not class.
  444. task = tasks[self.name]
  445. request = {"id": task_id,
  446. "retries": retries,
  447. "is_eager": True,
  448. "logfile": options.get("logfile"),
  449. "loglevel": options.get("loglevel", 0),
  450. "delivery_info": {"is_eager": True}}
  451. if self.accept_magic_kwargs:
  452. default_kwargs = {"task_name": task.name,
  453. "task_id": task_id,
  454. "task_retries": retries,
  455. "task_is_eager": True,
  456. "logfile": options.get("logfile"),
  457. "loglevel": options.get("loglevel", 0),
  458. "delivery_info": {"is_eager": True}}
  459. supported_keys = fun_takes_kwargs(task.run, default_kwargs)
  460. extend_with = dict((key, val)
  461. for key, val in default_kwargs.items()
  462. if key in supported_keys)
  463. kwargs.update(extend_with)
  464. trace = TaskTrace(task.name, task_id, args, kwargs,
  465. task=task, request=request)
  466. retval = trace.execute()
  467. if isinstance(retval, ExceptionInfo):
  468. if throw:
  469. raise retval.exception
  470. retval = retval.exception
  471. return EagerResult(task_id, retval, trace.status,
  472. traceback=trace.strtb)
  473. @classmethod
  474. def AsyncResult(self, task_id):
  475. """Get AsyncResult instance for this kind of task.
  476. :param task_id: Task id to get result for.
  477. """
  478. return self.app.AsyncResult(task_id, backend=self.backend)
  479. def update_state(self, task_id=None, state=None, meta=None):
  480. """Update task state.
  481. :param task_id: Id of the task to update.
  482. :param state: New state (:class:`str`).
  483. :param meta: State metadata (:class:`dict`).
  484. """
  485. if task_id is None:
  486. task_id = self.request.id
  487. self.backend.store_result(task_id, meta, state)
  488. def on_retry(self, exc, task_id, args, kwargs, einfo=None):
  489. """Retry handler.
  490. This is run by the worker when the task is to be retried.
  491. :param exc: The exception sent to :meth:`retry`.
  492. :param task_id: Unique id of the retried task.
  493. :param args: Original arguments for the retried task.
  494. :param kwargs: Original keyword arguments for the retried task.
  495. :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
  496. instance, containing the traceback.
  497. The return value of this handler is ignored.
  498. """
  499. pass
  500. def after_return(self, status, retval, task_id, args,
  501. kwargs, einfo=None):
  502. """Handler called after the task returns.
  503. :param status: Current task state.
  504. :param retval: Task return value/exception.
  505. :param task_id: Unique id of the task.
  506. :param args: Original arguments for the task that failed.
  507. :param kwargs: Original keyword arguments for the task
  508. that failed.
  509. :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
  510. instance, containing the traceback (if any).
  511. The return value of this handler is ignored.
  512. """
  513. pass
  514. def on_failure(self, exc, task_id, args, kwargs, einfo=None):
  515. """Error handler.
  516. This is run by the worker when the task fails.
  517. :param exc: The exception raised by the task.
  518. :param task_id: Unique id of the failed task.
  519. :param args: Original arguments for the task that failed.
  520. :param kwargs: Original keyword arguments for the task
  521. that failed.
  522. :keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
  523. instance, containing the traceback.
  524. The return value of this handler is ignored.
  525. """
  526. pass
  527. def on_success(self, retval, task_id, args, kwargs):
  528. """Success handler.
  529. Run by the worker if the task executes successfully.
  530. :param retval: The return value of the task.
  531. :param task_id: Unique id of the executed task.
  532. :param args: Original arguments for the executed task.
  533. :param kwargs: Original keyword arguments for the executed task.
  534. The return value of this handler is ignored.
  535. """
  536. pass
  537. def execute(self, wrapper, pool, loglevel, logfile):
  538. """The method the worker calls to execute the task.
  539. :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
  540. :param pool: A task pool.
  541. :param loglevel: Current loglevel.
  542. :param logfile: Name of the currently used logfile.
  543. """
  544. wrapper.execute_using_pool(pool, loglevel, logfile)
  545. def __repr__(self):
  546. """repr(task)"""
  547. try:
  548. kind = self.__class__.mro()[1].__name__
  549. except (AttributeError, IndexError): # pragma: no cover
  550. kind = "%s(Task)" % self.__class__.__name__
  551. return "<%s: %s (%s)>" % (kind, self.name, self.type)
  552. @classmethod
  553. def subtask(cls, *args, **kwargs):
  554. """Returns a :class:`~celery.task.sets.subtask` object for
  555. this task that wraps arguments and execution options
  556. for a single task invocation."""
  557. return subtask(cls, *args, **kwargs)
  558. @property
  559. def __name__(self):
  560. return self.__class__.__name__
  561. def create_task_cls(app):
  562. apps = [app]
  563. class Task(BaseTask):
  564. app = apps[0]
  565. backend = app.backend
  566. exchange_type = app.conf.CELERY_DEFAULT_EXCHANGE_TYPE
  567. delivery_mode = app.conf.CELERY_DEFAULT_DELIVERY_MODE
  568. send_error_emails = app.conf.CELERY_SEND_TASK_ERROR_EMAILS
  569. error_whitelist = app.conf.CELERY_TASK_ERROR_WHITELIST
  570. serializer = app.conf.CELERY_TASK_SERIALIZER
  571. rate_limit = app.conf.CELERY_DEFAULT_RATE_LIMIT
  572. track_started = app.conf.CELERY_TRACK_STARTED
  573. acks_late = app.conf.CELERY_ACKS_LATE
  574. ignore_result = app.conf.CELERY_IGNORE_RESULT
  575. store_errors_even_if_ignored = \
  576. app.conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
  577. return Task
  578. Task = create_task_cls(app_or_default())
  579. class PeriodicTask(Task):
  580. """A periodic task is a task that behaves like a :manpage:`cron` job.
  581. Results of periodic tasks are not stored by default.
  582. .. attribute:: run_every
  583. *REQUIRED* Defines how often the task is run (its interval),
  584. it can be a :class:`~datetime.timedelta` object, a
  585. :class:`~celery.task.schedules.crontab` object or an integer
  586. specifying the time in seconds.
  587. .. attribute:: relative
  588. If set to :const:`True`, run times are relative to the time when the
  589. server was started. This was the previous behaviour, periodic tasks
  590. are now scheduled by the clock.
  591. :raises NotImplementedError: if the :attr:`run_every` attribute is
  592. not defined.
  593. Example
  594. >>> from celery.task import tasks, PeriodicTask
  595. >>> from datetime import timedelta
  596. >>> class EveryThirtySecondsTask(PeriodicTask):
  597. ... run_every = timedelta(seconds=30)
  598. ...
  599. ... def run(self, **kwargs):
  600. ... logger = self.get_logger(**kwargs)
  601. ... logger.info("Execute every 30 seconds")
  602. >>> from celery.task import PeriodicTask
  603. >>> from celery.task.schedules import crontab
  604. >>> class EveryMondayMorningTask(PeriodicTask):
  605. ... run_every = crontab(hour=7, minute=30, day_of_week=1)
  606. ...
  607. ... def run(self, **kwargs):
  608. ... logger = self.get_logger(**kwargs)
  609. ... logger.info("Execute every Monday at 7:30AM.")
  610. >>> class EveryMorningTask(PeriodicTask):
  611. ... run_every = crontab(hours=7, minute=30)
  612. ...
  613. ... def run(self, **kwargs):
  614. ... logger = self.get_logger(**kwargs)
  615. ... logger.info("Execute every day at 7:30AM.")
  616. >>> class EveryQuarterPastTheHourTask(PeriodicTask):
  617. ... run_every = crontab(minute=15)
  618. ...
  619. ... def run(self, **kwargs):
  620. ... logger = self.get_logger(**kwargs)
  621. ... logger.info("Execute every 0:15 past the hour every day.")
  622. """
  623. abstract = True
  624. ignore_result = True
  625. type = "periodic"
  626. relative = False
  627. def __init__(self):
  628. app = app_or_default()
  629. if not hasattr(self, "run_every"):
  630. raise NotImplementedError(
  631. "Periodic tasks must have a run_every attribute")
  632. self.run_every = maybe_schedule(self.run_every, self.relative)
  633. # Periodic task classes is pending deprecation.
  634. warnings.warn(PendingDeprecationWarning(PERIODIC_DEPRECATION_TEXT))
  635. # For backward compatibility, add the periodic task to the
  636. # configuration schedule instead.
  637. app.conf.CELERYBEAT_SCHEDULE[self.name] = {
  638. "task": self.name,
  639. "schedule": self.run_every,
  640. "args": (),
  641. "kwargs": {},
  642. "options": {},
  643. "relative": self.relative,
  644. }
  645. super(PeriodicTask, self).__init__()
  646. def timedelta_seconds(self, delta):
  647. """Convert :class:`~datetime.timedelta` to seconds.
  648. Doesn't account for negative timedeltas.
  649. """
  650. return timedelta_seconds(delta)
  651. def is_due(self, last_run_at):
  652. """Returns tuple of two items ``(is_due, next_time_to_run)``,
  653. where next time to run is in seconds.
  654. See :meth:`celery.schedules.schedule.is_due` for more information.
  655. """
  656. return self.run_every.is_due(last_run_at)
  657. def remaining_estimate(self, last_run_at):
  658. """Returns when the periodic task should run next as a timedelta."""
  659. return self.run_every.remaining_estimate(last_run_at)