__init__.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. """
  2. Working with tasks and task sets.
  3. """
  4. from carrot.connection import DjangoBrokerConnection
  5. from celery.messaging import TaskConsumer
  6. from celery.conf import AMQP_CONNECTION_TIMEOUT
  7. from celery.registry import tasks
  8. from celery.backends import default_backend
  9. from celery.task.base import Task, TaskSet, PeriodicTask
  10. from celery.task.base import ExecuteRemoteTask
  11. from celery.task.base import AsynchronousMapTask
  12. from celery.task.builtins import DeleteExpiredTaskMetaTask, PingTask
  13. from celery.execute import apply_async
  14. from celery.serialization import pickle
  15. from celery.task.rest import RESTProxyTask
  16. def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
  17. """Discard all waiting tasks.
  18. This will ignore all tasks waiting for execution, and they will
  19. be deleted from the messaging server.
  20. :returns: the number of tasks discarded.
  21. :rtype: int
  22. """
  23. amqp_connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
  24. consumer = TaskConsumer(connection=amqp_connection)
  25. discarded_count = consumer.discard_all()
  26. amqp_connection.close()
  27. return discarded_count
  28. def is_done(task_id):
  29. """Returns ``True`` if task with ``task_id`` has been executed.
  30. :rtype: bool
  31. """
  32. return default_backend.is_done(task_id)
  33. def dmap(func, args, timeout=None):
  34. """Distribute processing of the arguments and collect the results.
  35. Example
  36. >>> from celery.task import dmap
  37. >>> import operator
  38. >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
  39. [4, 8, 16]
  40. """
  41. return TaskSet.map(func, args, timeout=timeout)
  42. def dmap_async(func, args, timeout=None):
  43. """Distribute processing of the arguments and collect the results
  44. asynchronously.
  45. :returns: :class:`celery.result.AsyncResult` object.
  46. Example
  47. >>> from celery.task import dmap_async
  48. >>> import operator
  49. >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
  50. >>> presult
  51. <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
  52. >>> presult.status
  53. 'DONE'
  54. >>> presult.result
  55. [4, 8, 16]
  56. """
  57. return TaskSet.map_async(func, args, timeout=timeout)
  58. def execute_remote(func, *args, **kwargs):
  59. """Execute arbitrary function/object remotely.
  60. :param func: A callable function or object.
  61. :param \*args: Positional arguments to apply to the function.
  62. :param \*\*kwargs: Keyword arguments to apply to the function.
  63. The object must be picklable, so you can't use lambdas or functions
  64. defined in the REPL (the objects must have an associated module).
  65. :returns: class:`celery.result.AsyncResult`.
  66. """
  67. return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
  68. def ping():
  69. """Test if the server is alive.
  70. Example:
  71. >>> from celery.task import ping
  72. >>> ping()
  73. 'pong'
  74. """
  75. return PingTask.apply_async().get()