# -*- coding: utf-8 -*-
"""Result backend base classes.

- :class:`BaseBackend` defines the interface.

- :class:`KeyValueStoreBackend` is a common base class
    using K/V semantics like _get and _put.
"""
import sys
import time

from collections import namedtuple
from datetime import timedelta
from weakref import WeakValueDictionary

from billiard.einfo import ExceptionInfo
from kombu.serialization import (
    dumps, loads, prepare_accept_content,
    registry as serializer_registry,
)
from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
from kombu.utils.url import maybe_sanitize_url

from celery import states
from celery import current_app, group, maybe_signature
from celery.app import current_task
from celery.exceptions import ChordError, TimeoutError, TaskRevokedError
from celery.result import (
    GroupResult, ResultBase, allow_join_result, result_from_tuple,
)
from celery.utils.collections import BufferMap
from celery.utils.functional import LRUCache, arity_greater
from celery.utils.log import get_logger
from celery.utils.serialization import (
    get_pickled_exception,
    get_pickleable_exception,
    create_exception_cls,
)

__all__ = ['BaseBackend', 'KeyValueStoreBackend', 'DisabledBackend']

EXCEPTION_ABLE_CODECS = frozenset({'pickle'})

logger = get_logger(__name__)

MESSAGE_BUFFER_MAX = 8192

pending_results_t = namedtuple('pending_results_t', (
    'concrete', 'weak',
))


def unpickle_backend(cls, args, kwargs):
    """Return an unpickled backend."""
    return cls(*args, app=current_app._get_current_object(), **kwargs)


class _nulldict(dict):

    def ignore(self, *a, **kw):
        pass
    __setitem__ = update = setdefault = ignore
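
# Added note (not in the original source): ``_nulldict`` silently discards
# all writes.  ``Backend.__init__`` below uses it instead of an ``LRUCache``
# when ``result_cache_max == -1``, which effectively disables result caching.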


class Backend:
    READY_STATES = states.READY_STATES
    UNREADY_STATES = states.UNREADY_STATES
    EXCEPTION_STATES = states.EXCEPTION_STATES

    TimeoutError = TimeoutError

    #: Time to sleep between polling each individual item
    #: in `ResultSet.iterate`, as opposed to the `interval`
    #: argument which is for each pass.
    subpolling_interval = None

    #: If true the backend must implement :meth:`get_many`.
    supports_native_join = False

    #: If true the backend must automatically expire results.
    #: The daily backend_cleanup periodic task will not be triggered
    #: in this case.
    supports_autoexpire = False

    #: Set to true if the backend is persistent by default.
    persistent = True

    retry_policy = {
        'max_retries': 20,
        'interval_start': 0,
        'interval_step': 1,
        'interval_max': 1,
    }

    def __init__(self, app,
                 serializer=None, max_cached_results=None, accept=None,
                 expires=None, expires_type=None, url=None, **kwargs):
        self.app = app
        conf = self.app.conf
        self.serializer = serializer or conf.result_serializer
        (self.content_type,
         self.content_encoding,
         self.encoder) = serializer_registry._encoders[self.serializer]
        cmax = max_cached_results or conf.result_cache_max
        self._cache = _nulldict() if cmax == -1 else LRUCache(limit=cmax)

        self.expires = self.prepare_expires(expires, expires_type)
        self.accept = prepare_accept_content(
            conf.accept_content if accept is None else accept)
        self._pending_results = pending_results_t({}, WeakValueDictionary())
        self._pending_messages = BufferMap(MESSAGE_BUFFER_MAX)
        self.url = url

    def as_uri(self, include_password=False):
        """Return the backend as a URI, sanitizing the password or not."""
        # when using maybe_sanitize_url(), "/" is added
        # we're stripping it for consistency
        if include_password:
            return self.url
        url = maybe_sanitize_url(self.url or '')
        return url[:-1] if url.endswith(':///') else url
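
    # Illustration (added comment, not from the original source): with a
    # hypothetical URL such as 'redis://user:secret@localhost:6379/0',
    # as_uri() returns the password-masked form produced by
    # maybe_sanitize_url() (roughly 'redis://user:**@localhost:6379/0'),
    # while as_uri(include_password=True) returns the URL untouched.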

    def mark_as_started(self, task_id, **meta):
        """Mark a task as started."""
        return self.store_result(task_id, meta, states.STARTED)

    def mark_as_done(self, task_id, result,
                     request=None, store_result=True, state=states.SUCCESS):
        """Mark task as successfully executed."""
        if store_result:
            self.store_result(task_id, result, state, request=request)
        if request and request.chord:
            self.on_chord_part_return(request, state, result)

    def mark_as_failure(self, task_id, exc,
                        traceback=None, request=None,
                        store_result=True, call_errbacks=True,
                        state=states.FAILURE):
        """Mark task as executed with failure. Stores the exception."""
        if store_result:
            self.store_result(task_id, exc, state,
                              traceback=traceback, request=request)
        if request:
            if request.chord:
                self.on_chord_part_return(request, state, exc)
            if call_errbacks and request.errbacks:
                self._call_task_errbacks(request, exc, traceback)

    def _call_task_errbacks(self, request, exc, traceback):
        old_signature = []
        for errback in request.errbacks:
            errback = self.app.signature(errback)
            if arity_greater(errback.type.__header__, 1):
                errback(request, exc, traceback)
            else:
                old_signature.append(errback)
        if old_signature:
            # Previously errback was called as a task so we still
            # need to do so if the errback only takes a single task_id arg.
            task_id = request.id
            root_id = request.root_id or task_id
            group(old_signature, app=self.app).apply_async(
                (task_id,), parent_id=task_id, root_id=root_id
            )

    def mark_as_revoked(self, task_id, reason='',
                        request=None, store_result=True, state=states.REVOKED):
        exc = TaskRevokedError(reason)
        if store_result:
            self.store_result(task_id, exc, state,
                              traceback=None, request=request)
        if request and request.chord:
            self.on_chord_part_return(request, state, exc)

    def mark_as_retry(self, task_id, exc, traceback=None,
                      request=None, store_result=True, state=states.RETRY):
        """Mark task as being retried.

        Stores the current exception (if any).
        """
        return self.store_result(task_id, exc, state,
                                 traceback=traceback, request=request)

    def chord_error_from_stack(self, callback, exc=None):
        from celery import group
        app = self.app
        backend = app._tasks[callback.task].backend
        try:
            group(
                [app.signature(errback)
                 for errback in callback.options.get('link_error') or []],
                app=app,
            ).apply_async((callback.id,))
        except Exception as eb_exc:
            return backend.fail_from_current_stack(callback.id, exc=eb_exc)
        else:
            return backend.fail_from_current_stack(callback.id, exc=exc)

    def fail_from_current_stack(self, task_id, exc=None):
        type_, real_exc, tb = sys.exc_info()
        try:
            exc = real_exc if exc is None else exc
            ei = ExceptionInfo((type_, exc, tb))
            self.mark_as_failure(task_id, exc, ei.traceback)
            return ei
        finally:
            del tb

    def prepare_exception(self, exc, serializer=None):
        """Prepare exception for serialization."""
        serializer = self.serializer if serializer is None else serializer
        if serializer in EXCEPTION_ABLE_CODECS:
            return get_pickleable_exception(exc)
        return {'exc_type': type(exc).__name__, 'exc_message': str(exc)}

    def exception_to_python(self, exc):
        """Convert serialized exception to Python exception."""
        if exc:
            if not isinstance(exc, BaseException):
                exc = create_exception_cls(
                    from_utf8(exc['exc_type']), __name__)(exc['exc_message'])
            if self.serializer in EXCEPTION_ABLE_CODECS:
                exc = get_pickled_exception(exc)
        return exc
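
    # Added note (not in the original source): for non-pickle serializers,
    # prepare_exception() stores exceptions as plain dicts of the form
    # {'exc_type': 'ValueError', 'exc_message': 'boom'}, and
    # exception_to_python() rebuilds a raisable class from that dict via
    # create_exception_cls().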

    def prepare_value(self, result):
        """Prepare value for storage."""
        if self.serializer != 'pickle' and isinstance(result, ResultBase):
            return result.as_tuple()
        return result

    def encode(self, data):
        _, _, payload = dumps(data, serializer=self.serializer)
        return payload

    def meta_from_decoded(self, meta):
        if meta['status'] in self.EXCEPTION_STATES:
            meta['result'] = self.exception_to_python(meta['result'])
        return meta

    def decode_result(self, payload):
        return self.meta_from_decoded(self.decode(payload))

    def decode(self, payload):
        return loads(payload,
                     content_type=self.content_type,
                     content_encoding=self.content_encoding,
                     accept=self.accept)

    def prepare_expires(self, value, type=None):
        if value is None:
            value = self.app.conf.result_expires
        if isinstance(value, timedelta):
            value = value.total_seconds()
        if value is not None and type:
            return type(value)
        return value
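
    # Added example (not from the original source):
    # prepare_expires(timedelta(hours=1), type=int) -> 3600, while
    # prepare_expires(None) falls back to the result_expires setting.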

    def prepare_persistent(self, enabled=None):
        if enabled is not None:
            return enabled
        p = self.app.conf.result_persistent
        return self.persistent if p is None else p

    def encode_result(self, result, state):
        if state in self.EXCEPTION_STATES and isinstance(result, Exception):
            return self.prepare_exception(result)
        else:
            return self.prepare_value(result)

    def is_cached(self, task_id):
        return task_id in self._cache

    def store_result(self, task_id, result, state,
                     traceback=None, request=None, **kwargs):
        """Update task state and result."""
        result = self.encode_result(result, state)
        self._store_result(task_id, result, state, traceback,
                           request=request, **kwargs)
        return result

    def forget(self, task_id):
        self._cache.pop(task_id, None)
        self._forget(task_id)

    def _forget(self, task_id):
        raise NotImplementedError('backend does not implement forget.')

    def get_state(self, task_id):
        """Get the state of a task."""
        return self.get_task_meta(task_id)['status']
    get_status = get_state  # XXX compat

    def get_traceback(self, task_id):
        """Get the traceback for a failed task."""
        return self.get_task_meta(task_id).get('traceback')

    def get_result(self, task_id):
        """Get the result of a task."""
        return self.get_task_meta(task_id).get('result')

    def get_children(self, task_id):
        """Get the list of subtasks sent by a task."""
        try:
            return self.get_task_meta(task_id)['children']
        except KeyError:
            pass

    def _ensure_not_eager(self):
        if self.app.conf.task_always_eager:
            raise RuntimeError(
                "Cannot retrieve result with task_always_eager enabled")

    def get_task_meta(self, task_id, cache=True):
        self._ensure_not_eager()
        if cache:
            try:
                return self._cache[task_id]
            except KeyError:
                pass

        meta = self._get_task_meta_for(task_id)
        if cache and meta.get('status') == states.SUCCESS:
            self._cache[task_id] = meta
        return meta

    def reload_task_result(self, task_id):
        """Reload task result, even if it has been previously fetched."""
        self._cache[task_id] = self.get_task_meta(task_id, cache=False)

    def reload_group_result(self, group_id):
        """Reload group result, even if it has been previously fetched."""
        self._cache[group_id] = self.get_group_meta(group_id, cache=False)

    def get_group_meta(self, group_id, cache=True):
        self._ensure_not_eager()
        if cache:
            try:
                return self._cache[group_id]
            except KeyError:
                pass

        meta = self._restore_group(group_id)
        if cache and meta is not None:
            self._cache[group_id] = meta
        return meta

    def restore_group(self, group_id, cache=True):
        """Get the result for a group."""
        meta = self.get_group_meta(group_id, cache=cache)
        if meta:
            return meta['result']

    def save_group(self, group_id, result):
        """Store the result of an executed group."""
        return self._save_group(group_id, result)

    def delete_group(self, group_id):
        self._cache.pop(group_id, None)
        return self._delete_group(group_id)

    def cleanup(self):
        """Backend cleanup.

        Run by :class:`celery.task.DeleteExpiredTaskMetaTask`.
        """
        pass

    def process_cleanup(self):
        """Cleanup actions to do at the end of a task worker process."""
        pass

    def on_task_call(self, producer, task_id):
        return {}

    def add_to_chord(self, chord_id, result):
        raise NotImplementedError('Backend does not support add_to_chord')

    def on_chord_part_return(self, request, state, result, **kwargs):
        pass

    def fallback_chord_unlock(self, group_id, body, result=None,
                              countdown=1, **kwargs):
        kwargs['result'] = [r.as_tuple() for r in result]
        self.app.tasks['celery.chord_unlock'].apply_async(
            (group_id, body,), kwargs, countdown=countdown,
        )

    def apply_chord(self, header, partial_args, group_id, body,
                    options={}, **kwargs):
        fixed_options = {k: v for k, v in options.items() if k != 'task_id'}
        result = header(*partial_args, task_id=group_id, **fixed_options or {})
        self.fallback_chord_unlock(group_id, body, **kwargs)
        return result

    def current_task_children(self, request=None):
        request = request or getattr(current_task(), 'request', None)
        if request:
            return [r.as_tuple() for r in getattr(request, 'children', [])]

    def __reduce__(self, args=(), kwargs={}):
        return (unpickle_backend, (self.__class__, args, kwargs))


class SyncBackendMixin:

    def iter_native(self, result, timeout=None, interval=0.5, no_ack=True,
                    on_message=None, on_interval=None):
        self._ensure_not_eager()
        results = result.results
        if not results:
            return iter([])
        return self.get_many(
            {r.id for r in results},
            timeout=timeout, interval=interval, no_ack=no_ack,
            on_message=on_message, on_interval=on_interval,
        )

    def wait_for_pending(self, result, timeout=None, interval=0.5,
                         no_ack=True, on_interval=None, callback=None,
                         propagate=True):
        self._ensure_not_eager()
        meta = self.wait_for(
            result.id, timeout=timeout,
            interval=interval,
            on_interval=on_interval,
            no_ack=no_ack,
        )
        if meta:
            result._maybe_set_cache(meta)
            return result.maybe_throw(propagate=propagate, callback=callback)

    def wait_for(self, task_id,
                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
        """Wait for task and return its result.

        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.

        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        """
        self._ensure_not_eager()

        time_elapsed = 0.0

        while 1:
            meta = self.get_task_meta(task_id)
            if meta['status'] in states.READY_STATES:
                return meta
            if on_interval:
                on_interval()
            # avoid hammering the CPU checking status.
            time.sleep(interval)
            time_elapsed += interval
            if timeout and time_elapsed >= timeout:
                raise TimeoutError('The operation timed out.')
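
    # Usage sketch (added; ``backend`` and ``task_id`` are illustrative
    # names, not part of the original source):
    #     meta = backend.wait_for(task_id, timeout=10.0, interval=0.5)
    # polls get_task_meta() every `interval` seconds and returns the meta
    # dict once the task reaches a READY state, raising TimeoutError if
    # `timeout` seconds pass first.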

    def add_pending_result(self, result, weak=False):
        return result

    def remove_pending_result(self, result):
        return result

    @property
    def is_async(self):
        return False


class BaseBackend(Backend, SyncBackendMixin):
    pass


BaseDictBackend = BaseBackend  # XXX compat


class BaseKeyValueStoreBackend(Backend):
    key_t = ensure_bytes
    task_keyprefix = 'celery-task-meta-'
    group_keyprefix = 'celery-taskset-meta-'
    chord_keyprefix = 'chord-unlock-'
    implements_incr = False

    def __init__(self, *args, **kwargs):
        if hasattr(self.key_t, '__func__'):  # pragma: no cover
            self.key_t = self.key_t.__func__  # remove binding
        self._encode_prefixes()
        super(BaseKeyValueStoreBackend, self).__init__(*args, **kwargs)
        if self.implements_incr:
            self.apply_chord = self._apply_chord_incr

    def _encode_prefixes(self):
        self.task_keyprefix = self.key_t(self.task_keyprefix)
        self.group_keyprefix = self.key_t(self.group_keyprefix)
        self.chord_keyprefix = self.key_t(self.chord_keyprefix)

    def get(self, key):
        raise NotImplementedError('Must implement the get method.')

    def mget(self, keys):
        raise NotImplementedError('Does not support get_many')

    def set(self, key, value):
        raise NotImplementedError('Must implement the set method.')

    def delete(self, key):
        raise NotImplementedError('Must implement the delete method')

    def incr(self, key):
        raise NotImplementedError('Does not implement incr')

    def expire(self, key, value):
        pass

    def get_key_for_task(self, task_id, key=''):
        """Get the cache key for a task by id."""
        key_t = self.key_t
        return key_t('').join([
            self.task_keyprefix, key_t(task_id), key_t(key),
        ])

    def get_key_for_group(self, group_id, key=''):
        """Get the cache key for a group by id."""
        key_t = self.key_t
        return key_t('').join([
            self.group_keyprefix, key_t(group_id), key_t(key),
        ])

    def get_key_for_chord(self, group_id, key=''):
        """Get the cache key for the chord waiting on group with given id."""
        key_t = self.key_t
        return key_t('').join([
            self.chord_keyprefix, key_t(group_id), key_t(key),
        ])
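
    # Added example (not in the original source): with the default
    # key_t = ensure_bytes, get_key_for_task('<uuid>') yields
    # b'celery-task-meta-<uuid>' and get_key_for_group('<uuid>') yields
    # b'celery-taskset-meta-<uuid>'.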

    def _strip_prefix(self, key):
        """Takes bytes, emits string."""
        key = self.key_t(key)
        for prefix in self.task_keyprefix, self.group_keyprefix:
            if key.startswith(prefix):
                return bytes_to_str(key[len(prefix):])
        return bytes_to_str(key)

    def _filter_ready(self, values, READY_STATES=states.READY_STATES):
        for k, v in values:
            if v is not None:
                v = self.decode_result(v)
                if v['status'] in READY_STATES:
                    yield k, v

    def _mget_to_results(self, values, keys):
        if hasattr(values, 'items'):
            # client returns dict so mapping preserved.
            return {
                self._strip_prefix(k): v
                for k, v in self._filter_ready(values.items())
            }
        else:
            # client returns list so need to recreate mapping.
            return {
                bytes_to_str(keys[i]): v
                for i, v in self._filter_ready(enumerate(values))
            }

    def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
                 on_message=None, on_interval=None, max_iterations=None,
                 READY_STATES=states.READY_STATES):
        interval = 0.5 if interval is None else interval
        ids = task_ids if isinstance(task_ids, set) else set(task_ids)
        cached_ids = set()
        cache = self._cache
        for task_id in ids:
            try:
                cached = cache[task_id]
            except KeyError:
                pass
            else:
                if cached['status'] in READY_STATES:
                    yield bytes_to_str(task_id), cached
                    cached_ids.add(task_id)

        ids.difference_update(cached_ids)
        iterations = 0
        while ids:
            keys = list(ids)
            r = self._mget_to_results(self.mget([self.get_key_for_task(k)
                                                 for k in keys]), keys)
            cache.update(r)
            ids.difference_update({bytes_to_str(v) for v in r})
            for key, value in r.items():
                if on_message is not None:
                    on_message(value)
                yield bytes_to_str(key), value
            if timeout and iterations * interval >= timeout:
                raise TimeoutError('Operation timed out ({0})'.format(timeout))
            if on_interval:
                on_interval()
            time.sleep(interval)  # don't busy loop.
            iterations += 1
            if max_iterations and iterations >= max_iterations:
                break

    def _forget(self, task_id):
        self.delete(self.get_key_for_task(task_id))

    def _store_result(self, task_id, result, state,
                      traceback=None, request=None, **kwargs):
        meta = {'status': state, 'result': result, 'traceback': traceback,
                'children': self.current_task_children(request),
                'task_id': bytes_to_str(task_id)}
        self.set(self.get_key_for_task(task_id), self.encode(meta))
        return result

    def _save_group(self, group_id, result):
        self.set(self.get_key_for_group(group_id),
                 self.encode({'result': result.as_tuple()}))
        return result

    def _delete_group(self, group_id):
        self.delete(self.get_key_for_group(group_id))

    def _get_task_meta_for(self, task_id):
        """Get task meta-data for a task by id."""
        meta = self.get(self.get_key_for_task(task_id))
        if not meta:
            return {'status': states.PENDING, 'result': None}
        return self.decode_result(meta)

    def _restore_group(self, group_id):
        """Get group meta-data for a group by id."""
        meta = self.get(self.get_key_for_group(group_id))
        # previously this was always pickled, but later this
        # was extended to support other serializers, so the
        # structure is kind of weird.
        if meta:
            meta = self.decode(meta)
            result = meta['result']
            meta['result'] = result_from_tuple(result, self.app)
            return meta

    def _apply_chord_incr(self, header, partial_args, group_id, body,
                          result=None, options={}, **kwargs):
        self.save_group(group_id, self.app.GroupResult(group_id, result))

        fixed_options = {k: v for k, v in options.items() if k != 'task_id'}

        return header(*partial_args, task_id=group_id, **fixed_options or {})

    def on_chord_part_return(self, request, state, result, **kwargs):
        if not self.implements_incr:
            return
        app = self.app
        gid = request.group
        if not gid:
            return
        key = self.get_key_for_chord(gid)
        try:
            deps = GroupResult.restore(gid, backend=self)
        except Exception as exc:
            callback = maybe_signature(request.chord, app=app)
            logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
            return self.chord_error_from_stack(
                callback,
                ChordError('Cannot restore group: {0!r}'.format(exc)),
            )
        if deps is None:
            try:
                raise ValueError(gid)
            except ValueError as exc:
                callback = maybe_signature(request.chord, app=app)
                logger.error('Chord callback %r raised: %r', gid, exc,
                             exc_info=1)
                return self.chord_error_from_stack(
                    callback,
                    ChordError('GroupResult {0} no longer exists'.format(gid)),
                )
        val = self.incr(key)
        size = len(deps)
        if val > size:  # pragma: no cover
            logger.warning('Chord counter incremented too many times for %r',
                           gid)
        elif val == size:
            callback = maybe_signature(request.chord, app=app)
            j = deps.join_native if deps.supports_native_join else deps.join
            try:
                with allow_join_result():
                    ret = j(timeout=3.0, propagate=True)
            except Exception as exc:
                try:
                    culprit = next(deps._failed_join_report())
                    reason = 'Dependency {0.id} raised {1!r}'.format(
                        culprit, exc,
                    )
                except StopIteration:
                    reason = repr(exc)
                logger.error('Chord %r raised: %r', gid, reason, exc_info=1)
                self.chord_error_from_stack(callback, ChordError(reason))
            else:
                try:
                    callback.delay(ret)
                except Exception as exc:
                    logger.error('Chord %r raised: %r', gid, exc, exc_info=1)
                    self.chord_error_from_stack(
                        callback,
                        ChordError('Callback error: {0!r}'.format(exc)),
                    )
            finally:
                deps.delete()
                self.client.delete(key)
        else:
            self.expire(key, 86400)
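
    # Added note (not in the original source): this is the counter-based
    # chord protocol used by backends with implements_incr = True.  Every
    # finished header task increments the chord key; when the counter reaches
    # the group size the results are joined and the body callback is sent,
    # otherwise the key is kept alive for another 86400 seconds (one day).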


class KeyValueStoreBackend(BaseKeyValueStoreBackend, SyncBackendMixin):
    pass


class DisabledBackend(BaseBackend):
    _cache = {}   # need this attribute to reset cache in tests.

    def store_result(self, *args, **kwargs):
        pass

    def _is_disabled(self, *args, **kwargs):
        raise NotImplementedError(
            'No result backend configured. '
            'Please see the documentation for more information.')

    def as_uri(self, *args, **kwargs):
        return 'disabled://'

    get_state = get_status = get_result = get_traceback = _is_disabled
    wait_for = get_many = _is_disabled