__init__.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. """
  2. Working with tasks and task sets.
  3. """
  4. from celery.execute import apply_async
  5. from celery.registry import tasks
  6. from celery.serialization import pickle
  7. from celery.task.base import Task, PeriodicTask
  8. from celery.task.sets import TaskSet
  9. from celery.task.builtins import PingTask, ExecuteRemoteTask
  10. from celery.task.builtins import AsynchronousMapTask, _dmap
  11. from celery.task.control import discard_all
  12. from celery.task.http import HttpDispatchTask
  13. __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
  14. "dmap", "dmap_async", "execute_remote", "ping", "HttpDispatchTask"]
  15. def dmap(fun, args, timeout=None):
  16. """Distribute processing of the arguments and collect the results.
  17. Example
  18. >>> from celery.task import dmap
  19. >>> import operator
  20. >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
  21. [4, 8, 16]
  22. """
  23. return _dmap(fun, args, timeout)
  24. def dmap_async(fun, args, timeout=None):
  25. """Distribute processing of the arguments and collect the results
  26. asynchronously.
  27. :returns :class:`celery.result.AsyncResult`:
  28. Example
  29. >>> from celery.task import dmap_async
  30. >>> import operator
  31. >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
  32. >>> presult
  33. <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
  34. >>> presult.status
  35. 'SUCCESS'
  36. >>> presult.result
  37. [4, 8, 16]
  38. """
  39. return AsynchronousMapTask.delay(pickle.dumps(fun), args, timeout=timeout)
  40. def execute_remote(fun, *args, **kwargs):
  41. """Execute arbitrary function/object remotely.
  42. :param fun: A callable function or object.
  43. :param \*args: Positional arguments to apply to the function.
  44. :param \*\*kwargs: Keyword arguments to apply to the function.
  45. The object must be picklable, so you can't use lambdas or functions
  46. defined in the REPL (the objects must have an associated module).
  47. :returns class:`celery.result.AsyncResult`:
  48. """
  49. return ExecuteRemoteTask.delay(pickle.dumps(fun), args, kwargs)
  50. def ping():
  51. """Test if the server is alive.
  52. Example:
  53. >>> from celery.task import ping
  54. >>> ping()
  55. 'pong'
  56. """
  57. return PingTask.apply_async().get()