builtins.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from datetime import timedelta
  2. from celery.serialization import pickle
  3. from celery.task.base import Task, PeriodicTask
  4. from celery.task.sets import TaskSet
  5. class DeleteExpiredTaskMetaTask(PeriodicTask):
  6. """A periodic task that deletes expired task metadata every day.
  7. This runs the current backend's
  8. :meth:`celery.backends.base.BaseBackend.cleanup` method.
  9. """
  10. name = "celery.delete_expired_task_meta"
  11. run_every = timedelta(days=1)
  12. def run(self, **kwargs):
  13. """:returns: None"""
  14. logger = self.get_logger(**kwargs)
  15. logger.info("Deleting expired task results...")
  16. self.backend.cleanup()
  17. class PingTask(Task):
  18. """The task used by :func:`ping`."""
  19. name = "celery.ping"
  20. def run(self, **kwargs):
  21. """:returns: the string ``"pong"``."""
  22. return "pong"
  23. def _dmap(fun, args, timeout=None):
  24. pickled = pickle.dumps(fun)
  25. arguments = [((pickled, arg, {}), {}) for arg in args]
  26. ts = TaskSet(ExecuteRemoteTask, arguments)
  27. return ts.apply_async().join(timeout=timeout)
  28. class AsynchronousMapTask(Task):
  29. """Task used internally by :func:`dmap_async` and
  30. :meth:`TaskSet.map_async`. """
  31. name = "celery.map_async"
  32. def run(self, serfun, args, timeout=None, **kwargs):
  33. return _dmap(pickle.loads(serfun), args, timeout=timeout)
  34. class ExecuteRemoteTask(Task):
  35. """Execute an arbitrary function or object.
  36. *Note* You probably want :func:`execute_remote` instead, which this
  37. is an internal component of.
  38. The object must be pickleable, so you can't use lambdas or functions
  39. defined in the REPL (that is the python shell, or ``ipython``).
  40. """
  41. name = "celery.execute_remote"
  42. def run(self, ser_callable, fargs, fkwargs, **kwargs):
  43. """
  44. :param ser_callable: A pickled function or callable object.
  45. :param fargs: Positional arguments to apply to the function.
  46. :param fkwargs: Keyword arguments to apply to the function.
  47. """
  48. return pickle.loads(ser_callable)(*fargs, **fkwargs)