_state.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery._state
  4. ~~~~~~~~~~~~~~~
  5. This is an internal module containing thread state
  6. like the ``current_app``, and ``current_task``.
  7. This module shouldn't be used directly.
  8. """
  9. from __future__ import absolute_import
  10. import threading
  11. import weakref
  12. from celery.local import Proxy
  13. from celery.utils.threads import LocalStack
#: Process-wide fallback app.  ``set_default_app`` fills this in only
#: while it is still ``None``; ``get_current_app`` returns it whenever
#: ``_tls.current_app`` is unset (falsy).
default_app = None
class _TLS(threading.local):
    """Thread-local holder for the app that is current in each thread."""

    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: set this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None
#: Per-thread state; ``_tls.current_app`` is consulted by ``get_current_app``.
_tls = _TLS()

#: Stack of tasks; ``get_current_task`` returns its ``top`` and
#: ``get_current_worker_task`` walks its ``stack`` in reverse order.
#: NOTE(review): presumably thread-local like ``_tls`` -- confirm in
#: ``celery.utils.threads.LocalStack``.
_task_stack = LocalStack()
  22. def set_default_app(app):
  23. global default_app
  24. if default_app is None:
  25. default_app = app
  26. def get_current_app():
  27. if default_app is None:
  28. # creates the default app, but we want to defer that.
  29. import celery.app # noqa
  30. return _tls.current_app or default_app
  31. def get_current_task():
  32. """Currently executing task."""
  33. return _task_stack.top
  34. def get_current_worker_task():
  35. """Currently executing task, that was applied by the worker.
  36. This is used to differentiate between the actual task
  37. executed by the worker and any task that was called within
  38. a task (using ``task.__call__`` or ``task.apply``)
  39. """
  40. for task in reversed(_task_stack.stack):
  41. if not task.request.called_directly:
  42. return task
#: Proxies built from ``get_current_app`` and ``get_current_task``.
#: NOTE(review): presumably each access re-invokes the getter so the
#: proxy always reflects the current app/task -- confirm in
#: ``celery.local.Proxy``.
current_app = Proxy(get_current_app)
current_task = Proxy(get_current_task)
#: Registry of weak references to apps (filled by ``_register_app``).
#: WeakSet does not seem to work properly,
#: it doesn't recognize when objects go out of scope.
_apps = set()
  48. def _register_app(app):
  49. _apps.add(weakref.ref(app))
  50. def _get_active_apps():
  51. dirty = []
  52. try:
  53. for appref in _apps:
  54. app = appref()
  55. if app is None:
  56. dirty.append(appref)
  57. else:
  58. yield app
  59. finally:
  60. while dirty:
  61. _apps.discard(dirty.pop())