__init__.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. """
  2. Working with tasks and task sets.
  3. """
  4. from billiard.serialization import pickle
  5. from celery.execute import apply_async
  6. from celery.registry import tasks
  7. from celery.task.base import Task, TaskSet, PeriodicTask, ExecuteRemoteTask
  8. from celery.task.control import discard_all
  9. from celery.task.builtins import PingTask
  10. __all__ = ["Task", "TaskSet", "PeriodicTask", "tasks", "discard_all",
  11. "dmap", "dmap_async", "execute_remote", "ping"]
  12. def dmap(fun, args, timeout=None):
  13. """Distribute processing of the arguments and collect the results.
  14. Example
  15. >>> from celery.task import dmap
  16. >>> import operator
  17. >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
  18. [4, 8, 16]
  19. """
  20. return TaskSet.map(fun, args, timeout=timeout)
  21. def dmap_async(fun, args, timeout=None):
  22. """Distribute processing of the arguments and collect the results
  23. asynchronously.
  24. :returns: :class:`celery.result.AsyncResult` object.
  25. Example
  26. >>> from celery.task import dmap_async
  27. >>> import operator
  28. >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
  29. >>> presult
  30. <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
  31. >>> presult.status
  32. 'SUCCESS'
  33. >>> presult.result
  34. [4, 8, 16]
  35. """
  36. return TaskSet.map_async(fun, args, timeout=timeout)
  37. def execute_remote(fun, *args, **kwargs):
  38. """Execute arbitrary function/object remotely.
  39. :param fun: A callable function or object.
  40. :param \*args: Positional arguments to apply to the function.
  41. :param \*\*kwargs: Keyword arguments to apply to the function.
  42. The object must be picklable, so you can't use lambdas or functions
  43. defined in the REPL (the objects must have an associated module).
  44. :returns: class:`celery.result.AsyncResult`.
  45. """
  46. return ExecuteRemoteTask.delay(pickle.dumps(fun), args, kwargs)
  47. def ping():
  48. """Test if the server is alive.
  49. Example:
  50. >>> from celery.task import ping
  51. >>> ping()
  52. 'pong'
  53. """
  54. return PingTask.apply_async().get()