base.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. from carrot.connection import DjangoBrokerConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.messaging import TaskPublisher, TaskConsumer
  4. from celery.log import setup_logger
  5. from celery.result import TaskSetResult
  6. from celery.execute import apply_async, delay_task, apply
  7. from celery.utils import gen_unique_id, get_full_cls_name
  8. from datetime import timedelta
  9. from celery.registry import tasks
  10. from celery.serialization import pickle
  11. class MaxRetriesExceededError(Exception):
  12. """The tasks max restart limit has been exceeded."""
  13. class Task(object):
  14. """A task that can be delayed for execution by the ``celery`` daemon.
  15. All subclasses of :class:`Task` must define the :meth:`run` method,
  16. which is the actual method the ``celery`` daemon executes.
  17. The :meth:`run` method supports both positional, and keyword arguments.
  18. .. attribute:: name
  19. *REQUIRED* All subclasses of :class:`Task` has to define the
  20. :attr:`name` attribute. This is the name of the task, registered
  21. in the task registry, and passed to :func:`delay_task`.
  22. .. attribute:: type
  23. The type of task, currently this can be ``regular``, or ``periodic``,
  24. however if you want a periodic task, you should subclass
  25. :class:`PeriodicTask` instead.
  26. .. attribute:: routing_key
  27. Override the global default ``routing_key`` for this task.
  28. .. attribute:: exchange
  29. Override the global default ``exchange`` for this task.
  30. .. attribute:: mandatory
  31. If set, the message has mandatory routing. By default the message
  32. is silently dropped by the broker if it can't be routed to a queue.
  33. However - If the message is mandatory, an exception will be raised
  34. instead.
  35. .. attribute:: immediate:
  36. Request immediate delivery. If the message cannot be routed to a
  37. task worker immediately, an exception will be raised. This is
  38. instead of the default behaviour, where the broker will accept and
  39. queue the message, but with no guarantee that the message will ever
  40. be consumed.
  41. .. attribute:: priority:
  42. The message priority. A number from ``0`` to ``9``.
  43. .. attribute:: max_retries
  44. Maximum number of retries before giving up.
  45. .. attribute:: default_retry_delay
  46. Defeault time in seconds before a retry of the task should be
  47. executed. Default is a 1 minute delay.
  48. .. attribute:: ignore_result
  49. Don't store the status and return value. This means you can't
  50. use the :class:`celery.result.AsyncResult` to check if the task is
  51. done, or get its return value. Only use if you need the performance
  52. and is able live without these features. Any exceptions raised will
  53. store the return value/status as usual.
  54. .. attribute:: disable_error_emails
  55. Disable all error e-mails for this task (only applicable if
  56. ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
  57. :raises NotImplementedError: if the :attr:`name` attribute is not set.
  58. The resulting class is callable, which if called will apply the
  59. :meth:`run` method.
  60. Examples
  61. This is a simple task just logging a message,
  62. >>> from celery.task import tasks, Task
  63. >>> class MyTask(Task):
  64. ...
  65. ... def run(self, some_arg=None, **kwargs):
  66. ... logger = self.get_logger(**kwargs)
  67. ... logger.info("Running MyTask with arg some_arg=%s" %
  68. ... some_arg))
  69. ... return 42
  70. ... tasks.register(MyTask)
  71. You can delay the task using the classmethod :meth:`delay`...
  72. >>> result = MyTask.delay(some_arg="foo")
  73. >>> result.status # after some time
  74. 'DONE'
  75. >>> result.result
  76. 42
  77. ...or using the :func:`delay_task` function, by passing the name of
  78. the task.
  79. >>> from celery.task import delay_task
  80. >>> result = delay_task(MyTask.name, some_arg="foo")
  81. """
  82. name = None
  83. type = "regular"
  84. exchange = None
  85. routing_key = None
  86. immediate = False
  87. mandatory = False
  88. priority = None
  89. ignore_result = False
  90. disable_error_emails = False
  91. max_retries = 3
  92. default_retry_delay = 60
  93. MaxRetriesExceededError = MaxRetriesExceededError
  94. def __init__(self):
  95. if not self.__class__.name:
  96. self.__class__.name = get_full_cls_name(self.__class__)
  97. def __call__(self, *args, **kwargs):
  98. return self.run(*args, **kwargs)
  99. def run(self, *args, **kwargs):
  100. """*REQUIRED* The actual task.
  101. All subclasses of :class:`Task` must define the run method.
  102. :raises NotImplementedError: by default, so you have to override
  103. this method in your subclass.
  104. """
  105. raise NotImplementedError("Tasks must define a run method.")
  106. def get_logger(self, **kwargs):
  107. """Get process-aware logger object.
  108. See :func:`celery.log.setup_logger`.
  109. """
  110. return setup_logger(**kwargs)
  111. def get_publisher(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
  112. """Get a celery task message publisher.
  113. :rtype: :class:`celery.messaging.TaskPublisher`.
  114. Please be sure to close the AMQP connection when you're done
  115. with this object, i.e.:
  116. >>> publisher = self.get_publisher()
  117. >>> # do something with publisher
  118. >>> publisher.connection.close()
  119. """
  120. connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
  121. return TaskPublisher(connection=connection,
  122. exchange=self.exchange,
  123. routing_key=self.routing_key)
  124. def get_consumer(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
  125. """Get a celery task message consumer.
  126. :rtype: :class:`celery.messaging.TaskConsumer`.
  127. Please be sure to close the AMQP connection when you're done
  128. with this object. i.e.:
  129. >>> consumer = self.get_consumer()
  130. >>> # do something with consumer
  131. >>> consumer.connection.close()
  132. """
  133. connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
  134. return TaskConsumer(connection=connection, exchange=self.exchange,
  135. routing_key=self.routing_key)
  136. @classmethod
  137. def delay(cls, *args, **kwargs):
  138. """Delay this task for execution by the ``celery`` daemon(s).
  139. :param \*args: positional arguments passed on to the task.
  140. :param \*\*kwargs: keyword arguments passed on to the task.
  141. :rtype: :class:`celery.result.AsyncResult`
  142. See :func:`celery.execute.delay_task`.
  143. """
  144. return apply_async(cls, args, kwargs)
  145. @classmethod
  146. def apply_async(cls, args=None, kwargs=None, **options):
  147. """Delay this task for execution by the ``celery`` daemon(s).
  148. :param args: positional arguments passed on to the task.
  149. :param kwargs: keyword arguments passed on to the task.
  150. :keyword \*\*options: Any keyword arguments to pass on to
  151. :func:`celery.execute.apply_async`.
  152. See :func:`celery.execute.apply_async` for more information.
  153. :rtype: :class:`celery.result.AsyncResult`
  154. """
  155. return apply_async(cls, args, kwargs, **options)
  156. def retry(self, args, kwargs, exc=None, **options):
  157. """Retry the task.
  158. :param args: Positional arguments to retry with.
  159. :param kwargs: Keyword arguments to retry with.
  160. :keyword exc: Optional exception to raise instead of
  161. :exc:`MaxRestartsExceededError` when the max restart limit has
  162. been exceeded.
  163. :keyword countdown: Time in seconds to delay the retry for.
  164. :keyword eta: Explicit time and date to run the retry at (must be a
  165. :class:`datetime.datetime` instance).
  166. :keyword \*\*options: Any extra options to pass on to
  167. meth:`apply_async`. See :func:`celery.execute.apply_async`.
  168. Example
  169. >>> class TwitterPostStatusTask(Task):
  170. ...
  171. ... def run(self, username, password, message, **kwargs):
  172. ... twitter = Twitter(username, password)
  173. ... try:
  174. ... twitter.post_status(message)
  175. ... except twitter.FailWhale, exc:
  176. ... # Retry in 5 minutes.
  177. ... self.retry([username, password, message], kwargs,
  178. ... countdown=60 * 5, exc=exc)
  179. """
  180. options["retries"] = kwargs.pop("task_retries", 0) + 1
  181. options["task_id"] = kwargs.pop("task_id", None)
  182. options["countdown"] = options.get("countdown",
  183. self.default_retry_delay)
  184. exc = exc or MaxRetriesExceededError(
  185. "Can't retry %s[%s] args:%s kwargs:%s" % (
  186. self.name, options["task_id"], args, kwargs))
  187. if options["retries"] > self.max_retries:
  188. raise exc
  189. return self.apply_async(args=args, kwargs=kwargs, **options)
  190. @classmethod
  191. def apply(cls, args=None, kwargs=None, **options):
  192. """Execute this task at once, by blocking until the task
  193. has finished executing.
  194. :param args: positional arguments passed on to the task.
  195. :param kwargs: keyword arguments passed on to the task.
  196. :rtype: :class:`celery.result.EagerResult`
  197. See :func:`celery.execute.apply`.
  198. """
  199. return apply(cls, args, kwargs, **options)
  200. class ExecuteRemoteTask(Task):
  201. """Execute an arbitrary function or object.
  202. *Note* You probably want :func:`execute_remote` instead, which this
  203. is an internal component of.
  204. The object must be pickleable, so you can't use lambdas or functions
  205. defined in the REPL (that is the python shell, or ``ipython``).
  206. """
  207. name = "celery.execute_remote"
  208. def run(self, ser_callable, fargs, fkwargs, **kwargs):
  209. """
  210. :param ser_callable: A pickled function or callable object.
  211. :param fargs: Positional arguments to apply to the function.
  212. :param fkwargs: Keyword arguments to apply to the function.
  213. """
  214. callable_ = pickle.loads(ser_callable)
  215. return callable_(*fargs, **fkwargs)
  216. tasks.register(ExecuteRemoteTask)
  217. class AsynchronousMapTask(Task):
  218. """Task used internally by :func:`dmap_async` and
  219. :meth:`TaskSet.map_async`. """
  220. name = "celery.map_async"
  221. def run(self, serfunc, args, **kwargs):
  222. """The method run by ``celeryd``."""
  223. timeout = kwargs.get("timeout")
  224. return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
  225. tasks.register(AsynchronousMapTask)
  226. class TaskSet(object):
  227. """A task containing several subtasks, making it possible
  228. to track how many, or when all of the tasks has been completed.
  229. :param task: The task class or name.
  230. Can either be a fully qualified task name, or a task class.
  231. :param args: A list of args, kwargs pairs.
  232. e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
  233. .. attribute:: task_name
  234. The name of the task.
  235. .. attribute:: arguments
  236. The arguments, as passed to the task set constructor.
  237. .. attribute:: total
  238. Total number of tasks in this task set.
  239. Example
  240. >>> from djangofeeds.tasks import RefreshFeedTask
  241. >>> taskset = TaskSet(RefreshFeedTask, args=[
  242. ... [], {"feed_url": "http://cnn.com/rss"},
  243. ... [], {"feed_url": "http://bbc.com/rss"},
  244. ... [], {"feed_url": "http://xkcd.com/rss"}])
  245. >>> taskset_result = taskset.run()
  246. >>> list_of_return_values = taskset.join()
  247. """
  248. def __init__(self, task, args):
  249. try:
  250. task_name = task.name
  251. task_obj = task
  252. except AttributeError:
  253. task_name = task
  254. task_obj = tasks[task_name]
  255. self.task = task_obj
  256. self.task_name = task_name
  257. self.arguments = args
  258. self.total = len(args)
  259. def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
  260. """Run all tasks in the taskset.
  261. :returns: A :class:`celery.result.TaskSetResult` instance.
  262. Example
  263. >>> ts = TaskSet(RefreshFeedTask, [
  264. ... ["http://foo.com/rss", {}],
  265. ... ["http://bar.com/rss", {}],
  266. ... )
  267. >>> result = ts.run()
  268. >>> result.taskset_id
  269. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  270. >>> result.subtask_ids
  271. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  272. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  273. >>> result.waiting()
  274. True
  275. >>> time.sleep(10)
  276. >>> result.ready()
  277. True
  278. >>> result.successful()
  279. True
  280. >>> result.failed()
  281. False
  282. >>> result.join()
  283. [True, True]
  284. """
  285. taskset_id = gen_unique_id()
  286. from celery.conf import ALWAYS_EAGER
  287. if ALWAYS_EAGER:
  288. subtasks = [apply(self.task, args, kwargs)
  289. for args, kwargs in self.arguments]
  290. return TaskSetResult(taskset_id, subtasks)
  291. conn = DjangoBrokerConnection(connect_timeout=connect_timeout)
  292. publisher = TaskPublisher(connection=conn,
  293. exchange=self.task.exchange)
  294. subtasks = [apply_async(self.task, args, kwargs,
  295. taskset_id=taskset_id, publisher=publisher)
  296. for args, kwargs in self.arguments]
  297. publisher.close()
  298. conn.close()
  299. return TaskSetResult(taskset_id, subtasks)
  300. def join(self, timeout=None):
  301. """Gather the results for all of the tasks in the taskset,
  302. and return a list with them ordered by the order of which they
  303. were called.
  304. :keyword timeout: The time in seconds, how long
  305. it will wait for results, before the operation times out.
  306. :raises TimeoutError: if ``timeout`` is not ``None``
  307. and the operation takes longer than ``timeout`` seconds.
  308. If any of the tasks raises an exception, the exception
  309. will be reraised by :meth:`join`.
  310. :returns: list of return values for all tasks in the taskset.
  311. """
  312. return self.run().join(timeout=timeout)
  313. @classmethod
  314. def remote_execute(cls, func, args):
  315. """Apply ``args`` to function by distributing the args to the
  316. celery server(s)."""
  317. pickled = pickle.dumps(func)
  318. arguments = [[[pickled, arg, {}], {}] for arg in args]
  319. return cls(ExecuteRemoteTask, arguments)
  320. @classmethod
  321. def map(cls, func, args, timeout=None):
  322. """Distribute processing of the arguments and collect the results."""
  323. remote_task = cls.remote_execute(func, args)
  324. return remote_task.join(timeout=timeout)
  325. @classmethod
  326. def map_async(cls, func, args, timeout=None):
  327. """Distribute processing of the arguments and collect the results
  328. asynchronously.
  329. :returns: :class:`celery.result.AsyncResult` instance.
  330. """
  331. serfunc = pickle.dumps(func)
  332. return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  333. class PeriodicTask(Task):
  334. """A periodic task is a task that behaves like a :manpage:`cron` job.
  335. .. attribute:: run_every
  336. *REQUIRED* Defines how often the task is run (its interval),
  337. it can be either a :class:`datetime.timedelta` object or an
  338. integer specifying the time in seconds.
  339. :raises NotImplementedError: if the :attr:`run_every` attribute is
  340. not defined.
  341. You have to register the periodic task in the task registry.
  342. Example
  343. >>> from celery.task import tasks, PeriodicTask
  344. >>> from datetime import timedelta
  345. >>> class MyPeriodicTask(PeriodicTask):
  346. ... name = "my_periodic_task"
  347. ... run_every = timedelta(seconds=30)
  348. ...
  349. ... def run(self, **kwargs):
  350. ... logger = self.get_logger(**kwargs)
  351. ... logger.info("Running MyPeriodicTask")
  352. >>> tasks.register(MyPeriodicTask)
  353. """
  354. run_every = timedelta(days=1)
  355. type = "periodic"
  356. def __init__(self):
  357. if not self.run_every:
  358. raise NotImplementedError(
  359. "Periodic tasks must have a run_every attribute")
  360. # If run_every is a integer, convert it to timedelta seconds.
  361. if isinstance(self.run_every, int):
  362. self.run_every = timedelta(seconds=self.run_every)
  363. super(PeriodicTask, self).__init__()