|
@@ -223,19 +223,8 @@ class AMQP(object):
|
|
|
return self.task_protocols[self.app.conf.CELERY_TASK_PROTOCOL]
|
|
|
|
|
|
@cached_property
|
|
|
- def _task_retry(self):
|
|
|
- return self.app.conf.CELERY_TASK_PUBLISH_RETRY
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def _task_retry_policy(self):
|
|
|
- return self.app.conf.CELERY_TASK_PUBLISH_RETRY_POLICY
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def _task_sent_event(self):
|
|
|
- return self.app.conf.CELERY_SEND_TASK_SENT_EVENT
|
|
|
-
|
|
|
- def flush_routes(self):
|
|
|
- self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
|
|
|
+ def send_task_message(self):
|
|
|
+ return self._create_task_sender()
|
|
|
|
|
|
def Queues(self, queues, create_missing=None, ha_policy=None,
|
|
|
autoexchange=None):
|
|
@@ -263,6 +252,9 @@ class AMQP(object):
|
|
|
self.app.either('CELERY_CREATE_MISSING_QUEUES',
|
|
|
create_missing), app=self.app)
|
|
|
|
|
|
+ def flush_routes(self):
|
|
|
+ self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
|
|
|
+
|
|
|
def TaskConsumer(self, channel, queues=None, accept=None, **kw):
|
|
|
if accept is None:
|
|
|
accept = self.app.conf.CELERY_ACCEPT_CONTENT
|
|
@@ -272,45 +264,6 @@ class AMQP(object):
|
|
|
**kw
|
|
|
)
|
|
|
|
|
|
- @cached_property
|
|
|
- def default_queue(self):
|
|
|
- return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def queues(self):
|
|
|
- """Queue name⇒ declaration mapping."""
|
|
|
- return self.Queues(self.app.conf.CELERY_QUEUES)
|
|
|
-
|
|
|
- @queues.setter # noqa
|
|
|
- def queues(self, queues):
|
|
|
- return self.Queues(queues)
|
|
|
-
|
|
|
- @property
|
|
|
- def routes(self):
|
|
|
- if self._rtable is None:
|
|
|
- self.flush_routes()
|
|
|
- return self._rtable
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def router(self):
|
|
|
- return self.Router()
|
|
|
-
|
|
|
- @property
|
|
|
- def producer_pool(self):
|
|
|
- if self._producer_pool is None:
|
|
|
- self._producer_pool = ProducerPool(
|
|
|
- self.app.pool,
|
|
|
- limit=self.app.pool.limit,
|
|
|
- Producer=self.Producer,
|
|
|
- )
|
|
|
- return self._producer_pool
|
|
|
- publisher_pool = producer_pool # compat alias
|
|
|
-
|
|
|
- @cached_property
|
|
|
- def default_exchange(self):
|
|
|
- return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
- self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
|
|
|
-
|
|
|
def as_task_v2(self, task_id, name, args=None, kwargs=None,
|
|
|
countdown=None, eta=None, group_id=None,
|
|
|
expires=None, retries=0, chord=None,
|
|
@@ -519,8 +472,43 @@ class AMQP(object):
|
|
|
return publish_task
|
|
|
|
|
|
@cached_property
|
|
|
- def send_task_message(self):
|
|
|
- return self._create_task_sender()
|
|
|
+ def default_queue(self):
|
|
|
+ return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def queues(self):
|
|
|
+ """Queue name⇒ declaration mapping."""
|
|
|
+ return self.Queues(self.app.conf.CELERY_QUEUES)
|
|
|
+
|
|
|
+ @queues.setter # noqa
|
|
|
+ def queues(self, queues):
|
|
|
+ return self.Queues(queues)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def routes(self):
|
|
|
+ if self._rtable is None:
|
|
|
+ self.flush_routes()
|
|
|
+ return self._rtable
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def router(self):
|
|
|
+ return self.Router()
|
|
|
+
|
|
|
+ @property
|
|
|
+ def producer_pool(self):
|
|
|
+ if self._producer_pool is None:
|
|
|
+ self._producer_pool = ProducerPool(
|
|
|
+ self.app.pool,
|
|
|
+ limit=self.app.pool.limit,
|
|
|
+ Producer=self.Producer,
|
|
|
+ )
|
|
|
+ return self._producer_pool
|
|
|
+ publisher_pool = producer_pool # compat alias
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def default_exchange(self):
|
|
|
+ return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
|
|
|
+ self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
|
|
|
|
|
|
@cached_property
|
|
|
def utc(self):
|