|
@@ -331,6 +331,31 @@ class TestCeleryTasks(unittest.TestCase):
|
|
|
publisher = t1.get_publisher()
|
|
|
self.assertTrue(publisher.exchange)
|
|
|
|
|
|
+ def test_send_task_sent_event(self):
|
|
|
+ T1 = self.createTaskCls("T1", "c.unittest.t.t1")
|
|
|
+ conn = T1.app.broker_connection()
|
|
|
+ chan = conn.channel()
|
|
|
+ prev = T1.app.conf.CELERY_SEND_TASK_SENT_EVENT
|
|
|
+ T1.app.conf.CELERY_SEND_TASK_SENT_EVENT = True
|
|
|
+ dispatcher = [None]
|
|
|
+
|
|
|
+ class Pub(object):
|
|
|
+ channel = chan
|
|
|
+
|
|
|
+ def delay_task(self, *args, **kwargs):
|
|
|
+ dispatcher[0] = kwargs.get("event_dispatcher")
|
|
|
+
|
|
|
+ try:
|
|
|
+ T1.apply_async(publisher=Pub())
|
|
|
+ finally:
|
|
|
+ T1.app.conf.CELERY_SEND_TASK_SENT_EVENT = False
|
|
|
+ chan.close()
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ self.assertTrue(dispatcher[0])
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
def test_get_publisher(self):
|
|
|
from celery.app import amqp
|
|
|
old_pub = amqp.TaskPublisher
|
|
@@ -357,6 +382,19 @@ class TestCeleryTasks(unittest.TestCase):
|
|
|
self.assertEqual(yyy.AsyncResult(tid).status, "FROBULATING")
|
|
|
self.assertDictEqual(yyy.AsyncResult(tid).result, {"fooz": "baaz"})
|
|
|
|
|
|
+ yyy.request.id = tid
|
|
|
+ yyy.update_state(state="FROBUZATING", meta={"fooz": "baaz"})
|
|
|
+ self.assertEqual(yyy.AsyncResult(tid).status, "FROBUZATING")
|
|
|
+ self.assertDictEqual(yyy.AsyncResult(tid).result, {"fooz": "baaz"})
|
|
|
+
|
|
|
+ def test_repr(self):
|
|
|
+
|
|
|
+ @task_dec
|
|
|
+ def task_test_repr():
|
|
|
+ pass
|
|
|
+
|
|
|
+ self.assertIn("task_test_repr", repr(task_test_repr))
|
|
|
+
|
|
|
def test_has___name__(self):
|
|
|
|
|
|
@task_dec
|
|
@@ -372,6 +410,11 @@ class TestCeleryTasks(unittest.TestCase):
|
|
|
logger = t1.get_logger(logfile=logfh, loglevel=0)
|
|
|
self.assertTrue(logger)
|
|
|
|
|
|
+ T1.request.loglevel = 3
|
|
|
+ logger = t1.get_logger(logfile=logfh, loglevel=None)
|
|
|
+ self.assertTrue(logger)
|
|
|
+
|
|
|
+
|
|
|
|
|
|
class TestTaskSet(unittest.TestCase):
|
|
|
|