call.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. """The ``celery call`` program used to send tasks from the command-line."""
  2. from __future__ import absolute_import, unicode_literals
  3. from kombu.utils.json import loads
  4. from celery.bin.base import Command
  5. from celery.five import string_t
  6. from celery.utils.time import maybe_iso8601
  class call(Command):
      """Call a task by name.

      Examples:
          .. code-block:: console

              $ celery call tasks.add --args='[2, 2]'
              $ celery call tasks.add --args='[2, 2]' --countdown=10
      """

      args = '<task_name>'

      # since we have an argument --args, we need to name this differently.
      args_name = 'posargs'

      def add_arguments(self, parser):
          """Register the command-line options of ``celery call``.

          Two argument groups are added to ``parser``: 'Calling Options'
          (task arguments, scheduling, expiry, serializer) and
          'Routing Options' (queue, exchange, routing key).
          """
          group = parser.add_argument_group('Calling Options')
          group.add_argument('--args', '-a',
                             help='positional arguments (json).')
          group.add_argument('--kwargs', '-k',
                             help='keyword arguments (json).')
          group.add_argument('--eta',
                             help='scheduled time (ISO-8601).')
          group.add_argument(
              '--countdown', type=float,
              help='eta in seconds from now (float/int).',
          )
          group.add_argument(
              '--expires',
              help='expiry time (ISO-8601/float/int).',
          )
          group.add_argument(
              '--serializer', default='json',
              help='defaults to json.',
          )

          ropts = parser.add_argument_group('Routing Options')
          ropts.add_argument('--queue', help='custom queue name.')
          ropts.add_argument('--exchange', help='custom exchange name.')
          ropts.add_argument('--routing-key', help='custom routing key.')

      def run(self, name, *_, **kwargs):
          """Send the task called ``name``.

          Extra positional arguments are ignored; keyword arguments are
          forwarded unchanged to :meth:`_send_task`.
          """
          self._send_task(name, **kwargs)

      def _send_task(self, name, args=None, kwargs=None,
                     countdown=None, serializer=None,
                     queue=None, exchange=None, routing_key=None,
                     eta=None, expires=None, **_):
          """Decode the options, send the task and print the result id.

          Arguments:
              name: Name of the task to send.
              args: Positional task arguments; a string is decoded as JSON.
              kwargs: Keyword task arguments; a string is decoded as JSON.
              countdown: Passed through to ``send_task``.
              serializer: Passed through to ``send_task``.
              queue: Passed through to ``send_task``.
              exchange: Passed through to ``send_task``.
              routing_key: Passed through to ``send_task``.
              eta: Converted with ``maybe_iso8601`` before sending.
              expires: A number (or numeric string), otherwise a value
                  handed to ``maybe_iso8601``.
              **_: Any other options are accepted and ignored.

          Raises:
              TypeError, ValueError: Propagated unchanged from
                  ``maybe_iso8601`` when ``expires`` is neither numeric
                  nor accepted by that helper.
          """
          # Task arguments arrive from the command line as JSON text;
          # anything that is not a string is used as given.
          if isinstance(args, string_t):
              args = loads(args)
          if isinstance(kwargs, string_t):
              kwargs = loads(kwargs)

          # Expires can be int/float ...
          try:
              expires = float(expires)
          except (TypeError, ValueError):
              # ... or a string describing an ISO 8601 datetime.  Errors
              # from the helper propagate as-is, so no wrapper is needed.
              # NOTE(review): when ``expires`` is None this path relies on
              # maybe_iso8601 accepting None -- confirm against the helper.
              expires = maybe_iso8601(expires)

          # send the task and print the id.
          result = self.app.send_task(
              name,
              args=args or (), kwargs=kwargs or {},
              countdown=countdown,
              serializer=serializer,
              queue=queue,
              exchange=exchange,
              routing_key=routing_key,
              eta=maybe_iso8601(eta),
              expires=expires,
          )
          self.out(result.id)