builtins.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. from celery import conf
  2. from celery.schedules import crontab
  3. from celery.serialization import pickle
  4. from celery.task.base import Task
  5. from celery.task.sets import TaskSet
  6. class backend_cleanup(Task):
  7. name = "celery.backend_cleanup"
  8. def run(self):
  9. self.backend.cleanup()
  10. if conf.TASK_RESULT_EXPIRES and \
  11. backend_cleanup.name not in conf.CELERYBEAT_SCHEDULE:
  12. conf.CELERYBEAT_SCHEDULE[backend_cleanup.name] = dict(
  13. task=backend_cleanup.name,
  14. schedule=crontab(minute="00", hour="04", day_of_week="*"))
  15. DeleteExpiredTaskMetaTask = backend_cleanup # FIXME remove in 3.0
  16. class PingTask(Task):
  17. """The task used by :func:`ping`."""
  18. name = "celery.ping"
  19. def run(self, **kwargs):
  20. """:returns: the string `"pong"`."""
  21. return "pong"
  22. def _dmap(fun, args, timeout=None):
  23. pickled = pickle.dumps(fun)
  24. arguments = [((pickled, arg, {}), {}) for arg in args]
  25. ts = TaskSet(ExecuteRemoteTask, arguments)
  26. return ts.apply_async().join(timeout=timeout)
  27. class AsynchronousMapTask(Task):
  28. """Task used internally by :func:`dmap_async` and
  29. :meth:`TaskSet.map_async`. """
  30. name = "celery.map_async"
  31. def run(self, serfun, args, timeout=None, **kwargs):
  32. return _dmap(pickle.loads(serfun), args, timeout=timeout)
  33. class ExecuteRemoteTask(Task):
  34. """Execute an arbitrary function or object.
  35. *Note* You probably want :func:`execute_remote` instead, which this
  36. is an internal component of.
  37. The object must be pickleable, so you can't use lambdas or functions
  38. defined in the REPL (that is the python shell, or :program:`ipython`).
  39. """
  40. name = "celery.execute_remote"
  41. def run(self, ser_callable, fargs, fkwargs, **kwargs):
  42. """
  43. :param ser_callable: A pickled function or callable object.
  44. :param fargs: Positional arguments to apply to the function.
  45. :param fkwargs: Keyword arguments to apply to the function.
  46. """
  47. return pickle.loads(ser_callable)(*fargs, **fkwargs)