base.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676
  1. import sys
  2. import warnings
  3. from celery import conf
  4. from celery.backends import default_backend
  5. from celery.exceptions import MaxRetriesExceededError, RetryTaskError
  6. from celery.execute import apply_async, apply
  7. from celery.log import setup_task_logger
  8. from celery.messaging import TaskPublisher, TaskConsumer
  9. from celery.messaging import establish_connection as _establish_connection
  10. from celery.registry import tasks
  11. from celery.result import BaseAsyncResult, EagerResult
  12. from celery.schedules import maybe_schedule
  13. from celery.utils.timeutils import timedelta_seconds
  14. from celery.task.sets import TaskSet, subtask
  # Text of the PendingDeprecationWarning emitted when a PeriodicTask
  # subclass is instantiated (see PeriodicTask.__init__ in this module).
  PERIODIC_DEPRECATION_TEXT = """\
  Periodic task classes have been deprecated and will be removed
  in celery v3.0.

  Please use the CELERYBEAT_SCHEDULE setting instead:

      CELERYBEAT_SCHEDULE = {
          name: dict(task=task_name, schedule=run_every,
                     args=(), kwargs={}, options={}, relative=False)
      }
  """
  def _unpickle_task(name):
      """Resolve a pickled task reference back to the registered task.

      ``Task.__reduce__`` pickles a task by its name only; unpickling calls
      this function to fetch the task registered under that name.

      :param name: Registered name of the task.
      :returns: Whatever the task registry holds for ``name``.
      NOTE(review): an unknown name presumably raises the registry's
      lookup error (KeyError or a subclass) -- confirm in celery.registry.
      """
      registered_task = tasks[name]
      return registered_task
  class TaskType(type):
      """Metaclass for tasks.

      Automatically registers the task in the task registry, except
      if the ``abstract`` attribute is set (or ``autoregister`` is false).

      If no ``name`` attribute is provided, the name is automatically
      set to the name of the module it was defined in, and the class name.

      """

      def __new__(cls, name, bases, attrs):
          super_new = super(TaskType, cls).__new__
          task_module = attrs["__module__"]

          # Abstract class, remove the abstract attribute so
          # any class inheriting from this won't be abstract by default.
          if attrs.pop("abstract", None) or not attrs.get("autoregister", True):
              return super_new(cls, name, bases, attrs)

          # Automatically generate missing name.
          if not attrs.get("name"):
              # Prefer the module object's own ``__name__``, but do not crash
              # when the defining module is absent from sys.modules (e.g.
              # classes defined in interactive shells, or built with type()
              # and a synthetic ``__module__``): fall back to the string.
              module = sys.modules.get(task_module)
              if module is not None:
                  task_module = module.__name__
              attrs["name"] = ".".join([task_module, name])

          # Because of the way import happens (recursively)
          # we may or may not be the first time the task tries to register
          # with the framework. There should only be one class for each task
          # name, so we always return the registered version.
          task_name = attrs["name"]
          if task_name not in tasks:
              task_cls = super_new(cls, name, bases, attrs)
              tasks.register(task_cls)
          return tasks[task_name].__class__
  class Task(object):
      """A celery task.

      All subclasses of :class:`Task` must define the :meth:`run` method,
      which is the actual method the ``celery`` daemon executes.

      The :meth:`run` method can make use of the default keyword arguments,
      as listed in the :meth:`run` documentation.

      The resulting class is callable, which if called will apply the
      :meth:`run` method.

      .. attribute:: name

          Name of the task.

      .. attribute:: abstract

          If :const:`True` the task is an abstract base class.

      .. attribute:: type

          The type of task, currently this can be ``regular``, or ``periodic``,
          however if you want a periodic task, you should subclass
          :class:`PeriodicTask` instead.

      .. attribute:: queue

          Select a destination queue for this task. The queue needs to exist
          in :setting:`CELERY_QUEUES`. The ``routing_key``, ``exchange`` and
          ``exchange_type`` attributes will be ignored if this is set.

      .. attribute:: routing_key

          Override the global default ``routing_key`` for this task.

      .. attribute:: exchange

          Override the global default ``exchange`` for this task.

      .. attribute:: exchange_type

          Override the global default exchange type for this task.

      .. attribute:: delivery_mode

          Override the global default delivery mode for this task.
          By default this is set to ``2`` (persistent). You can change this
          to ``1`` to get non-persistent behavior, which means the messages
          are lost if the broker is restarted.

      .. attribute:: mandatory

          Mandatory message routing. An exception will be raised if the task
          can't be routed to a queue.

      .. attribute:: immediate

          Request immediate delivery. An exception will be raised if the task
          can't be routed to a worker immediately.

      .. attribute:: priority

          The message priority. A number from ``0`` to ``9``, where ``0`` is the
          highest. Note that RabbitMQ doesn't support priorities yet.

      .. attribute:: max_retries

          Maximum number of retries before giving up.
          If set to :const:`None`, it will never stop retrying.

      .. attribute:: default_retry_delay

          Default time in seconds before a retry of the task should be
          executed. Default is a 3 minute delay.

      .. attribute:: rate_limit

          Set the rate limit for this task type, Examples: :const:`None` (no
          rate limit), ``"100/s"`` (hundred tasks a second), ``"100/m"``
          (hundred tasks a minute), ``"100/h"`` (hundred tasks an hour)

      .. attribute:: ignore_result

          Don't store the return value of this task.

      .. attribute:: store_errors_even_if_ignored

          If true, errors will be stored even if the task is configured
          to ignore results.

      .. attribute:: send_error_emails

          If true, an e-mail will be sent to the admins whenever
          a task of this type raises an exception.

      .. attribute:: error_whitelist

          List of exception types to send error e-mails for.

      .. attribute:: serializer

          The name of a serializer that has been registered with
          :mod:`carrot.serialization.registry`. Example: ``"json"``.

      .. attribute:: backend

          The result store backend used for this task.

      .. attribute:: autoregister

          If :const:`True` the task is automatically registered in the task
          registry, which is the default behaviour.

      .. attribute:: track_started

          If :const:`True` the task will report its status as "started"
          when the task is executed by a worker.
          The default value is ``False`` as the normal behaviour is to not
          report that level of granularity. Tasks are either pending, finished,
          or waiting to be retried. Having a "started" status can be useful for
          when there are long running tasks and there is a need to report which
          task is currently running.

          The global default can be overridden by the
          :setting:`CELERY_TRACK_STARTED` setting.

      .. attribute:: acks_late

          If set to :const:`True` messages for this task will be acknowledged
          **after** the task has been executed, not *just before*, which is
          the default behavior.

          Note that this means the task may be executed twice if the worker
          crashes in the middle of execution, which may be acceptable for some
          applications.

          The global default can be overridden by the
          :setting:`CELERY_ACKS_LATE` setting.

      """
      __metaclass__ = TaskType

      name = None
      abstract = True
      autoregister = True
      type = "regular"

      queue = None
      routing_key = None
      exchange = None
      exchange_type = conf.DEFAULT_EXCHANGE_TYPE
      delivery_mode = conf.DEFAULT_DELIVERY_MODE
      immediate = False
      mandatory = False
      priority = None

      ignore_result = conf.IGNORE_RESULT
      store_errors_even_if_ignored = conf.STORE_ERRORS_EVEN_IF_IGNORED
      send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
      error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
      disable_error_emails = False # FIXME

      max_retries = 5
      default_retry_delay = 3 * 60
      serializer = conf.TASK_SERIALIZER
      rate_limit = conf.DEFAULT_RATE_LIMIT
      backend = default_backend
      track_started = conf.TRACK_STARTED
      acks_late = conf.ACKS_LATE

      # Exposed on the class so task code can raise/catch it as
      # ``self.MaxRetriesExceededError`` (used by :meth:`retry`).
      MaxRetriesExceededError = MaxRetriesExceededError

      def __call__(self, *args, **kwargs):
          return self.run(*args, **kwargs)

      def __reduce__(self):
          # Pickle by name only; ``_unpickle_task`` looks the task up in the
          # registry again when the pickle is loaded.
          return (_unpickle_task, (self.name, ), None)

      def run(self, *args, **kwargs):
          r"""The body of the task executed by the worker.

          The following standard keyword arguments are reserved and are
          passed by the worker if the function/method supports them:

              * task_id
              * task_name
              * task_retries
              * task_is_eager
              * logfile
              * loglevel
              * delivery_info

          Additional standard keyword arguments may be added in the future.
          To take these default arguments, the task can either list the ones
          it wants explicitly or just take an arbitrary list of keyword
          arguments (\*\*kwargs).

          """
          raise NotImplementedError("Tasks must define the run method.")

      @classmethod
      def get_logger(cls, loglevel=None, logfile=None, **kwargs):
          """Get task-aware logger object.

          See :func:`celery.log.setup_task_logger`.

          """
          return setup_task_logger(loglevel=loglevel, logfile=logfile,
                                   task_kwargs=kwargs)

      @classmethod
      def establish_connection(cls,
              connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
          """Establish a connection to the message broker."""
          return _establish_connection(connect_timeout=connect_timeout)

      @classmethod
      def get_publisher(cls, connection=None, exchange=None,
              connect_timeout=conf.BROKER_CONNECTION_TIMEOUT,
              exchange_type=None):
          """Get a celery task message publisher.

          ``exchange`` and ``exchange_type`` default to the task's own
          :attr:`exchange` / :attr:`exchange_type` attributes; a new broker
          connection is established if ``connection`` is not given.

          :rtype :class:`celery.messaging.TaskPublisher`:

          Please be sure to close the AMQP connection when you're done
          with this object, i.e.:

              >>> publisher = self.get_publisher()
              >>> # do something with publisher
              >>> publisher.connection.close()

          """
          if exchange is None:
              exchange = cls.exchange
          if exchange_type is None:
              exchange_type = cls.exchange_type
          connection = connection or cls.establish_connection(connect_timeout)
          return TaskPublisher(connection=connection,
                               exchange=exchange,
                               exchange_type=exchange_type,
                               routing_key=cls.routing_key)

      @classmethod
      def get_consumer(cls, connection=None,
              connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
          """Get a celery task message consumer.

          :rtype :class:`celery.messaging.TaskConsumer`:

          Please be sure to close the AMQP connection when you're done
          with this object. i.e.:

              >>> consumer = self.get_consumer()
              >>> # do something with consumer
              >>> consumer.connection.close()

          """
          connection = connection or cls.establish_connection(connect_timeout)
          return TaskConsumer(connection=connection, exchange=cls.exchange,
                              routing_key=cls.routing_key)

      @classmethod
      def delay(cls, *args, **kwargs):
          r"""Shortcut to :meth:`apply_async`, with star arguments,
          but doesn't support the extra options.

          :param \*args: positional arguments passed on to the task.
          :param \*\*kwargs: keyword arguments passed on to the task.

          :returns :class:`celery.result.AsyncResult`:

          """
          return cls.apply_async(args, kwargs)

      @classmethod
      def apply_async(cls, args=None, kwargs=None, **options):
          r"""Delay this task for execution by the ``celery`` daemon(s).

          :param args: positional arguments passed on to the task.
          :param kwargs: keyword arguments passed on to the task.
          :keyword \*\*options: Any keyword arguments to pass on to
              :func:`celery.execute.apply_async`.

          See :func:`celery.execute.apply_async` for more information.

          :returns :class:`celery.result.AsyncResult`:

          """
          return apply_async(cls, args, kwargs, **options)

      @classmethod
      def retry(cls, args=None, kwargs=None, exc=None, throw=True, **options):
          r"""Retry the task.

          :param args: Positional arguments to retry with.
          :param kwargs: Keyword arguments to retry with. Must not be empty
              (the task must accept ``**kwargs``). The reserved keys
              ``delivery_info``, ``task_retries`` and ``task_id`` are
              stripped from a copy before re-sending; the caller's dict is
              not modified.
          :keyword exc: Optional exception to raise instead of
              :exc:`~celery.exceptions.MaxRetriesExceededError` when the max
              restart limit has been exceeded.
          :keyword countdown: Time in seconds to delay the retry for.
              Defaults to :attr:`default_retry_delay` when neither
              ``countdown`` nor ``eta`` is given.
          :keyword eta: Explicit time and date to run the retry at (must be a
              :class:`datetime.datetime` instance).
          :keyword \*\*options: Any extra options to pass on to
              :meth:`apply_async`. See :func:`celery.execute.apply_async`.
          :keyword throw: If this is ``False``, do not raise the
              :exc:`~celery.exceptions.RetryTaskError` exception,
              that tells the worker to mark the task as being retried.
              Note that this means the task will be marked as failed
              if the task raises an exception, or successful if it
              returns.

          :raises TypeError: if ``kwargs`` is empty.
          :raises celery.exceptions.MaxRetriesExceededError: (or ``exc`` if
              given) when the retry count would exceed :attr:`max_retries`.
          :raises celery.exceptions.RetryTaskError: To tell the worker that the
              task has been re-sent for retry. This always happens, unless
              the ``throw`` keyword argument has been explicitly set
              to ``False``, and is considered normal operation.

          If ``kwargs`` carries a true ``task_is_eager`` flag the retry is
          executed locally with :meth:`apply` instead, and its result is
          returned.

          Example

              >>> class TwitterPostStatusTask(Task):
              ...
              ...     def run(self, username, password, message, **kwargs):
              ...         twitter = Twitter(username, password)
              ...         try:
              ...             twitter.post_status(message)
              ...         except twitter.FailWhale, exc:
              ...             # Retry in 5 minutes.
              ...             self.retry([username, password, message], kwargs,
              ...                        countdown=60 * 5, exc=exc)

          """
          if not kwargs:
              raise TypeError(
                  "kwargs argument to retries can't be empty. "
                  "Task must accept **kwargs, see http://bit.ly/cAx3Bg")

          # Work on a copy: the reserved keys are stripped before re-sending,
          # and that must not leak back into the caller's dict.
          kwargs = dict(kwargs)
          delivery_info = kwargs.pop("delivery_info", None) or {}
          options.setdefault("exchange", delivery_info.get("exchange"))
          options.setdefault("routing_key", delivery_info.get("routing_key"))

          options["retries"] = kwargs.pop("task_retries", 0) + 1
          options["task_id"] = kwargs.pop("task_id", None)

          # Only fall back to the default delay when no explicit schedule was
          # given; previously a default countdown was always injected, so an
          # explicit ``eta`` was sent alongside it (and, presumably, overridden
          # by it downstream -- see celery.execute.apply_async).
          if options.get("countdown") is None and not options.get("eta"):
              options["countdown"] = cls.default_retry_delay

          max_retries = cls.max_retries
          if max_retries is not None and options["retries"] > max_retries:
              raise exc or cls.MaxRetriesExceededError(
                  "Can't retry %s[%s] args:%s kwargs:%s" % (
                      cls.name, options["task_id"], args, kwargs))

          # If task was executed eagerly using apply(),
          # then the retry must also be executed eagerly.
          if kwargs.get("task_is_eager", False):
              result = cls.apply(args=args, kwargs=kwargs, **options)
              if isinstance(result, EagerResult):
                  return result.get()             # propagates exceptions.
              return result

          cls.apply_async(args=args, kwargs=kwargs, **options)

          if throw:
              countdown = options.get("countdown")
              if countdown is not None:
                  message = "Retry in %d seconds." % countdown
              else:
                  message = "Retry at %s." % (options["eta"], )
              raise RetryTaskError(message, exc)

      @classmethod
      def apply(cls, args=None, kwargs=None, **options):
          """Execute this task locally, by blocking until the task
          has finished executing.

          :param args: positional arguments passed on to the task.
          :param kwargs: keyword arguments passed on to the task.
          :keyword throw: Re-raise task exceptions. Defaults to
              the :setting:`CELERY_EAGER_PROPAGATES_EXCEPTIONS` setting.

          :rtype :class:`celery.result.EagerResult`:

          See :func:`celery.execute.apply`.

          """
          return apply(cls, args, kwargs, **options)

      @classmethod
      def AsyncResult(cls, task_id):
          """Get AsyncResult instance for this kind of task.

          :param task_id: Task id to get result for.

          """
          return BaseAsyncResult(task_id, backend=cls.backend)

      def update_state(self, task_id, state, meta=None):
          """Update task state.

          :param task_id: Id of the task to update.
          :param state: New state (:class:`str`).
          :param meta: State metadata (:class:`dict`).

          """
          self.backend.store_result(task_id, meta, state)

      def on_retry(self, exc, task_id, args, kwargs, einfo=None):
          """Retry handler.

          This is run by the worker when the task is to be retried.

          :param exc: The exception sent to :meth:`retry`.
          :param task_id: Unique id of the retried task.
          :param args: Original arguments for the retried task.
          :param kwargs: Original keyword arguments for the retried task.

          :keyword einfo: :class:`~celery.datastructures.ExceptionInfo` instance,
              containing the traceback.

          The return value of this handler is ignored.

          """
          pass

      def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
          """Handler called after the task returns.

          :param status: Current task state.
          :param retval: Task return value/exception.
          :param task_id: Unique id of the task.
          :param args: Original arguments for the task that failed.
          :param kwargs: Original keyword arguments for the task that failed.

          :keyword einfo: :class:`~celery.datastructures.ExceptionInfo` instance,
              containing the traceback (if any).

          The return value of this handler is ignored.

          """
          pass

      def on_failure(self, exc, task_id, args, kwargs, einfo=None):
          """Error handler.

          This is run by the worker when the task fails.

          :param exc: The exception raised by the task.
          :param task_id: Unique id of the failed task.
          :param args: Original arguments for the task that failed.
          :param kwargs: Original keyword arguments for the task that failed.

          :keyword einfo: :class:`~celery.datastructures.ExceptionInfo` instance,
              containing the traceback.

          The return value of this handler is ignored.

          """
          pass

      def on_success(self, retval, task_id, args, kwargs):
          """Success handler.

          Run by the worker if the task executes successfully.

          :param retval: The return value of the task.
          :param task_id: Unique id of the executed task.
          :param args: Original arguments for the executed task.
          :param kwargs: Original keyword arguments for the executed task.

          The return value of this handler is ignored.

          """
          pass

      def execute(self, wrapper, pool, loglevel, logfile):
          """The method the worker calls to execute the task.

          :param wrapper: A :class:`~celery.worker.job.TaskRequest`.
          :param pool: A task pool.
          :param loglevel: Current loglevel.
          :param logfile: Name of the currently used logfile.

          """
          wrapper.execute_using_pool(pool, loglevel, logfile)

      def __repr__(self):
          """repr(task)"""
          try:
              # Name of the nearest base class, e.g. "Task" or "PeriodicTask".
              kind = self.__class__.mro()[1].__name__
          except (AttributeError, IndexError):            # pragma: no cover
              kind = "%s(Task)" % self.__class__.__name__
          return "<%s: %s (%s)>" % (kind, self.name, self.type)

      @classmethod
      def subtask(cls, *args, **kwargs):
          """Returns a :class:`~celery.task.sets.subtask` object for
          this task that wraps arguments and execution options
          for a single task invocation."""
          # ``subtask`` here is the module-level function imported from
          # celery.task.sets, not this classmethod.
          return subtask(cls, *args, **kwargs)

      @property
      def __name__(self):
          return self.__class__.__name__
  class PeriodicTask(Task):
      """A periodic task is a task that behaves like a :manpage:`cron` job.

      Results of periodic tasks are not stored by default.

      .. attribute:: run_every

          *REQUIRED* Defines how often the task is run (its interval),
          it can be a :class:`~datetime.timedelta` object, a
          :class:`~celery.task.schedules.crontab` object or an integer
          specifying the time in seconds.

      .. attribute:: relative

          If set to :const:`True`, run times are relative to the time when the
          server was started. This was the previous behaviour, periodic tasks
          are now scheduled by the clock.

      :raises NotImplementedError: if the :attr:`run_every` attribute is
          not defined.

      .. note::

          Periodic task classes are pending deprecation: instantiating one
          emits a :exc:`PendingDeprecationWarning` and adds an entry for the
          task (keyed by its name) to ``conf.CELERYBEAT_SCHEDULE``.

      Example

          >>> from celery.task import tasks, PeriodicTask
          >>> from datetime import timedelta
          >>> class EveryThirtySecondsTask(PeriodicTask):
          ...     run_every = timedelta(seconds=30)
          ...
          ...     def run(self, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Execute every 30 seconds")

          >>> from celery.task import PeriodicTask
          >>> from celery.task.schedules import crontab
          >>> class EveryMondayMorningTask(PeriodicTask):
          ...     run_every = crontab(hour=7, minute=30, day_of_week=1)
          ...
          ...     def run(self, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Execute every Monday at 7:30AM.")

          >>> class EveryMorningTask(PeriodicTask):
          ...     run_every = crontab(hour=7, minute=30)
          ...
          ...     def run(self, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Execute every day at 7:30AM.")

          >>> class EveryQuarterPastTheHourTask(PeriodicTask):
          ...     run_every = crontab(minute=15)
          ...
          ...     def run(self, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Execute every 0:15 past the hour every day.")

      """
      abstract = True
      ignore_result = True
      type = "periodic"
      relative = False

      def __init__(self):
          if not hasattr(self, "run_every"):
              raise NotImplementedError(
                  "Periodic tasks must have a run_every attribute")
          # Normalize ints/timedeltas into a schedule object; other values
          # (e.g. crontab instances) are passed through by maybe_schedule.
          self.run_every = maybe_schedule(self.run_every, self.relative)

          # Periodic task classes is pending deprecation.
          warnings.warn(PendingDeprecationWarning(PERIODIC_DEPRECATION_TEXT))

          # For backward compatibility, add the periodic task to the
          # configuration schedule instead.
          conf.CELERYBEAT_SCHEDULE[self.name] = {
              "task": self.name,
              "schedule": self.run_every,
              "args": (),
              "kwargs": {},
              "options": {},
              "relative": self.relative,
          }

          super(PeriodicTask, self).__init__()

      def timedelta_seconds(self, delta):
          """Convert :class:`~datetime.timedelta` to seconds.

          Doesn't account for negative timedeltas.

          """
          # Resolves to the module-level helper imported from
          # celery.utils.timeutils, not to this method.
          return timedelta_seconds(delta)

      def is_due(self, last_run_at):
          """Returns tuple of two items ``(is_due, next_time_to_run)``,
          where next time to run is in seconds.

          See :meth:`celery.schedules.schedule.is_due` for more information.

          """
          return self.run_every.is_due(last_run_at)

      def remaining_estimate(self, last_run_at):
          """Returns when the periodic task should run next as a timedelta."""
          return self.run_every.remaining_estimate(last_run_at)
  491. def remaining_estimate(self, last_run_at):
  492. """Returns when the periodic task should run next as a timedelta."""
  493. return self.run_every.remaining_estimate(last_run_at)