task.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. """
  2. Working with tasks and task sets.
  3. """
  4. from carrot.connection import DjangoAMQPConnection
  5. from celery.conf import AMQP_CONNECTION_TIMEOUT
  6. from celery.messaging import TaskPublisher, TaskConsumer
  7. from celery.log import setup_logger
  8. from celery.registry import tasks
  9. from datetime import timedelta
  10. from celery.backends import default_backend
  11. from celery.result import AsyncResult, TaskSetResult
  12. import uuid
  13. import pickle
  14. def apply_async(task, args=None, kwargs=None, routing_key=None,
  15. immediate=None, mandatory=None, connection=None,
  16. connect_timeout=AMQP_CONNECTION_TIMEOUT, priority=None):
  17. """Run a task asynchronously by the celery daemon(s).
  18. :param task: The task to run (a callable object, or a :class:`Task`
  19. instance
  20. :param args: The positional arguments to pass on to the task (a ``list``).
  21. :param kwargs: The keyword arguments to pass on to the task (a ``dict``)
  22. :keyword routing_key: The routing key used to route the task to a worker
  23. server.
  24. :keyword immediate: Request immediate delivery. Will raise an exception
  25. if the task cannot be routed to a worker immediately.
  26. :keyword mandatory: Mandatory routing. Raises an exception if there's
  27. no running workers able to take on this task.
  28. :keyword connection: Re-use existing AMQP connection.
  29. The ``connect_timeout`` argument is not respected if this is set.
  30. :keyword connect_timeout: The timeout in seconds, before we give up
  31. on establishing a connection to the AMQP server.
  32. :keyword priority: The task priority, a number between ``0`` and ``9``.
  33. """
  34. if not args:
  35. args = []
  36. if not kwargs:
  37. kwargs = []
  38. message_opts = {"routing_key": routing_key,
  39. "immediate": immediate,
  40. "mandatory": mandatory,
  41. "priority": priority}
  42. for option_name, option_value in message_opts.items():
  43. message_opts[option_name] = getattr(task, option_name, option_value)
  44. need_to_close_connection = False
  45. if not connection:
  46. connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
  47. need_to_close_connection = True
  48. publisher = TaskPublisher(connection=connection)
  49. task_id = publisher.delay_task(task.name, args, kwargs, **message_opts)
  50. publisher.close()
  51. if need_to_close_connection:
  52. connection.close()
  53. return AsyncResult(task_id)
  54. def delay_task(task_name, *args, **kwargs):
  55. """Delay a task for execution by the ``celery`` daemon.
  56. :param task_name: the name of a task registered in the task registry.
  57. :param \*args: positional arguments to pass on to the task.
  58. :param \*\*kwargs: keyword arguments to pass on to the task.
  59. :raises celery.registry.NotRegistered: exception if no such task
  60. has been registered in the task registry.
  61. :rtype: :class:`celery.result.AsyncResult`.
  62. Example
  63. >>> r = delay_task("update_record", name="George Constanza", age=32)
  64. >>> r.ready()
  65. True
  66. >>> r.result
  67. "Record was updated"
  68. """
  69. if task_name not in tasks:
  70. raise tasks.NotRegistered(
  71. "Task with name %s not registered in the task registry." % (
  72. task_name))
  73. task = tasks[task_name]
  74. return apply_async(task, args, kwargs)
  75. def discard_all(connect_timeout=AMQP_CONNECTION_TIMEOUT):
  76. """Discard all waiting tasks.
  77. This will ignore all tasks waiting for execution, and they will
  78. be deleted from the messaging server.
  79. :returns: the number of tasks discarded.
  80. :rtype: int
  81. """
  82. amqp_connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
  83. consumer = TaskConsumer(connection=amqp_connection)
  84. discarded_count = consumer.discard_all()
  85. amqp_connection.close()
  86. return discarded_count
  87. def is_done(task_id):
  88. """Returns ``True`` if task with ``task_id`` has been executed.
  89. :rtype: bool
  90. """
  91. return default_backend.is_done(task_id)
  92. class Task(object):
  93. """A task that can be delayed for execution by the ``celery`` daemon.
  94. All subclasses of :class:`Task` must define the :meth:`run` method,
  95. which is the actual method the ``celery`` daemon executes.
  96. The :meth:`run` method supports both positional, and keyword arguments.
  97. .. attribute:: name
  98. *REQUIRED* All subclasses of :class:`Task` has to define the
  99. :attr:`name` attribute. This is the name of the task, registered
  100. in the task registry, and passed to :func:`delay_task`.
  101. .. attribute:: type
  102. The type of task, currently this can be ``regular``, or ``periodic``,
  103. however if you want a periodic task, you should subclass
  104. :class:`PeriodicTask` instead.
  105. :raises NotImplementedError: if the :attr:`name` attribute is not set.
  106. The resulting class is callable, which if called will apply the
  107. :meth:`run` method.
  108. Examples
  109. This is a simple task just logging a message,
  110. >>> from celery.task import tasks, Task
  111. >>> class MyTask(Task):
  112. ... name = "mytask"
  113. ...
  114. ... def run(self, some_arg=None, **kwargs):
  115. ... logger = self.get_logger(**kwargs)
  116. ... logger.info("Running MyTask with arg some_arg=%s" %
  117. ... some_arg))
  118. ... return 42
  119. ... tasks.register(MyTask)
  120. You can delay the task using the classmethod :meth:`delay`...
  121. >>> result = MyTask.delay(some_arg="foo")
  122. >>> result.status # after some time
  123. 'DONE'
  124. >>> result.result
  125. 42
  126. ...or using the :func:`delay_task` function, by passing the name of
  127. the task.
  128. >>> from celery.task import delay_task
  129. >>> result = delay_task(MyTask.name, some_arg="foo")
  130. """
  131. name = None
  132. type = "regular"
  133. max_retries = 0 # unlimited
  134. retry_interval = timedelta(seconds=2)
  135. auto_retry = False
  136. routing_key = None
  137. immediate = False
  138. mandatory = False
  139. def __init__(self):
  140. if not self.name:
  141. raise NotImplementedError("Tasks must define a name attribute.")
  142. def __call__(self, *args, **kwargs):
  143. return self.run(*args, **kwargs)
  144. def run(self, *args, **kwargs):
  145. """*REQUIRED* The actual task.
  146. All subclasses of :class:`Task` must define the run method.
  147. :raises NotImplementedError: by default, so you have to override
  148. this method in your subclass.
  149. """
  150. raise NotImplementedError("Tasks must define a run method.")
  151. def get_logger(self, **kwargs):
  152. """Get process-aware logger object.
  153. See :func:`celery.log.setup_logger`.
  154. """
  155. return setup_logger(**kwargs)
  156. def get_publisher(self):
  157. """Get a celery task message publisher.
  158. :rtype: :class:`celery.messaging.TaskPublisher`.
  159. Please be sure to close the AMQP connection when you're done
  160. with this object, i.e.:
  161. >>> publisher = self.get_publisher()
  162. >>> # do something with publisher
  163. >>> publisher.connection.close()
  164. """
  165. return TaskPublisher(connection=DjangoAMQPConnection(
  166. connect_timeout=AMQP_CONNECTION_TIMEOUT))
  167. def get_consumer(self):
  168. """Get a celery task message consumer.
  169. :rtype: :class:`celery.messaging.TaskConsumer`.
  170. Please be sure to close the AMQP connection when you're done
  171. with this object. i.e.:
  172. >>> consumer = self.get_consumer()
  173. >>> # do something with consumer
  174. >>> consumer.connection.close()
  175. """
  176. return TaskConsumer(connection=DjangoAMQPConnection(
  177. connect_timeout=AMQP_CONNECTION_TIMEOUT))
  178. @classmethod
  179. def delay(cls, *args, **kwargs):
  180. """Delay this task for execution by the ``celery`` daemon(s).
  181. :param \*args: positional arguments passed on to the task.
  182. :param \*\*kwargs: keyword arguments passed on to the task.
  183. :rtype: :class:`celery.result.AsyncResult`
  184. See :func:`delay_task`.
  185. """
  186. return apply_async(cls, args, kwargs)
  187. @classmethod
  188. def apply_async(cls, args=None, kwargs=None, **options):
  189. """Delay this task for execution by the ``celery`` daemon(s).
  190. :param args: positional arguments passed on to the task.
  191. :param kwargs: keyword arguments passed on to the task.
  192. :rtype: :class:`celery.result.AsyncResult`
  193. See :func:`apply_async`.
  194. """
  195. return apply_async(cls, args, kwargs, **options)
  196. class TaskSet(object):
  197. """A task containing several subtasks, making it possible
  198. to track how many, or when all of the tasks has been completed.
  199. :param task: The task class or name.
  200. Can either be a fully qualified task name, or a task class.
  201. :param args: A list of args, kwargs pairs.
  202. e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
  203. .. attribute:: task_name
  204. The name of the task.
  205. .. attribute:: arguments
  206. The arguments, as passed to the task set constructor.
  207. .. attribute:: total
  208. Total number of tasks in this task set.
  209. Example
  210. >>> from djangofeeds.tasks import RefreshFeedTask
  211. >>> taskset = TaskSet(RefreshFeedTask, args=[
  212. ... [], {"feed_url": "http://cnn.com/rss"},
  213. ... [], {"feed_url": "http://bbc.com/rss"},
  214. ... [], {"feed_url": "http://xkcd.com/rss"}])
  215. >>> taskset_result = taskset.run()
  216. >>> list_of_return_values = taskset.join()
  217. """
  218. def __init__(self, task, args):
  219. try:
  220. task_name = task.name
  221. except AttributeError:
  222. task_name = task
  223. self.task_name = task_name
  224. self.arguments = args
  225. self.total = len(args)
  226. def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
  227. """Run all tasks in the taskset.
  228. :returns: A :class:`celery.result.TaskSetResult` instance.
  229. Example
  230. >>> ts = TaskSet(RefreshFeedTask, [
  231. ... ["http://foo.com/rss", {}],
  232. ... ["http://bar.com/rss", {}],
  233. ... )
  234. >>> result = ts.run()
  235. >>> result.taskset_id
  236. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  237. >>> result.subtask_ids
  238. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  239. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  240. >>> result.waiting()
  241. True
  242. >>> time.sleep(10)
  243. >>> result.ready()
  244. True
  245. >>> result.successful()
  246. True
  247. >>> result.failed()
  248. False
  249. >>> result.join()
  250. [True, True]
  251. """
  252. taskset_id = str(uuid.uuid4())
  253. conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
  254. publisher = TaskPublisher(connection=conn)
  255. subtask_ids = [publisher.delay_task_in_set(task_name=self.task_name,
  256. taskset_id=taskset_id,
  257. task_args=arg,
  258. task_kwargs=kwarg)
  259. for arg, kwarg in self.arguments]
  260. publisher.close()
  261. conn.close()
  262. return TaskSetResult(taskset_id, subtask_ids)
  263. def iterate(self):
  264. """Iterate over the results returned after calling :meth:`run`.
  265. If any of the tasks raises an exception, the exception will
  266. be re-raised.
  267. """
  268. return iter(self.run())
  269. def join(self, timeout=None):
  270. """Gather the results for all of the tasks in the taskset,
  271. and return a list with them ordered by the order of which they
  272. were called.
  273. :keyword timeout: The time in seconds, how long
  274. it will wait for results, before the operation times out.
  275. :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
  276. and the operation takes longer than ``timeout`` seconds.
  277. If any of the tasks raises an exception, the exception
  278. will be reraised by :meth:`join`.
  279. :returns: list of return values for all tasks in the taskset.
  280. """
  281. return self.run().join(timeout=timeout)
  282. @classmethod
  283. def remote_execute(cls, func, args):
  284. """Apply ``args`` to function by distributing the args to the
  285. celery server(s)."""
  286. pickled = pickle.dumps(func)
  287. arguments = [[[pickled, arg, {}], {}] for arg in args]
  288. return cls(ExecuteRemoteTask, arguments)
  289. @classmethod
  290. def map(cls, func, args, timeout=None):
  291. """Distribute processing of the arguments and collect the results."""
  292. remote_task = cls.remote_execute(func, args)
  293. return remote_task.join(timeout=timeout)
  294. @classmethod
  295. def map_async(cls, func, args, timeout=None):
  296. """Distribute processing of the arguments and collect the results
  297. asynchronously.
  298. :returns: :class:`celery.result.AsyncResult` instance.
  299. """
  300. serfunc = pickle.dumps(func)
  301. return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  302. def dmap(func, args, timeout=None):
  303. """Distribute processing of the arguments and collect the results.
  304. Example
  305. >>> from celery.task import map
  306. >>> import operator
  307. >>> dmap(operator.add, [[2, 2], [4, 4], [8, 8]])
  308. [4, 8, 16]
  309. """
  310. return TaskSet.map(func, args, timeout=timeout)
  311. class AsynchronousMapTask(Task):
  312. """Task used internally by :func:`dmap_async` and
  313. :meth:`TaskSet.map_async`. """
  314. name = "celery.map_async"
  315. def run(self, serfunc, args, **kwargs):
  316. """The method run by ``celeryd``."""
  317. timeout = kwargs.get("timeout")
  318. return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
  319. tasks.register(AsynchronousMapTask)
  320. def dmap_async(func, args, timeout=None):
  321. """Distribute processing of the arguments and collect the results
  322. asynchronously.
  323. :returns: :class:`celery.result.AsyncResult` object.
  324. Example
  325. >>> from celery.task import dmap_async
  326. >>> import operator
  327. >>> presult = dmap_async(operator.add, [[2, 2], [4, 4], [8, 8]])
  328. >>> presult
  329. <AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
  330. >>> presult.status
  331. 'DONE'
  332. >>> presult.result
  333. [4, 8, 16]
  334. """
  335. return TaskSet.map_async(func, args, timeout=timeout)
  336. class PeriodicTask(Task):
  337. """A periodic task is a task that behaves like a :manpage:`cron` job.
  338. .. attribute:: run_every
  339. *REQUIRED* Defines how often the task is run (its interval),
  340. it can be either a :class:`datetime.timedelta` object or an
  341. integer specifying the time in seconds.
  342. :raises NotImplementedError: if the :attr:`run_every` attribute is
  343. not defined.
  344. You have to register the periodic task in the task registry.
  345. Example
  346. >>> from celery.task import tasks, PeriodicTask
  347. >>> from datetime import timedelta
  348. >>> class MyPeriodicTask(PeriodicTask):
  349. ... name = "my_periodic_task"
  350. ... run_every = timedelta(seconds=30)
  351. ...
  352. ... def run(self, **kwargs):
  353. ... logger = self.get_logger(**kwargs)
  354. ... logger.info("Running MyPeriodicTask")
  355. >>> tasks.register(MyPeriodicTask)
  356. """
  357. run_every = timedelta(days=1)
  358. type = "periodic"
  359. def __init__(self):
  360. if not self.run_every:
  361. raise NotImplementedError(
  362. "Periodic tasks must have a run_every attribute")
  363. # If run_every is a integer, convert it to timedelta seconds.
  364. if isinstance(self.run_every, int):
  365. self.run_every = timedelta(seconds=self.run_every)
  366. super(PeriodicTask, self).__init__()
  367. class ExecuteRemoteTask(Task):
  368. """Execute an arbitrary function or object.
  369. *Note* You probably want :func:`execute_remote` instead, which this
  370. is an internal component of.
  371. The object must be pickleable, so you can't use lambdas or functions
  372. defined in the REPL (that is the python shell, or ``ipython``).
  373. """
  374. name = "celery.execute_remote"
  375. def run(self, ser_callable, fargs, fkwargs, **kwargs):
  376. """
  377. :param ser_callable: A pickled function or callable object.
  378. :param fargs: Positional arguments to apply to the function.
  379. :param fkwargs: Keyword arguments to apply to the function.
  380. """
  381. callable_ = pickle.loads(ser_callable)
  382. return callable_(*fargs, **fkwargs)
  383. tasks.register(ExecuteRemoteTask)
  384. def execute_remote(func, *args, **kwargs):
  385. """Execute arbitrary function/object remotely.
  386. :param func: A callable function or object.
  387. :param \*args: Positional arguments to apply to the function.
  388. :param \*\*kwargs: Keyword arguments to apply to the function.
  389. The object must be picklable, so you can't use lambdas or functions
  390. defined in the REPL (the objects must have an associated module).
  391. :returns: class:`celery.result.AsyncResult`.
  392. """
  393. return ExecuteRemoteTask.delay(pickle.dumps(func), args, kwargs)
  394. class DeleteExpiredTaskMetaTask(PeriodicTask):
  395. """A periodic task that deletes expired task metadata every day.
  396. This runs the current backend's
  397. :meth:`celery.backends.base.BaseBackend.cleanup` method.
  398. """
  399. name = "celery.delete_expired_task_meta"
  400. run_every = timedelta(days=1)
  401. def run(self, **kwargs):
  402. """The method run by ``celeryd``."""
  403. logger = self.get_logger(**kwargs)
  404. logger.info("Deleting expired task meta objects...")
  405. default_backend.cleanup()
  406. tasks.register(DeleteExpiredTaskMetaTask)