|
@@ -1,5 +1,9 @@
|
|
from __future__ import absolute_import
|
|
from __future__ import absolute_import
|
|
|
|
|
|
|
|
+import datetime
|
|
|
|
+
|
|
|
|
+import pytz
|
|
|
|
+
|
|
from kombu import Exchange, Queue
|
|
from kombu import Exchange, Queue
|
|
|
|
|
|
from celery.app.amqp import Queues, TaskPublisher
|
|
from celery.app.amqp import Queues, TaskPublisher
|
|
@@ -47,6 +51,26 @@ class test_TaskProducer(AppCase):
|
|
self.assertEqual(prod.publish.call_args[1]['exchange'], 'yyy')
|
|
self.assertEqual(prod.publish.call_args[1]['exchange'], 'yyy')
|
|
self.assertEqual(prod.publish.call_args[1]['routing_key'], 'zzz')
|
|
self.assertEqual(prod.publish.call_args[1]['routing_key'], 'zzz')
|
|
|
|
|
|
|
|
+ def test_publish_with_countdown(self):
|
|
|
|
+ prod = self.app.amqp.TaskProducer(Mock())
|
|
|
|
+ prod.channel.connection.client.declared_entities = set()
|
|
|
|
+ prod.publish = Mock()
|
|
|
|
+ now = datetime.datetime(2013, 11, 26, 16, 48, 46)
|
|
|
|
+ prod.publish_task('tasks.add', (1, 1), {}, retry=False,
|
|
|
|
+ countdown=10, now=now)
|
|
|
|
+ self.assertEqual(prod.publish.call_args[0][0]['eta'], '2013-11-26T16:48:56+00:00')
|
|
|
|
+
|
|
|
|
+ def test_publish_with_countdown_and_timezone(self):
|
|
|
|
+ # use timezone with fixed offset to be sure it won't be changed
|
|
|
|
+ self.app.conf.CELERY_TIMEZONE = pytz.FixedOffset(120)
|
|
|
|
+ prod = self.app.amqp.TaskProducer(Mock())
|
|
|
|
+ prod.channel.connection.client.declared_entities = set()
|
|
|
|
+ prod.publish = Mock()
|
|
|
|
+ now = datetime.datetime(2013, 11, 26, 16, 48, 46)
|
|
|
|
+ prod.publish_task('tasks.add', (2, 2), {}, retry=False,
|
|
|
|
+ countdown=20, now=now)
|
|
|
|
+ self.assertEqual(prod.publish.call_args[0][0]['eta'], '2013-11-26T18:49:06+02:00')
|
|
|
|
+
|
|
def test_event_dispatcher(self):
|
|
def test_event_dispatcher(self):
|
|
prod = self.app.amqp.TaskProducer(Mock())
|
|
prod = self.app.amqp.TaskProducer(Mock())
|
|
self.assertTrue(prod.event_dispatcher)
|
|
self.assertTrue(prod.event_dispatcher)
|