test_events.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. from __future__ import absolute_import
  2. import socket
  3. from mock import Mock
  4. from celery import events
  5. from celery.tests.utils import AppCase
  6. class MockProducer(object):
  7. raise_on_publish = False
  8. def __init__(self, *args, **kwargs):
  9. self.sent = []
  10. def publish(self, msg, *args, **kwargs):
  11. if self.raise_on_publish:
  12. raise KeyError()
  13. self.sent.append(msg)
  14. def close(self):
  15. pass
  16. def has_event(self, kind):
  17. for event in self.sent:
  18. if event['type'] == kind:
  19. return event
  20. return False
  21. class test_Event(AppCase):
  22. def test_constructor(self):
  23. event = events.Event('world war II')
  24. self.assertEqual(event['type'], 'world war II')
  25. self.assertTrue(event['timestamp'])
  26. class test_EventDispatcher(AppCase):
  27. def test_send(self):
  28. producer = MockProducer()
  29. eventer = self.app.events.Dispatcher(object(), enabled=False)
  30. eventer.publisher = producer
  31. eventer.enabled = True
  32. eventer.send('World War II', ended=True)
  33. self.assertTrue(producer.has_event('World War II'))
  34. eventer.enabled = False
  35. eventer.send('World War III')
  36. self.assertFalse(producer.has_event('World War III'))
  37. evs = ('Event 1', 'Event 2', 'Event 3')
  38. eventer.enabled = True
  39. eventer.publisher.raise_on_publish = True
  40. eventer.buffer_while_offline = False
  41. with self.assertRaises(KeyError):
  42. eventer.send('Event X')
  43. eventer.buffer_while_offline = True
  44. for ev in evs:
  45. eventer.send(ev)
  46. eventer.publisher.raise_on_publish = False
  47. eventer.flush()
  48. for ev in evs:
  49. self.assertTrue(producer.has_event(ev))
  50. buf = eventer._outbound_buffer = Mock()
  51. buf.popleft.side_effect = IndexError()
  52. eventer.flush()
  53. def test_enter_exit(self):
  54. with self.app.connection() as conn:
  55. d = self.app.events.Dispatcher(conn)
  56. d.close = Mock()
  57. with d as _d:
  58. self.assertTrue(_d)
  59. d.close.assert_called_with()
  60. def test_enable_disable_callbacks(self):
  61. on_enable = Mock()
  62. on_disable = Mock()
  63. with self.app.connection() as conn:
  64. with self.app.events.Dispatcher(conn, enabled=False) as d:
  65. d.on_enabled.add(on_enable)
  66. d.on_disabled.add(on_disable)
  67. d.enable()
  68. on_enable.assert_called_with()
  69. d.disable()
  70. on_disable.assert_called_with()
  71. def test_enabled_disable(self):
  72. connection = self.app.connection()
  73. channel = connection.channel()
  74. try:
  75. dispatcher = self.app.events.Dispatcher(connection,
  76. enabled=True)
  77. dispatcher2 = self.app.events.Dispatcher(connection,
  78. enabled=True,
  79. channel=channel)
  80. self.assertTrue(dispatcher.enabled)
  81. self.assertTrue(dispatcher.publisher.channel)
  82. self.assertEqual(dispatcher.publisher.serializer,
  83. self.app.conf.CELERY_EVENT_SERIALIZER)
  84. created_channel = dispatcher.publisher.channel
  85. dispatcher.disable()
  86. dispatcher.disable() # Disable with no active publisher
  87. dispatcher2.disable()
  88. self.assertFalse(dispatcher.enabled)
  89. self.assertIsNone(dispatcher.publisher)
  90. self.assertFalse(dispatcher2.channel.closed,
  91. 'does not close manually provided channel')
  92. dispatcher.enable()
  93. self.assertTrue(dispatcher.enabled)
  94. self.assertTrue(dispatcher.publisher)
  95. finally:
  96. channel.close()
  97. connection.close()
  98. self.assertTrue(created_channel.closed)
  99. class test_EventReceiver(AppCase):
  100. def test_process(self):
  101. message = {'type': 'world-war'}
  102. got_event = [False]
  103. def my_handler(event):
  104. got_event[0] = True
  105. connection = Mock()
  106. connection.transport_cls = 'memory'
  107. r = events.EventReceiver(connection,
  108. handlers={'world-war': my_handler},
  109. node_id='celery.tests')
  110. r._receive(message, object())
  111. self.assertTrue(got_event[0])
  112. def test_catch_all_event(self):
  113. message = {'type': 'world-war'}
  114. got_event = [False]
  115. def my_handler(event):
  116. got_event[0] = True
  117. connection = Mock()
  118. connection.transport_cls = 'memory'
  119. r = events.EventReceiver(connection, node_id='celery.tests')
  120. events.EventReceiver.handlers['*'] = my_handler
  121. try:
  122. r._receive(message, object())
  123. self.assertTrue(got_event[0])
  124. finally:
  125. events.EventReceiver.handlers = {}
  126. def test_itercapture(self):
  127. connection = self.app.connection()
  128. try:
  129. r = self.app.events.Receiver(connection, node_id='celery.tests')
  130. it = r.itercapture(timeout=0.0001, wakeup=False)
  131. consumer = it.next()
  132. self.assertTrue(consumer.queues)
  133. self.assertEqual(consumer.callbacks[0], r._receive)
  134. with self.assertRaises(socket.timeout):
  135. it.next()
  136. with self.assertRaises(socket.timeout):
  137. r.capture(timeout=0.00001)
  138. finally:
  139. connection.close()
  140. def test_itercapture_limit(self):
  141. connection = self.app.connection()
  142. channel = connection.channel()
  143. try:
  144. events_received = [0]
  145. def handler(event):
  146. events_received[0] += 1
  147. producer = self.app.events.Dispatcher(connection,
  148. enabled=True,
  149. channel=channel)
  150. r = self.app.events.Receiver(connection,
  151. handlers={'*': handler},
  152. node_id='celery.tests')
  153. evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
  154. for ev in evs:
  155. producer.send(ev)
  156. it = r.itercapture(limit=4, wakeup=True)
  157. it.next() # skip consumer (see itercapture)
  158. list(it)
  159. self.assertEqual(events_received[0], 4)
  160. finally:
  161. channel.close()
  162. connection.close()
  163. class test_misc(AppCase):
  164. def test_State(self):
  165. state = self.app.events.State()
  166. self.assertDictEqual(dict(state.workers), {})
  167. def test_default_dispatcher(self):
  168. with self.app.events.default_dispatcher() as d:
  169. self.assertTrue(d)
  170. self.assertTrue(d.connection)