test_events.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. from __future__ import absolute_import, unicode_literals
  2. import socket
  3. import pytest
  4. from case import Mock, call
  5. from celery.events import Event
  6. from celery.events.receiver import CLIENT_CLOCK_SKEW
  7. class MockProducer(object):
  8. raise_on_publish = False
  9. def __init__(self, *args, **kwargs):
  10. self.sent = []
  11. def publish(self, msg, *args, **kwargs):
  12. if self.raise_on_publish:
  13. raise KeyError()
  14. self.sent.append(msg)
  15. def close(self):
  16. pass
  17. def has_event(self, kind):
  18. for event in self.sent:
  19. if event['type'] == kind:
  20. return event
  21. return False
  22. def test_Event():
  23. event = Event('world war II')
  24. assert event['type'] == 'world war II'
  25. assert event['timestamp']
  26. class test_EventDispatcher:
  27. def test_redis_uses_fanout_exchange(self):
  28. self.app.connection = Mock()
  29. conn = self.app.connection.return_value = Mock()
  30. conn.transport.driver_type = 'redis'
  31. dispatcher = self.app.events.Dispatcher(conn, enabled=False)
  32. assert dispatcher.exchange.type == 'fanout'
  33. def test_others_use_topic_exchange(self):
  34. self.app.connection = Mock()
  35. conn = self.app.connection.return_value = Mock()
  36. conn.transport.driver_type = 'amqp'
  37. dispatcher = self.app.events.Dispatcher(conn, enabled=False)
  38. assert dispatcher.exchange.type == 'topic'
  39. def test_takes_channel_connection(self):
  40. x = self.app.events.Dispatcher(channel=Mock())
  41. assert x.connection is x.channel.connection.client
  42. def test_sql_transports_disabled(self):
  43. conn = Mock()
  44. conn.transport.driver_type = 'sql'
  45. x = self.app.events.Dispatcher(connection=conn)
  46. assert not x.enabled
  47. def test_send(self):
  48. producer = MockProducer()
  49. producer.connection = self.app.connection_for_write()
  50. connection = Mock()
  51. connection.transport.driver_type = 'amqp'
  52. eventer = self.app.events.Dispatcher(connection, enabled=False,
  53. buffer_while_offline=False)
  54. eventer.producer = producer
  55. eventer.enabled = True
  56. eventer.send('World War II', ended=True)
  57. assert producer.has_event('World War II')
  58. eventer.enabled = False
  59. eventer.send('World War III')
  60. assert not producer.has_event('World War III')
  61. evs = ('Event 1', 'Event 2', 'Event 3')
  62. eventer.enabled = True
  63. eventer.producer.raise_on_publish = True
  64. eventer.buffer_while_offline = False
  65. with pytest.raises(KeyError):
  66. eventer.send('Event X')
  67. eventer.buffer_while_offline = True
  68. for ev in evs:
  69. eventer.send(ev)
  70. eventer.producer.raise_on_publish = False
  71. eventer.flush()
  72. for ev in evs:
  73. assert producer.has_event(ev)
  74. eventer.flush()
  75. def test_send_buffer_group(self):
  76. buf_received = [None]
  77. producer = MockProducer()
  78. producer.connection = self.app.connection_for_write()
  79. connection = Mock()
  80. connection.transport.driver_type = 'amqp'
  81. eventer = self.app.events.Dispatcher(
  82. connection, enabled=False,
  83. buffer_group={'task'}, buffer_limit=2,
  84. )
  85. eventer.producer = producer
  86. eventer.enabled = True
  87. eventer._publish = Mock(name='_publish')
  88. def on_eventer_publish(events, *args, **kwargs):
  89. buf_received[0] = list(events)
  90. eventer._publish.side_effect = on_eventer_publish
  91. assert not eventer._group_buffer['task']
  92. eventer.on_send_buffered = Mock(name='on_send_buffered')
  93. eventer.send('task-received', uuid=1)
  94. prev_buffer = eventer._group_buffer['task']
  95. assert eventer._group_buffer['task']
  96. eventer.on_send_buffered.assert_called_with()
  97. eventer.send('task-received', uuid=1)
  98. assert not eventer._group_buffer['task']
  99. eventer._publish.assert_has_calls([
  100. call([], eventer.producer, 'task.multi'),
  101. ])
  102. # clear in place
  103. assert eventer._group_buffer['task'] is prev_buffer
  104. assert len(buf_received[0]) == 2
  105. eventer.on_send_buffered = None
  106. eventer.send('task-received', uuid=1)
  107. def test_flush_no_groups_no_errors(self):
  108. eventer = self.app.events.Dispatcher(Mock())
  109. eventer.flush(errors=False, groups=False)
  110. def test_enter_exit(self):
  111. with self.app.connection_for_write() as conn:
  112. d = self.app.events.Dispatcher(conn)
  113. d.close = Mock()
  114. with d as _d:
  115. assert _d
  116. d.close.assert_called_with()
  117. def test_enable_disable_callbacks(self):
  118. on_enable = Mock()
  119. on_disable = Mock()
  120. with self.app.connection_for_write() as conn:
  121. with self.app.events.Dispatcher(conn, enabled=False) as d:
  122. d.on_enabled.add(on_enable)
  123. d.on_disabled.add(on_disable)
  124. d.enable()
  125. on_enable.assert_called_with()
  126. d.disable()
  127. on_disable.assert_called_with()
  128. def test_enabled_disable(self):
  129. connection = self.app.connection_for_write()
  130. channel = connection.channel()
  131. try:
  132. dispatcher = self.app.events.Dispatcher(connection,
  133. enabled=True)
  134. dispatcher2 = self.app.events.Dispatcher(connection,
  135. enabled=True,
  136. channel=channel)
  137. assert dispatcher.enabled
  138. assert dispatcher.producer.channel
  139. assert (dispatcher.producer.serializer ==
  140. self.app.conf.event_serializer)
  141. created_channel = dispatcher.producer.channel
  142. dispatcher.disable()
  143. dispatcher.disable() # Disable with no active producer
  144. dispatcher2.disable()
  145. assert not dispatcher.enabled
  146. assert dispatcher.producer is None
  147. # does not close manually provided channel
  148. assert not dispatcher2.channel.closed
  149. dispatcher.enable()
  150. assert dispatcher.enabled
  151. assert dispatcher.producer
  152. # XXX test compat attribute
  153. assert dispatcher.publisher is dispatcher.producer
  154. prev, dispatcher.publisher = dispatcher.producer, 42
  155. try:
  156. assert dispatcher.producer == 42
  157. finally:
  158. dispatcher.producer = prev
  159. finally:
  160. channel.close()
  161. connection.close()
  162. assert created_channel.closed
  163. class test_EventReceiver:
  164. def test_process(self):
  165. message = {'type': 'world-war'}
  166. got_event = [False]
  167. def my_handler(event):
  168. got_event[0] = True
  169. connection = Mock()
  170. connection.transport_cls = 'memory'
  171. r = self.app.events.Receiver(
  172. connection,
  173. handlers={'world-war': my_handler},
  174. node_id='celery.tests',
  175. )
  176. r._receive(message, object())
  177. assert got_event[0]
  178. def test_accept_argument(self):
  179. r = self.app.events.Receiver(Mock(), accept={'app/foo'})
  180. assert r.accept == {'app/foo'}
  181. def test_event_queue_prefix__default(self):
  182. r = self.app.events.Receiver(Mock())
  183. assert r.queue.name.startswith('celeryev.')
  184. def test_event_queue_prefix__setting(self):
  185. self.app.conf.event_queue_prefix = 'eventq'
  186. r = self.app.events.Receiver(Mock())
  187. assert r.queue.name.startswith('eventq.')
  188. def test_event_queue_prefix__argument(self):
  189. r = self.app.events.Receiver(Mock(), queue_prefix='fooq')
  190. assert r.queue.name.startswith('fooq.')
  191. def test_catch_all_event(self):
  192. message = {'type': 'world-war'}
  193. got_event = [False]
  194. def my_handler(event):
  195. got_event[0] = True
  196. connection = Mock()
  197. connection.transport_cls = 'memory'
  198. r = self.app.events.Receiver(connection, node_id='celery.tests')
  199. r.handlers['*'] = my_handler
  200. r._receive(message, object())
  201. assert got_event[0]
  202. def test_itercapture(self):
  203. connection = self.app.connection_for_write()
  204. try:
  205. r = self.app.events.Receiver(connection, node_id='celery.tests')
  206. it = r.itercapture(timeout=0.0001, wakeup=False)
  207. with pytest.raises(socket.timeout):
  208. next(it)
  209. with pytest.raises(socket.timeout):
  210. r.capture(timeout=0.00001)
  211. finally:
  212. connection.close()
  213. def test_event_from_message_localize_disabled(self):
  214. r = self.app.events.Receiver(Mock(), node_id='celery.tests')
  215. r.adjust_clock = Mock()
  216. ts_adjust = Mock()
  217. r.event_from_message(
  218. {'type': 'worker-online', 'clock': 313},
  219. localize=False,
  220. adjust_timestamp=ts_adjust,
  221. )
  222. ts_adjust.assert_not_called()
  223. r.adjust_clock.assert_called_with(313)
  224. def test_event_from_message_clock_from_client(self):
  225. r = self.app.events.Receiver(Mock(), node_id='celery.tests')
  226. r.clock.value = 302
  227. r.adjust_clock = Mock()
  228. body = {'type': 'task-sent'}
  229. r.event_from_message(
  230. body, localize=False, adjust_timestamp=Mock(),
  231. )
  232. assert body['clock'] == r.clock.value + CLIENT_CLOCK_SKEW
  233. def test_receive_multi(self):
  234. r = self.app.events.Receiver(Mock(name='connection'))
  235. r.process = Mock(name='process')
  236. efm = r.event_from_message = Mock(name='event_from_message')
  237. def on_efm(*args):
  238. return args
  239. efm.side_effect = on_efm
  240. r._receive([1, 2, 3], Mock())
  241. r.process.assert_has_calls([call(1), call(2), call(3)])
  242. def test_itercapture_limit(self):
  243. connection = self.app.connection_for_write()
  244. channel = connection.channel()
  245. try:
  246. events_received = [0]
  247. def handler(event):
  248. events_received[0] += 1
  249. producer = self.app.events.Dispatcher(
  250. connection, enabled=True, channel=channel,
  251. )
  252. r = self.app.events.Receiver(
  253. connection,
  254. handlers={'*': handler},
  255. node_id='celery.tests',
  256. )
  257. evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
  258. for ev in evs:
  259. producer.send(ev)
  260. it = r.itercapture(limit=4, wakeup=True)
  261. next(it) # skip consumer (see itercapture)
  262. list(it)
  263. assert events_received[0] == 4
  264. finally:
  265. channel.close()
  266. connection.close()
  267. def test_State(app):
  268. state = app.events.State()
  269. assert dict(state.workers) == {}
  270. def test_default_dispatcher(app):
  271. with app.events.default_dispatcher() as d:
  272. assert d
  273. assert d.connection