test_events.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import socket
  4. from mock import Mock
  5. from celery import events
  6. from celery.tests.utils import AppCase
  7. class MockProducer(object):
  8. raise_on_publish = False
  9. def __init__(self, *args, **kwargs):
  10. self.sent = []
  11. def publish(self, msg, *args, **kwargs):
  12. if self.raise_on_publish:
  13. raise KeyError()
  14. self.sent.append(msg)
  15. def close(self):
  16. pass
  17. def has_event(self, kind):
  18. for event in self.sent:
  19. if event['type'] == kind:
  20. return event
  21. return False
  22. class test_Event(AppCase):
  23. def test_constructor(self):
  24. event = events.Event('world war II')
  25. self.assertEqual(event['type'], 'world war II')
  26. self.assertTrue(event['timestamp'])
  27. class test_EventDispatcher(AppCase):
  28. def test_send(self):
  29. producer = MockProducer()
  30. eventer = self.app.events.Dispatcher(object(), enabled=False)
  31. eventer.publisher = producer
  32. eventer.enabled = True
  33. eventer.send('World War II', ended=True)
  34. self.assertTrue(producer.has_event('World War II'))
  35. eventer.enabled = False
  36. eventer.send('World War III')
  37. self.assertFalse(producer.has_event('World War III'))
  38. evs = ('Event 1', 'Event 2', 'Event 3')
  39. eventer.enabled = True
  40. eventer.publisher.raise_on_publish = True
  41. eventer.buffer_while_offline = False
  42. with self.assertRaises(KeyError):
  43. eventer.send('Event X')
  44. eventer.buffer_while_offline = True
  45. for ev in evs:
  46. eventer.send(ev)
  47. eventer.publisher.raise_on_publish = False
  48. eventer.flush()
  49. for ev in evs:
  50. self.assertTrue(producer.has_event(ev))
  51. buf = eventer._outbound_buffer = Mock()
  52. buf.popleft.side_effect = IndexError()
  53. eventer.flush()
  54. def test_enter_exit(self):
  55. with self.app.broker_connection() as conn:
  56. d = self.app.events.Dispatcher(conn)
  57. d.close = Mock()
  58. with d as _d:
  59. self.assertTrue(_d)
  60. d.close.assert_called_with()
  61. def test_enable_disable_callbacks(self):
  62. on_enable = Mock()
  63. on_disable = Mock()
  64. with self.app.broker_connection() as conn:
  65. with self.app.events.Dispatcher(conn, enabled=False) as d:
  66. d.on_enabled.add(on_enable)
  67. d.on_disabled.add(on_disable)
  68. d.enable()
  69. on_enable.assert_called_with()
  70. d.disable()
  71. on_disable.assert_called_with()
  72. def test_enabled_disable(self):
  73. connection = self.app.broker_connection()
  74. channel = connection.channel()
  75. try:
  76. dispatcher = self.app.events.Dispatcher(connection,
  77. enabled=True)
  78. dispatcher2 = self.app.events.Dispatcher(connection,
  79. enabled=True,
  80. channel=channel)
  81. self.assertTrue(dispatcher.enabled)
  82. self.assertTrue(dispatcher.publisher.channel)
  83. self.assertEqual(dispatcher.publisher.serializer,
  84. self.app.conf.CELERY_EVENT_SERIALIZER)
  85. created_channel = dispatcher.publisher.channel
  86. dispatcher.disable()
  87. dispatcher.disable() # Disable with no active publisher
  88. dispatcher2.disable()
  89. self.assertFalse(dispatcher.enabled)
  90. self.assertIsNone(dispatcher.publisher)
  91. self.assertFalse(dispatcher2.channel.closed,
  92. 'does not close manually provided channel')
  93. dispatcher.enable()
  94. self.assertTrue(dispatcher.enabled)
  95. self.assertTrue(dispatcher.publisher)
  96. finally:
  97. channel.close()
  98. connection.close()
  99. self.assertTrue(created_channel.closed)
  100. class test_EventReceiver(AppCase):
  101. def test_process(self):
  102. message = {'type': 'world-war'}
  103. got_event = [False]
  104. def my_handler(event):
  105. got_event[0] = True
  106. connection = Mock()
  107. connection.transport_cls = 'memory'
  108. r = events.EventReceiver(connection,
  109. handlers={'world-war': my_handler},
  110. node_id='celery.tests')
  111. r._receive(message, object())
  112. self.assertTrue(got_event[0])
  113. def test_catch_all_event(self):
  114. message = {'type': 'world-war'}
  115. got_event = [False]
  116. def my_handler(event):
  117. got_event[0] = True
  118. connection = Mock()
  119. connection.transport_cls = 'memory'
  120. r = events.EventReceiver(connection, node_id='celery.tests')
  121. events.EventReceiver.handlers['*'] = my_handler
  122. try:
  123. r._receive(message, object())
  124. self.assertTrue(got_event[0])
  125. finally:
  126. events.EventReceiver.handlers = {}
  127. def test_itercapture(self):
  128. connection = self.app.broker_connection()
  129. try:
  130. r = self.app.events.Receiver(connection, node_id='celery.tests')
  131. it = r.itercapture(timeout=0.0001, wakeup=False)
  132. consumer = it.next()
  133. self.assertTrue(consumer.queues)
  134. self.assertEqual(consumer.callbacks[0], r._receive)
  135. with self.assertRaises(socket.timeout):
  136. it.next()
  137. with self.assertRaises(socket.timeout):
  138. r.capture(timeout=0.00001)
  139. finally:
  140. connection.close()
  141. def test_itercapture_limit(self):
  142. connection = self.app.broker_connection()
  143. channel = connection.channel()
  144. try:
  145. events_received = [0]
  146. def handler(event):
  147. events_received[0] += 1
  148. producer = self.app.events.Dispatcher(connection,
  149. enabled=True,
  150. channel=channel)
  151. r = self.app.events.Receiver(connection,
  152. handlers={'*': handler},
  153. node_id='celery.tests')
  154. evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
  155. for ev in evs:
  156. producer.send(ev)
  157. it = r.itercapture(limit=4, wakeup=True)
  158. it.next() # skip consumer (see itercapture)
  159. list(it)
  160. self.assertEqual(events_received[0], 4)
  161. finally:
  162. channel.close()
  163. connection.close()
  164. class test_misc(AppCase):
  165. def test_State(self):
  166. state = self.app.events.State()
  167. self.assertDictEqual(dict(state.workers), {})
  168. def test_default_dispatcher(self):
  169. with self.app.events.default_dispatcher() as d:
  170. self.assertTrue(d)
  171. self.assertTrue(d.connection)