# test_events.py
  1. import pytest
  2. import socket
  3. from case import Mock, call
  4. from celery.events import Event
  5. from celery.events.receiver import CLIENT_CLOCK_SKEW
  6. class MockProducer:
  7. raise_on_publish = False
  8. def __init__(self, *args, **kwargs):
  9. self.sent = []
  10. def publish(self, msg, *args, **kwargs):
  11. if self.raise_on_publish:
  12. raise KeyError()
  13. self.sent.append(msg)
  14. def close(self):
  15. pass
  16. def has_event(self, kind):
  17. for event in self.sent:
  18. if event['type'] == kind:
  19. return event
  20. return False
  21. def test_Event():
  22. event = Event('world war II')
  23. assert event['type'] == 'world war II'
  24. assert event['timestamp']
  25. class test_EventDispatcher:
  26. def test_redis_uses_fanout_exchange(self):
  27. self.app.connection = Mock()
  28. conn = self.app.connection.return_value = Mock()
  29. conn.transport.driver_type = 'redis'
  30. dispatcher = self.app.events.Dispatcher(conn, enabled=False)
  31. assert dispatcher.exchange.type == 'fanout'
  32. def test_others_use_topic_exchange(self):
  33. self.app.connection = Mock()
  34. conn = self.app.connection.return_value = Mock()
  35. conn.transport.driver_type = 'amqp'
  36. dispatcher = self.app.events.Dispatcher(conn, enabled=False)
  37. assert dispatcher.exchange.type == 'topic'
  38. def test_takes_channel_connection(self):
  39. x = self.app.events.Dispatcher(channel=Mock())
  40. assert x.connection is x.channel.connection.client
  41. def test_sql_transports_disabled(self):
  42. conn = Mock()
  43. conn.transport.driver_type = 'sql'
  44. x = self.app.events.Dispatcher(connection=conn)
  45. assert not x.enabled
  46. def test_send(self):
  47. producer = MockProducer()
  48. producer.connection = self.app.connection_for_write()
  49. connection = Mock()
  50. connection.transport.driver_type = 'amqp'
  51. eventer = self.app.events.Dispatcher(connection, enabled=False,
  52. buffer_while_offline=False)
  53. eventer.producer = producer
  54. eventer.enabled = True
  55. eventer.send('World War II', ended=True)
  56. assert producer.has_event('World War II')
  57. eventer.enabled = False
  58. eventer.send('World War III')
  59. assert not producer.has_event('World War III')
  60. evs = ('Event 1', 'Event 2', 'Event 3')
  61. eventer.enabled = True
  62. eventer.producer.raise_on_publish = True
  63. eventer.buffer_while_offline = False
  64. with pytest.raises(KeyError):
  65. eventer.send('Event X')
  66. eventer.buffer_while_offline = True
  67. for ev in evs:
  68. eventer.send(ev)
  69. eventer.producer.raise_on_publish = False
  70. eventer.flush()
  71. for ev in evs:
  72. assert producer.has_event(ev)
  73. eventer.flush()
  74. def test_send_buffer_group(self):
  75. buf_received = [None]
  76. producer = MockProducer()
  77. producer.connection = self.app.connection_for_write()
  78. connection = Mock()
  79. connection.transport.driver_type = 'amqp'
  80. eventer = self.app.events.Dispatcher(
  81. connection, enabled=False,
  82. buffer_group={'task'}, buffer_limit=2,
  83. )
  84. eventer.producer = producer
  85. eventer.enabled = True
  86. eventer._publish = Mock(name='_publish')
  87. def on_eventer_publish(events, *args, **kwargs):
  88. buf_received[0] = list(events)
  89. eventer._publish.side_effect = on_eventer_publish
  90. assert not eventer._group_buffer['task']
  91. eventer.on_send_buffered = Mock(name='on_send_buffered')
  92. eventer.send('task-received', uuid=1)
  93. prev_buffer = eventer._group_buffer['task']
  94. assert eventer._group_buffer['task']
  95. eventer.on_send_buffered.assert_called_with()
  96. eventer.send('task-received', uuid=1)
  97. assert not eventer._group_buffer['task']
  98. eventer._publish.assert_has_calls([
  99. call([], eventer.producer, 'task.multi'),
  100. ])
  101. # clear in place
  102. assert eventer._group_buffer['task'] is prev_buffer
  103. assert len(buf_received[0]) == 2
  104. eventer.on_send_buffered = None
  105. eventer.send('task-received', uuid=1)
  106. def test_flush_no_groups_no_errors(self):
  107. eventer = self.app.events.Dispatcher(Mock())
  108. eventer.flush(errors=False, groups=False)
  109. def test_enter_exit(self):
  110. with self.app.connection_for_write() as conn:
  111. d = self.app.events.Dispatcher(conn)
  112. d.close = Mock()
  113. with d as _d:
  114. assert _d
  115. d.close.assert_called_with()
  116. def test_enable_disable_callbacks(self):
  117. on_enable = Mock()
  118. on_disable = Mock()
  119. with self.app.connection_for_write() as conn:
  120. with self.app.events.Dispatcher(conn, enabled=False) as d:
  121. d.on_enabled.add(on_enable)
  122. d.on_disabled.add(on_disable)
  123. d.enable()
  124. on_enable.assert_called_with()
  125. d.disable()
  126. on_disable.assert_called_with()
  127. def test_enabled_disable(self):
  128. connection = self.app.connection_for_write()
  129. channel = connection.channel()
  130. try:
  131. dispatcher = self.app.events.Dispatcher(connection,
  132. enabled=True)
  133. dispatcher2 = self.app.events.Dispatcher(connection,
  134. enabled=True,
  135. channel=channel)
  136. assert dispatcher.enabled
  137. assert dispatcher.producer.channel
  138. assert (dispatcher.producer.serializer ==
  139. self.app.conf.event_serializer)
  140. created_channel = dispatcher.producer.channel
  141. dispatcher.disable()
  142. dispatcher.disable() # Disable with no active producer
  143. dispatcher2.disable()
  144. assert not dispatcher.enabled
  145. assert dispatcher.producer is None
  146. # does not close manually provided channel
  147. assert not dispatcher2.channel.closed
  148. dispatcher.enable()
  149. assert dispatcher.enabled
  150. assert dispatcher.producer
  151. finally:
  152. channel.close()
  153. connection.close()
  154. assert created_channel.closed
  155. class test_EventReceiver:
  156. def test_process(self):
  157. message = {'type': 'world-war'}
  158. got_event = [False]
  159. def my_handler(event):
  160. got_event[0] = True
  161. connection = Mock()
  162. connection.transport_cls = 'memory'
  163. r = self.app.events.Receiver(
  164. connection,
  165. handlers={'world-war': my_handler},
  166. node_id='celery.tests',
  167. )
  168. r._receive(message, object())
  169. assert got_event[0]
  170. def test_accept_argument(self):
  171. r = self.app.events.Receiver(Mock(), accept={'app/foo'})
  172. assert r.accept == {'app/foo'}
  173. def test_event_queue_prefix__default(self):
  174. r = self.app.events.Receiver(Mock())
  175. assert r.queue.name.startswith('celeryev.')
  176. def test_event_queue_prefix__setting(self):
  177. self.app.conf.event_queue_prefix = 'eventq'
  178. r = self.app.events.Receiver(Mock())
  179. assert r.queue.name.startswith('eventq.')
  180. def test_event_queue_prefix__argument(self):
  181. r = self.app.events.Receiver(Mock(), queue_prefix='fooq')
  182. assert r.queue.name.startswith('fooq.')
  183. def test_catch_all_event(self):
  184. message = {'type': 'world-war'}
  185. got_event = [False]
  186. def my_handler(event):
  187. got_event[0] = True
  188. connection = Mock()
  189. connection.transport_cls = 'memory'
  190. r = self.app.events.Receiver(connection, node_id='celery.tests')
  191. r.handlers['*'] = my_handler
  192. r._receive(message, object())
  193. assert got_event[0]
  194. def test_itercapture(self):
  195. connection = self.app.connection_for_write()
  196. try:
  197. r = self.app.events.Receiver(connection, node_id='celery.tests')
  198. it = r.itercapture(timeout=0.0001, wakeup=False)
  199. with pytest.raises(socket.timeout):
  200. next(it)
  201. with pytest.raises(socket.timeout):
  202. r.capture(timeout=0.00001)
  203. finally:
  204. connection.close()
  205. def test_event_from_message_localize_disabled(self):
  206. r = self.app.events.Receiver(Mock(), node_id='celery.tests')
  207. r.adjust_clock = Mock()
  208. ts_adjust = Mock()
  209. r.event_from_message(
  210. {'type': 'worker-online', 'clock': 313},
  211. localize=False,
  212. adjust_timestamp=ts_adjust,
  213. )
  214. ts_adjust.assert_not_called()
  215. r.adjust_clock.assert_called_with(313)
  216. def test_event_from_message_clock_from_client(self):
  217. r = self.app.events.Receiver(Mock(), node_id='celery.tests')
  218. r.clock.value = 302
  219. r.adjust_clock = Mock()
  220. body = {'type': 'task-sent'}
  221. r.event_from_message(
  222. body, localize=False, adjust_timestamp=Mock(),
  223. )
  224. assert body['clock'] == r.clock.value + CLIENT_CLOCK_SKEW
  225. def test_receive_multi(self):
  226. r = self.app.events.Receiver(Mock(name='connection'))
  227. r.process = Mock(name='process')
  228. efm = r.event_from_message = Mock(name='event_from_message')
  229. def on_efm(*args):
  230. return args
  231. efm.side_effect = on_efm
  232. r._receive([1, 2, 3], Mock())
  233. r.process.assert_has_calls([call(1), call(2), call(3)])
  234. def test_itercapture_limit(self):
  235. connection = self.app.connection_for_write()
  236. channel = connection.channel()
  237. try:
  238. events_received = [0]
  239. def handler(event):
  240. events_received[0] += 1
  241. producer = self.app.events.Dispatcher(
  242. connection, enabled=True, channel=channel,
  243. )
  244. r = self.app.events.Receiver(
  245. connection,
  246. handlers={'*': handler},
  247. node_id='celery.tests',
  248. )
  249. evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
  250. for ev in evs:
  251. producer.send(ev)
  252. it = r.itercapture(limit=4, wakeup=True)
  253. next(it) # skip consumer (see itercapture)
  254. list(it)
  255. assert events_received[0] == 4
  256. finally:
  257. channel.close()
  258. connection.close()
  259. def test_State(app):
  260. state = app.events.State()
  261. assert dict(state.workers) == {}
  262. def test_default_dispatcher(app):
  263. with app.events.default_dispatcher() as d:
  264. assert d
  265. assert d.connection