mocks.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. from __future__ import absolute_import, unicode_literals
  2. import numbers
  3. from datetime import datetime, timedelta
  4. try:
  5. from case import Mock
  6. except ImportError:
  7. try:
  8. from unittest.mock import Mock
  9. except ImportError:
  10. from mock import Mock
  11. def TaskMessage(name, id=None, args=(), kwargs={}, callbacks=None,
  12. errbacks=None, chain=None, shadow=None, utc=None, **options):
  13. """Create task message in protocol 2 format."""
  14. from celery import uuid
  15. from kombu.serialization import dumps
  16. id = id or uuid()
  17. message = Mock(name='TaskMessage-{0}'.format(id))
  18. message.headers = {
  19. 'id': id,
  20. 'task': name,
  21. 'shadow': shadow,
  22. }
  23. embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain}
  24. message.headers.update(options)
  25. message.content_type, message.content_encoding, message.body = dumps(
  26. (args, kwargs, embed), serializer='json',
  27. )
  28. message.payload = (args, kwargs, embed)
  29. return message
  30. def TaskMessage1(name, id=None, args=(), kwargs={}, callbacks=None,
  31. errbacks=None, chain=None, **options):
  32. """Create task message in protocol 1 format."""
  33. from celery import uuid
  34. from kombu.serialization import dumps
  35. id = id or uuid()
  36. message = Mock(name='TaskMessage-{0}'.format(id))
  37. message.headers = {}
  38. message.payload = {
  39. 'task': name,
  40. 'id': id,
  41. 'args': args,
  42. 'kwargs': kwargs,
  43. 'callbacks': callbacks,
  44. 'errbacks': errbacks,
  45. }
  46. message.payload.update(options)
  47. message.content_type, message.content_encoding, message.body = dumps(
  48. message.payload,
  49. )
  50. return message
  51. def task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage):
  52. """Create task message from :class:`celery.Signature`."""
  53. sig.freeze()
  54. callbacks = sig.options.pop('link', None)
  55. errbacks = sig.options.pop('link_error', None)
  56. countdown = sig.options.pop('countdown', None)
  57. if countdown:
  58. eta = app.now() + timedelta(seconds=countdown)
  59. else:
  60. eta = sig.options.pop('eta', None)
  61. if eta and isinstance(eta, datetime):
  62. eta = eta.isoformat()
  63. expires = sig.options.pop('expires', None)
  64. if expires and isinstance(expires, numbers.Real):
  65. expires = app.now() + timedelta(seconds=expires)
  66. if expires and isinstance(expires, datetime):
  67. expires = expires.isoformat()
  68. return TaskMessage(
  69. sig.task, id=sig.id, args=sig.args,
  70. kwargs=sig.kwargs,
  71. callbacks=[dict(s) for s in callbacks] if callbacks else None,
  72. errbacks=[dict(s) for s in errbacks] if errbacks else None,
  73. eta=eta,
  74. expires=expires,
  75. utc=utc,
  76. **sig.options
  77. )