test_events.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. from __future__ import absolute_import
  2. import socket
  3. from mock import Mock
  4. from celery import Celery
  5. from celery import events
  6. from celery.tests.case import AppCase
  7. class MockProducer(object):
  8. raise_on_publish = False
  9. def __init__(self, *args, **kwargs):
  10. self.sent = []
  11. def publish(self, msg, *args, **kwargs):
  12. if self.raise_on_publish:
  13. raise KeyError()
  14. self.sent.append(msg)
  15. def close(self):
  16. pass
  17. def has_event(self, kind):
  18. for event in self.sent:
  19. if event['type'] == kind:
  20. return event
  21. return False
  22. class test_Event(AppCase):
  23. def test_constructor(self):
  24. event = events.Event('world war II')
  25. self.assertEqual(event['type'], 'world war II')
  26. self.assertTrue(event['timestamp'])
  27. class test_EventDispatcher(AppCase):
  28. def test_redis_uses_fanout_exchange(self):
  29. with Celery(set_as_current=False) as app:
  30. app.connection = Mock()
  31. conn = app.connection.return_value = Mock()
  32. conn.transport.driver_type = 'redis'
  33. dispatcher = app.events.Dispatcher(conn, enabled=False)
  34. self.assertEqual(dispatcher.exchange.type, 'fanout')
  35. def test_others_use_topic_exchange(self):
  36. with Celery(set_as_current=False) as app:
  37. app.connection = Mock()
  38. conn = app.connection.return_value = Mock()
  39. conn.transport.driver_type = 'amqp'
  40. dispatcher = app.events.Dispatcher(conn, enabled=False)
  41. self.assertEqual(dispatcher.exchange.type, 'topic')
  42. def test_send(self):
  43. producer = MockProducer()
  44. producer.connection = self.app.connection()
  45. connection = Mock()
  46. connection.transport.driver_type = 'amqp'
  47. eventer = self.app.events.Dispatcher(connection, enabled=False,
  48. buffer_while_offline=False)
  49. eventer.producer = producer
  50. eventer.enabled = True
  51. eventer.send('World War II', ended=True)
  52. self.assertTrue(producer.has_event('World War II'))
  53. eventer.enabled = False
  54. eventer.send('World War III')
  55. self.assertFalse(producer.has_event('World War III'))
  56. evs = ('Event 1', 'Event 2', 'Event 3')
  57. eventer.enabled = True
  58. eventer.producer.raise_on_publish = True
  59. eventer.buffer_while_offline = False
  60. with self.assertRaises(KeyError):
  61. eventer.send('Event X')
  62. eventer.buffer_while_offline = True
  63. for ev in evs:
  64. eventer.send(ev)
  65. eventer.producer.raise_on_publish = False
  66. eventer.flush()
  67. for ev in evs:
  68. self.assertTrue(producer.has_event(ev))
  69. buf = eventer._outbound_buffer = Mock()
  70. buf.popleft.side_effect = IndexError()
  71. eventer.flush()
  72. def test_enter_exit(self):
  73. with self.app.connection() as conn:
  74. d = self.app.events.Dispatcher(conn)
  75. d.close = Mock()
  76. with d as _d:
  77. self.assertTrue(_d)
  78. d.close.assert_called_with()
  79. def test_enable_disable_callbacks(self):
  80. on_enable = Mock()
  81. on_disable = Mock()
  82. with self.app.connection() as conn:
  83. with self.app.events.Dispatcher(conn, enabled=False) as d:
  84. d.on_enabled.add(on_enable)
  85. d.on_disabled.add(on_disable)
  86. d.enable()
  87. on_enable.assert_called_with()
  88. d.disable()
  89. on_disable.assert_called_with()
  90. def test_enabled_disable(self):
  91. connection = self.app.connection()
  92. channel = connection.channel()
  93. try:
  94. dispatcher = self.app.events.Dispatcher(connection,
  95. enabled=True)
  96. dispatcher2 = self.app.events.Dispatcher(connection,
  97. enabled=True,
  98. channel=channel)
  99. self.assertTrue(dispatcher.enabled)
  100. self.assertTrue(dispatcher.producer.channel)
  101. self.assertEqual(dispatcher.producer.serializer,
  102. self.app.conf.CELERY_EVENT_SERIALIZER)
  103. created_channel = dispatcher.producer.channel
  104. dispatcher.disable()
  105. dispatcher.disable() # Disable with no active producer
  106. dispatcher2.disable()
  107. self.assertFalse(dispatcher.enabled)
  108. self.assertIsNone(dispatcher.producer)
  109. self.assertFalse(dispatcher2.channel.closed,
  110. 'does not close manually provided channel')
  111. dispatcher.enable()
  112. self.assertTrue(dispatcher.enabled)
  113. self.assertTrue(dispatcher.producer)
  114. # XXX test compat attribute
  115. self.assertIs(dispatcher.publisher, dispatcher.producer)
  116. prev, dispatcher.publisher = dispatcher.producer, 42
  117. try:
  118. self.assertEqual(dispatcher.producer, 42)
  119. finally:
  120. dispatcher.producer = prev
  121. finally:
  122. channel.close()
  123. connection.close()
  124. self.assertTrue(created_channel.closed)
  125. class test_EventReceiver(AppCase):
  126. def test_process(self):
  127. message = {'type': 'world-war'}
  128. got_event = [False]
  129. def my_handler(event):
  130. got_event[0] = True
  131. connection = Mock()
  132. connection.transport_cls = 'memory'
  133. r = events.EventReceiver(connection,
  134. handlers={'world-war': my_handler},
  135. node_id='celery.tests')
  136. r._receive(message, object())
  137. self.assertTrue(got_event[0])
  138. def test_catch_all_event(self):
  139. message = {'type': 'world-war'}
  140. got_event = [False]
  141. def my_handler(event):
  142. got_event[0] = True
  143. connection = Mock()
  144. connection.transport_cls = 'memory'
  145. r = events.EventReceiver(connection, node_id='celery.tests')
  146. r.handlers['*'] = my_handler
  147. r._receive(message, object())
  148. self.assertTrue(got_event[0])
  149. def test_itercapture(self):
  150. connection = self.app.connection()
  151. try:
  152. r = self.app.events.Receiver(connection, node_id='celery.tests')
  153. it = r.itercapture(timeout=0.0001, wakeup=False)
  154. with self.assertRaises(socket.timeout):
  155. next(it)
  156. with self.assertRaises(socket.timeout):
  157. r.capture(timeout=0.00001)
  158. finally:
  159. connection.close()
  160. def test_itercapture_limit(self):
  161. connection = self.app.connection()
  162. channel = connection.channel()
  163. try:
  164. events_received = [0]
  165. def handler(event):
  166. events_received[0] += 1
  167. producer = self.app.events.Dispatcher(connection,
  168. enabled=True,
  169. channel=channel)
  170. r = self.app.events.Receiver(connection,
  171. handlers={'*': handler},
  172. node_id='celery.tests')
  173. evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
  174. for ev in evs:
  175. producer.send(ev)
  176. it = r.itercapture(limit=4, wakeup=True)
  177. next(it) # skip consumer (see itercapture)
  178. list(it)
  179. self.assertEqual(events_received[0], 4)
  180. finally:
  181. channel.close()
  182. connection.close()
  183. class test_misc(AppCase):
  184. def test_State(self):
  185. state = self.app.events.State()
  186. self.assertDictEqual(dict(state.workers), {})
  187. def test_default_dispatcher(self):
  188. with self.app.events.default_dispatcher() as d:
  189. self.assertTrue(d)
  190. self.assertTrue(d.connection)