base.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import warnings
  10. from collections import deque
  11. from contextlib import contextmanager
  12. from copy import deepcopy
  13. from functools import wraps
  14. from billiard.util import register_after_fork
  15. from kombu.clocks import LamportClock
  16. from kombu.utils import cached_property
  17. from celery import platforms
  18. from celery.exceptions import AlwaysEagerIgnored
  19. from celery.loaders import get_loader_cls
  20. from celery.local import PromiseProxy, maybe_evaluate
  21. from celery._state import _task_stack, _tls, get_current_app
  22. from celery.utils.functional import first
  23. from celery.utils.imports import instantiate, symbol_by_name
  24. from .annotations import prepare as prepare_annotations
  25. from .builtins import shared_task, load_shared_tasks
  26. from .defaults import DEFAULTS, find_deprecated_settings
  27. from .registry import TaskRegistry
  28. from .utils import AppPickler, Settings, bugreport, _unpickle_app
  29. def _unpickle_appattr(reverse_name, args):
  30. """Given an attribute name and a list of args, gets
  31. the attribute from the current app and calls it."""
  32. return get_current_app()._rgetattr(reverse_name)(*args)
  33. class Celery(object):
  34. Pickler = AppPickler
  35. SYSTEM = platforms.SYSTEM
  36. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  37. amqp_cls = 'celery.app.amqp:AMQP'
  38. backend_cls = None
  39. events_cls = 'celery.events:Events'
  40. loader_cls = 'celery.loaders.app:AppLoader'
  41. log_cls = 'celery.app.log:Logging'
  42. control_cls = 'celery.app.control:Control'
  43. registry_cls = TaskRegistry
  44. _pool = None
  45. def __init__(self, main=None, loader=None, backend=None,
  46. amqp=None, events=None, log=None, control=None,
  47. set_as_current=True, accept_magic_kwargs=False,
  48. tasks=None, broker=None, include=None, **kwargs):
  49. self.clock = LamportClock()
  50. self.main = main
  51. self.amqp_cls = amqp or self.amqp_cls
  52. self.backend_cls = backend or self.backend_cls
  53. self.events_cls = events or self.events_cls
  54. self.loader_cls = loader or self.loader_cls
  55. self.log_cls = log or self.log_cls
  56. self.control_cls = control or self.control_cls
  57. self.set_as_current = set_as_current
  58. self.registry_cls = symbol_by_name(self.registry_cls)
  59. self.accept_magic_kwargs = accept_magic_kwargs
  60. self.finalized = False
  61. self._pending = deque()
  62. self._tasks = tasks
  63. if not isinstance(self._tasks, TaskRegistry):
  64. self._tasks = TaskRegistry(self._tasks or {})
  65. # these options are moved to the config to
  66. # simplify pickling of the app object.
  67. self._preconf = {}
  68. if broker:
  69. self._preconf['BROKER_URL'] = broker
  70. if include:
  71. self._preconf['CELERY_IMPORTS'] = include
  72. if self.set_as_current:
  73. self.set_current()
  74. self.on_init()
  75. def set_current(self):
  76. _tls.current_app = self
  77. def on_init(self):
  78. """Optional callback called at init."""
  79. pass
  80. def start(self, argv=None):
  81. return instantiate('celery.bin.celery:CeleryCommand', app=self) \
  82. .execute_from_commandline(argv)
  83. def worker_main(self, argv=None):
  84. return instantiate('celery.bin.celeryd:WorkerCommand', app=self) \
  85. .execute_from_commandline(argv)
  86. def task(self, *args, **opts):
  87. """Creates new task class from any callable."""
  88. def inner_create_task_cls(shared=True, filter=None, **opts):
  89. def _create_task_cls(fun):
  90. if shared:
  91. cons = lambda app: app._task_from_fun(fun, **opts)
  92. cons.__name__ = fun.__name__
  93. shared_task(cons)
  94. if self.accept_magic_kwargs: # compat mode
  95. task = self._task_from_fun(fun, **opts)
  96. if filter:
  97. task = filter(task)
  98. return task
  99. # return a proxy object that is only evaluated when first used
  100. promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
  101. self._pending.append(promise)
  102. if filter:
  103. return filter(promise)
  104. return promise
  105. return _create_task_cls
  106. if len(args) == 1 and callable(args[0]):
  107. return inner_create_task_cls(**opts)(*args)
  108. return inner_create_task_cls(**opts)
  109. def _task_from_fun(self, fun, **options):
  110. base = options.pop('base', None) or self.Task
  111. T = type(fun.__name__, (base, ), dict({
  112. 'app': self,
  113. 'accept_magic_kwargs': False,
  114. 'run': staticmethod(fun),
  115. '__doc__': fun.__doc__,
  116. '__module__': fun.__module__}, **options))()
  117. task = self._tasks[T.name] # return global instance.
  118. task.bind(self)
  119. return task
  120. def finalize(self):
  121. if not self.finalized:
  122. self.finalized = True
  123. load_shared_tasks(self)
  124. pending = self._pending
  125. while pending:
  126. maybe_evaluate(pending.pop())
  127. for task in self._tasks.itervalues():
  128. task.bind(self)
  129. def config_from_object(self, obj, silent=False):
  130. del(self.conf)
  131. return self.loader.config_from_object(obj, silent=silent)
  132. def config_from_envvar(self, variable_name, silent=False):
  133. del(self.conf)
  134. return self.loader.config_from_envvar(variable_name, silent=silent)
  135. def config_from_cmdline(self, argv, namespace='celery'):
  136. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  137. def send_task(self, name, args=None, kwargs=None, countdown=None,
  138. eta=None, task_id=None, publisher=None, connection=None,
  139. result_cls=None, expires=None, queues=None, **options):
  140. if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  141. warnings.warn(AlwaysEagerIgnored(
  142. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  143. result_cls = result_cls or self.AsyncResult
  144. router = self.amqp.Router(queues)
  145. options.setdefault('compression',
  146. self.conf.CELERY_MESSAGE_COMPRESSION)
  147. options = router.route(options, name, args, kwargs)
  148. with self.default_producer(publisher) as producer:
  149. return result_cls(producer.publish_task(name, args, kwargs,
  150. task_id=task_id,
  151. countdown=countdown, eta=eta,
  152. expires=expires, **options))
  153. def connection(self, hostname=None, userid=None,
  154. password=None, virtual_host=None, port=None, ssl=None,
  155. insist=None, connect_timeout=None, transport=None,
  156. transport_options=None, **kwargs):
  157. conf = self.conf
  158. return self.amqp.Connection(
  159. hostname or conf.BROKER_HOST,
  160. userid or conf.BROKER_USER,
  161. password or conf.BROKER_PASSWORD,
  162. virtual_host or conf.BROKER_VHOST,
  163. port or conf.BROKER_PORT,
  164. transport=transport or conf.BROKER_TRANSPORT,
  165. insist=self.either('BROKER_INSIST', insist),
  166. ssl=self.either('BROKER_USE_SSL', ssl),
  167. connect_timeout=self.either(
  168. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  169. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  170. **transport_options or {}))
  171. broker_connection = connection
  172. @contextmanager
  173. def default_connection(self, connection=None, *args, **kwargs):
  174. if connection:
  175. yield connection
  176. else:
  177. with self.pool.acquire(block=True) as connection:
  178. yield connection
  179. @contextmanager
  180. def default_producer(self, producer=None):
  181. if producer:
  182. yield producer
  183. else:
  184. with self.amqp.producer_pool.acquire(block=True) as producer:
  185. yield producer
  186. def with_default_connection(self, fun):
  187. """With any function accepting a `connection`
  188. keyword argument, establishes a default connection if one is
  189. not already passed to it.
  190. Any automatically established connection will be closed after
  191. the function returns.
  192. **Deprecated**
  193. Use ``with app.default_connection(connection)`` instead.
  194. """
  195. @wraps(fun)
  196. def _inner(*args, **kwargs):
  197. connection = kwargs.pop('connection', None)
  198. with self.default_connection(connection) as c:
  199. return fun(*args, **dict(kwargs, connection=c))
  200. return _inner
  201. def prepare_config(self, c):
  202. """Prepare configuration before it is merged with the defaults."""
  203. return find_deprecated_settings(c)
  204. def now(self):
  205. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  206. def mail_admins(self, subject, body, fail_silently=False):
  207. if self.conf.ADMINS:
  208. to = [admin_email for _, admin_email in self.conf.ADMINS]
  209. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  210. sender=self.conf.SERVER_EMAIL,
  211. host=self.conf.EMAIL_HOST,
  212. port=self.conf.EMAIL_PORT,
  213. user=self.conf.EMAIL_HOST_USER,
  214. password=self.conf.EMAIL_HOST_PASSWORD,
  215. timeout=self.conf.EMAIL_TIMEOUT,
  216. use_ssl=self.conf.EMAIL_USE_SSL,
  217. use_tls=self.conf.EMAIL_USE_TLS)
  218. def select_queues(self, queues=None):
  219. return self.amqp.queues.select_subset(queues)
  220. def either(self, default_key, *values):
  221. """Fallback to the value of a configuration key if none of the
  222. `*values` are true."""
  223. return first(None, values) or self.conf.get(default_key)
  224. def bugreport(self):
  225. return bugreport(self)
  226. def _get_backend(self):
  227. from celery.backends import get_backend_by_url
  228. backend, url = get_backend_by_url(
  229. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  230. self.loader)
  231. return backend(app=self, url=url)
  232. def _get_config(self):
  233. s = Settings({}, [self.prepare_config(self.loader.conf),
  234. deepcopy(DEFAULTS)])
  235. if self._preconf:
  236. for key, value in self._preconf.iteritems():
  237. setattr(s, key, value)
  238. return s
  239. def _after_fork(self, obj_):
  240. if self._pool:
  241. self._pool.force_close_all()
  242. self._pool = None
  243. def create_task_cls(self):
  244. """Creates a base task class using default configuration
  245. taken from this app."""
  246. return self.subclass_with_self('celery.app.task:Task', name='Task',
  247. attribute='_app', abstract=True)
  248. def subclass_with_self(self, Class, name=None, attribute='app',
  249. reverse=None, **kw):
  250. """Subclass an app-compatible class by setting its app attribute
  251. to be this app instance.
  252. App-compatible means that the class has a class attribute that
  253. provides the default app it should use, e.g.
  254. ``class Foo: app = None``.
  255. :param Class: The app-compatible class to subclass.
  256. :keyword name: Custom name for the target class.
  257. :keyword attribute: Name of the attribute holding the app,
  258. default is 'app'.
  259. """
  260. Class = symbol_by_name(Class)
  261. reverse = reverse if reverse else Class.__name__
  262. def __reduce__(self):
  263. return _unpickle_appattr, (reverse, self.__reduce_args__())
  264. attrs = dict({attribute: self}, __module__=Class.__module__,
  265. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  266. return type(name or Class.__name__, (Class, ), attrs)
  267. def _rgetattr(self, path):
  268. return reduce(getattr, [self] + path.split('.'))
  269. def __repr__(self):
  270. return '<%s %s:0x%x>' % (self.__class__.__name__,
  271. self.main or '__main__', id(self), )
  272. def __reduce__(self):
  273. # Reduce only pickles the configuration changes,
  274. # so the default configuration doesn't have to be passed
  275. # between processes.
  276. return (_unpickle_app, (self.__class__, self.Pickler)
  277. + self.__reduce_args__())
  278. def __reduce_args__(self):
  279. return (self.main, self.conf.changes, self.loader_cls,
  280. self.backend_cls, self.amqp_cls, self.events_cls,
  281. self.log_cls, self.control_cls, self.accept_magic_kwargs)
  282. @cached_property
  283. def Worker(self):
  284. return self.subclass_with_self('celery.apps.worker:Worker')
  285. @cached_property
  286. def WorkController(self, **kwargs):
  287. return self.subclass_with_self('celery.worker:WorkController')
  288. @cached_property
  289. def Beat(self, **kwargs):
  290. return self.subclass_with_self('celery.apps.beat:Beat')
  291. @cached_property
  292. def TaskSet(self):
  293. return self.subclass_with_self('celery.task.sets:TaskSet')
  294. @cached_property
  295. def Task(self):
  296. return self.create_task_cls()
  297. @cached_property
  298. def annotations(self):
  299. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  300. @cached_property
  301. def AsyncResult(self):
  302. return self.subclass_with_self('celery.result:AsyncResult')
  303. @cached_property
  304. def GroupResult(self):
  305. return self.subclass_with_self('celery.result:GroupResult')
  306. @cached_property
  307. def TaskSetResult(self): # XXX compat
  308. return self.subclass_with_self('celery.result:TaskSetResult')
  309. @property
  310. def pool(self):
  311. if self._pool is None:
  312. register_after_fork(self, self._after_fork)
  313. limit = self.conf.BROKER_POOL_LIMIT
  314. self._pool = self.connection().Pool(limit=limit)
  315. return self._pool
  316. @property
  317. def current_task(self):
  318. return _task_stack.top
  319. @cached_property
  320. def amqp(self):
  321. return instantiate(self.amqp_cls, app=self)
  322. @cached_property
  323. def backend(self):
  324. return self._get_backend()
  325. @cached_property
  326. def conf(self):
  327. return self._get_config()
  328. @cached_property
  329. def control(self):
  330. return instantiate(self.control_cls, app=self)
  331. @cached_property
  332. def events(self):
  333. return instantiate(self.events_cls, app=self)
  334. @cached_property
  335. def loader(self):
  336. return get_loader_cls(self.loader_cls)(app=self)
  337. @cached_property
  338. def log(self):
  339. return instantiate(self.log_cls, app=self)
  340. @cached_property
  341. def tasks(self):
  342. self.finalize()
  343. return self._tasks
  344. App = Celery # compat