task.py

  1. """
  2. Working with tasks and task sets.
  3. """
  4. from carrot.connection import DjangoAMQPConnection
  5. from celery.log import setup_logger
  6. from celery.registry import tasks
  7. from celery.messaging import TaskPublisher, TaskConsumer
  8. from celery.models import TaskMeta
  9. from django.core.cache import cache
  10. from datetime import timedelta
  11. from celery.backends import default_backend
  12. from celery.datastructures import PositionQueue
  13. from celery.result import AsyncResult
  14. from celery.timer import TimeoutTimer
  15. import uuid
  16. import pickle


def delay_task(task_name, *args, **kwargs):
    """Delay a task for execution by the ``celery`` daemon.

    :param task_name: the name of a task registered in the task registry.
    :param \*args: positional arguments to pass on to the task.
    :param \*\*kwargs: keyword arguments to pass on to the task.

    :raises celery.registry.NotRegistered: exception if no such task
        has been registered in the task registry.

    :rtype: :class:`celery.result.AsyncResult`.

    Example

        >>> r = delay_task("update_record", name="George Constanza", age=32)
        >>> r.ready()
        True
        >>> r.result
        "Record was updated"

    """
    if task_name not in tasks:
        raise tasks.NotRegistered(
                "Task with name %s not registered in the task registry." % (
                    task_name))
    amqp_connection = DjangoAMQPConnection()
    publisher = TaskPublisher(connection=amqp_connection)
    task_id = publisher.delay_task(task_name, *args, **kwargs)
    amqp_connection.close()
    return AsyncResult(task_id)


def discard_all():
    """Discard all waiting tasks.

    This will ignore all tasks waiting for execution, and they will
    be deleted from the messaging server.

    :returns: the number of tasks discarded.
    :rtype: int

    """
    amqp_connection = DjangoAMQPConnection()
    consumer = TaskConsumer(connection=amqp_connection)
    discarded_count = consumer.discard_all()
    amqp_connection.close()
    return discarded_count


def is_done(task_id):
    """Returns ``True`` if task with ``task_id`` has been executed.

    :rtype: bool

    """
    return default_backend.is_done(task_id)
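

# --- Illustrative sketch, not part of the original module --------------------
# Shows how ``delay_task`` and ``is_done`` above can be combined into a simple
# blocking wait. The helper name and the one-second polling interval are
# arbitrary, and the ``task_id`` attribute on the returned AsyncResult is
# assumed here.
def example_wait_for_task(task_name, *args, **kwargs):
    """Delay ``task_name`` and poll the backend until it has been executed."""
    import time
    result = delay_task(task_name, *args, **kwargs)
    while not is_done(result.task_id):
        time.sleep(1)       # back off between polls
    return result.result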


class Task(object):
    """A task that can be delayed for execution by the ``celery`` daemon.

    All subclasses of :class:`Task` must define the :meth:`run` method,
    which is the actual method the ``celery`` daemon executes.
    The :meth:`run` method supports both positional and keyword arguments.

    .. attribute:: name

        *REQUIRED* All subclasses of :class:`Task` have to define the
        :attr:`name` attribute. This is the name of the task, registered
        in the task registry, and passed to :func:`delay_task`.

    .. attribute:: type

        The type of task, currently this can be ``regular`` or ``periodic``,
        however if you want a periodic task, you should subclass
        :class:`PeriodicTask` instead.

    :raises NotImplementedError: if the :attr:`name` attribute is not set.

    The resulting class is callable, which if called will apply the
    :meth:`run` method.

    Examples

    This is a simple task just logging a message,

        >>> from celery.task import tasks, Task
        >>> class MyTask(Task):
        ...     name = "mytask"
        ...
        ...     def run(self, some_arg=None, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Running MyTask with arg some_arg=%s" %
        ...                     some_arg)
        ...         return 42
        >>> tasks.register(MyTask)

    You can delay the task using the classmethod :meth:`delay`...

        >>> result = MyTask.delay(some_arg="foo")
        >>> result.status # after some time
        'DONE'
        >>> result.result
        42

    ...or using the :func:`delay_task` function, by passing the name of
    the task.

        >>> from celery.task import delay_task
        >>> result = delay_task(MyTask.name, some_arg="foo")

    """
    name = None
    type = "regular"
    max_retries = 0 # unlimited
    retry_interval = timedelta(seconds=2)
    auto_retry = False

    def __init__(self):
        if not self.name:
            raise NotImplementedError("Tasks must define a name attribute.")

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

    def run(self, *args, **kwargs):
        """*REQUIRED* The actual task.

        All subclasses of :class:`Task` must define the run method.

        :raises NotImplementedError: by default, so you have to override
            this method in your subclass.

        """
        raise NotImplementedError("Tasks must define a run method.")

    def get_logger(self, **kwargs):
        """Get a process-aware logger object.

        See :func:`celery.log.setup_logger`.

        """
        return setup_logger(**kwargs)

    def get_publisher(self):
        """Get a celery task message publisher.

        :rtype: :class:`celery.messaging.TaskPublisher`.

        Please be sure to close the AMQP connection when you're done
        with this object, i.e.:

            >>> publisher = self.get_publisher()
            >>> # do something with publisher
            >>> publisher.connection.close()

        """
        return TaskPublisher(connection=DjangoAMQPConnection())

    def get_consumer(self):
        """Get a celery task message consumer.

        :rtype: :class:`celery.messaging.TaskConsumer`.

        Please be sure to close the AMQP connection when you're done
        with this object, i.e.:

            >>> consumer = self.get_consumer()
            >>> # do something with consumer
            >>> consumer.connection.close()

        """
        return TaskConsumer(connection=DjangoAMQPConnection())

    def requeue(self, task_id, args, kwargs):
        # Re-publish this task to the queue, keeping its task_id.
        publisher = self.get_publisher()
        publisher.requeue_task(self.name, task_id, args, kwargs)
        publisher.connection.close()

    def retry(self, task_id, args, kwargs):
        # Hand the task over to the retry queue (``retry_queue`` is assumed
        # to be available in this module's scope).
        retry_queue.put(self.name, task_id, args, kwargs)

    @classmethod
    def delay(cls, *args, **kwargs):
        """Delay this task for execution by the ``celery`` daemon(s).

        :param \*args: positional arguments passed on to the task.
        :param \*\*kwargs: keyword arguments passed on to the task.

        :rtype: :class:`celery.result.AsyncResult`

        See :func:`delay_task`.

        """
        return delay_task(cls.name, *args, **kwargs)


class TaskSet(object):
    """A task containing several subtasks, making it possible
    to track how many, or when all of the tasks have been completed.

    :param task: The task class or name.
        Can either be a fully qualified task name, or a task class.

    :param args: A list of args, kwargs pairs.
        e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``

    .. attribute:: task_name

        The name of the task.

    .. attribute:: arguments

        The arguments, as passed to the task set constructor.

    .. attribute:: total

        Total number of tasks in this task set.

    Example

        >>> from djangofeeds.tasks import RefreshFeedTask
        >>> taskset = TaskSet(RefreshFeedTask, args=[
        ...     [[], {"feed_url": "http://cnn.com/rss"}],
        ...     [[], {"feed_url": "http://bbc.com/rss"}],
        ...     [[], {"feed_url": "http://xkcd.com/rss"}]])
        >>> taskset_id, subtask_ids = taskset.run()
        >>> list_of_return_values = taskset.join()

    """

    def __init__(self, task, args):
        try:
            task_name = task.name
        except AttributeError:
            task_name = task
        self.task_name = task_name
        self.arguments = args
        self.total = len(args)

    def run(self):
        """Run all tasks in the taskset.

        :returns: A tuple containing the taskset id, and a list
            of subtask ids.

        :rtype: tuple

        Example

            >>> ts = RefreshFeeds([
            ...     [["http://foo.com/rss"], {}],
            ...     [["http://bar.com/rss"], {}],
            ... ])
            >>> taskset_id, subtask_ids = ts.run()
            >>> taskset_id
            "d2c9b261-8eff-4bfb-8459-1e1b72063514"
            >>> subtask_ids
            ["b4996460-d959-49c8-aeb9-39c530dcde25",
             "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
            >>> time.sleep(10)
            >>> is_done(taskset_id)
            True

        """
        taskset_id = str(uuid.uuid4())
        amqp_connection = DjangoAMQPConnection()
        publisher = TaskPublisher(connection=amqp_connection)
        subtask_ids = []
        for arg, kwarg in self.arguments:
            subtask_id = publisher.delay_task_in_set(task_name=self.task_name,
                                                     taskset_id=taskset_id,
                                                     task_args=arg,
                                                     task_kwargs=kwarg)
            subtask_ids.append(subtask_id)
        amqp_connection.close()
        return taskset_id, subtask_ids

    def iterate(self):
        """Iterate over the results returned after calling :meth:`run`.

        If any of the tasks raises an exception, the exception will
        be re-raised.

        """
        taskset_id, subtask_ids = self.run()
        results = dict([(task_id, AsyncResult(task_id))
                            for task_id in subtask_ids])
        while results:
            for task_id, pending_result in results.items():
                if pending_result.status == "DONE":
                    del results[task_id]
                    yield pending_result.result
                elif pending_result.status == "FAILURE":
                    raise pending_result.result

    def join(self, timeout=None):
        """Gather the results for all of the tasks in the taskset,
        and return a list of them, ordered in the order they were called.

        :keyword timeout: The time in seconds to wait for results
            before the operation times out.

        :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
            and the operation takes longer than ``timeout`` seconds.

        If any of the tasks raises an exception, the exception
        will be reraised by :meth:`join`.

        :returns: list of return values for all tasks in the taskset.

        """
        timeout_timer = TimeoutTimer(timeout) # Timeout timer starts here.
        taskset_id, subtask_ids = self.run()
        pending_results = map(AsyncResult, subtask_ids)
        results = PositionQueue(length=len(subtask_ids))

        while True:
            for position, pending_result in enumerate(pending_results):
                if pending_result.status == "DONE":
                    results[position] = pending_result.result
                elif pending_result.status == "FAILURE":
                    raise pending_result.result
            if results.full():
                # Make list copy, so the returned type is not a position
                # queue.
                return list(results)
            # This raises TimeoutError when timed out.
            timeout_timer.tick()

    @classmethod
    def remote_execute(cls, func, args):
        """Apply ``args`` to function by distributing the args to the
        celery server(s)."""
        pickled = pickle.dumps(func)
        # Each subtask runs ExecuteRemoteTask with the pickled function and
        # one positional argument list, and no keyword arguments.
        arguments = [[[pickled, arg, {}], {}] for arg in args]
        return cls(ExecuteRemoteTask, arguments)

    @classmethod
    def map(cls, func, args, timeout=None):
        """Distribute processing of the arguments and collect the results."""
        remote_task = cls.remote_execute(func, args)
        return remote_task.join(timeout=timeout)

    @classmethod
    def map_async(cls, func, args, timeout=None):
        """Distribute processing of the arguments and collect the results
        asynchronously.

        :returns: :class:`celery.result.AsyncResult` instance.

        """
        serfunc = pickle.dumps(func)
        return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
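

# --- Illustrative sketch, not part of the original module --------------------
# Demonstrates ``TaskSet.iterate``: unlike ``join``, results are handled as
# each subtask finishes, not in submission order. ``task`` may be a task class
# or registered name, and ``list_of_kwargs`` is one kwargs dict per subtask;
# the helper name is made up for this example.
def example_iterate_results(task, list_of_kwargs):
    """Run ``task`` once per kwargs dict, collecting results as they finish."""
    taskset = TaskSet(task, args=[[[], kwargs] for kwargs in list_of_kwargs])
    collected = []
    for return_value in taskset.iterate():
        collected.append(return_value)
    return collected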


def dmap(func, args, timeout=None):
    """Distribute processing of the arguments and collect the results.

    Example

        >>> from celery.task import dmap
        >>> import operator
        >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
        [4, 8, 16]

    """
    return TaskSet.map(func, args, timeout=timeout)


class AsynchronousMapTask(Task):
    """Task used internally by :func:`dmap_async` and
    :meth:`TaskSet.map_async`."""
    name = "celery.map_async"

    def run(self, serfunc, args, **kwargs):
        timeout = kwargs.get("timeout")
        return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
tasks.register(AsynchronousMapTask)


def dmap_async(func, args, timeout=None):
    """Distribute processing of the arguments and collect the results
    asynchronously.

    :returns: :class:`celery.result.AsyncResult` object.

    Example

        >>> from celery.task import dmap_async
        >>> import operator
        >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
        >>> presult
        <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
        >>> presult.status
        'DONE'
        >>> presult.result
        [4, 8, 16]

    """
    return TaskSet.map_async(func, args, timeout=timeout)


class PeriodicTask(Task):
    """A periodic task is a task that behaves like a :manpage:`cron` job.

    .. attribute:: run_every

        *REQUIRED* Defines how often the task is run (its interval),
        it can be either a :class:`datetime.timedelta` object or an
        integer specifying the time in seconds.

    :raises NotImplementedError: if the :attr:`run_every` attribute is
        not defined.

    You have to register the periodic task in the task registry.

    Example

        >>> from celery.task import tasks, PeriodicTask
        >>> from datetime import timedelta
        >>> class MyPeriodicTask(PeriodicTask):
        ...     name = "my_periodic_task"
        ...     run_every = timedelta(seconds=30)
        ...
        ...     def run(self, **kwargs):
        ...         logger = self.get_logger(**kwargs)
        ...         logger.info("Running MyPeriodicTask")
        >>> tasks.register(MyPeriodicTask)

    """
    run_every = timedelta(days=1)
    type = "periodic"

    def __init__(self):
        if not self.run_every:
            raise NotImplementedError(
                    "Periodic tasks must have a run_every attribute")
        # If run_every is an integer, convert it to timedelta seconds.
        if isinstance(self.run_every, int):
            self.run_every = timedelta(seconds=self.run_every)
        super(PeriodicTask, self).__init__()
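

# --- Illustrative sketch, not part of the original module --------------------
# ``run_every`` may also be given as a plain integer number of seconds;
# PeriodicTask.__init__ above converts it to a timedelta. The task name and
# interval here are arbitrary.
class ExampleHeartbeatTask(PeriodicTask):
    name = "example.heartbeat"
    run_every = 15 * 60     # plain int: normalized to timedelta(seconds=900)

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Still alive.")
# Registration would normally follow, as in the docstring example:
# tasks.register(ExampleHeartbeatTask)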


class ExecuteRemoteTask(Task):
    """Execute an arbitrary function or object.

    *Note* You probably want :func:`execute_remote` instead, which this
    is an internal component of.

    The object must be pickleable, so you can't use lambdas or functions
    defined in the REPL (that is the python shell, or ``ipython``).

    """
    name = "celery.execute_remote"

    def run(self, ser_callable, fargs, fkwargs, **kwargs):
        """
        :param ser_callable: A pickled function or callable object.
        :param fargs: Positional arguments to apply to the function.
        :param fkwargs: Keyword arguments to apply to the function.

        """
        callable_ = pickle.loads(ser_callable)
        return callable_(*fargs, **fkwargs)
tasks.register(ExecuteRemoteTask)


def execute_remote(func, *args, **kwargs):
    """Execute arbitrary function/object remotely.

    :param func: A callable function or object.
    :param \*args: Positional arguments to apply to the function.
    :param \*\*kwargs: Keyword arguments to apply to the function.

    The object must be picklable, so you can't use lambdas or functions
    defined in the REPL (the objects must have an associated module).

    :returns: :class:`celery.result.AsyncResult`.

    """
    return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
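

# --- Illustrative sketch, not part of the original module --------------------
# A minimal use of ``execute_remote``: the callable must live in an importable
# module (here ``operator.add``), since it is pickled here and unpickled by
# the worker. The helper name and the poll-until-ready loop are illustrative.
def example_remote_add(x, y):
    """Compute ``x + y`` on a celery worker and block until it is done."""
    import operator
    import time
    result = execute_remote(operator.add, x, y)
    while not result.ready():
        time.sleep(1)       # wait for the worker to finish
    return result.result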


class DeleteExpiredTaskMetaTask(PeriodicTask):
    """A periodic task that deletes expired task metadata every day.

    This runs the current backend's
    :meth:`celery.backends.base.BaseBackend.cleanup` method.

    """
    name = "celery.delete_expired_task_meta"
    run_every = timedelta(days=1)

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Deleting expired task meta objects...")
        default_backend.cleanup()
tasks.register(DeleteExpiredTaskMetaTask)