base.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. from carrot.connection import DjangoBrokerConnection
  2. from celery import conf
  3. from celery.messaging import TaskPublisher, TaskConsumer
  4. from celery.log import setup_logger
  5. from celery.result import TaskSetResult, EagerResult
  6. from celery.execute import apply_async, delay_task, apply
  7. from celery.utils import gen_unique_id, get_full_cls_name
  8. from celery.registry import tasks
  9. from celery.serialization import pickle
  10. from celery.exceptions import MaxRetriesExceededError, RetryTaskError
  11. from datetime import timedelta
  class Task(object):
      """A task that can be delayed for execution by the ``celery`` daemon.

      All subclasses of :class:`Task` must define the :meth:`run` method,
      which is the actual method the ``celery`` daemon executes.

      The :meth:`run` method can make use of the default keyword arguments,
      as listed in the :meth:`run` documentation.

      The :meth:`run` method supports both positional and keyword arguments.

      .. attribute:: name

          Name of the task. This is the name registered in the task
          registry and passed to :func:`delay_task`. If it is not set, the
          first instantiation of the class sets it (on the class) to
          :func:`celery.utils.get_full_cls_name` of the class.

      .. attribute:: type

          The type of task, currently this can be ``regular``, or
          ``periodic``, however if you want a periodic task, you should
          subclass :class:`PeriodicTask` instead.

      .. attribute:: routing_key

          Override the global default ``routing_key`` for this task.

      .. attribute:: exchange

          Override the global default ``exchange`` for this task.

      .. attribute:: mandatory

          If set, the message has mandatory routing. By default the message
          is silently dropped by the broker if it can't be routed to a
          queue. However, if the message is mandatory, an exception will be
          raised instead.

      .. attribute:: immediate

          Request immediate delivery. If the message cannot be routed to a
          task worker immediately, an exception will be raised. This is
          instead of the default behaviour, where the broker will accept
          and queue the message, but with no guarantee that the message
          will ever be consumed.

      .. attribute:: priority

          The message priority. A number from ``0`` to ``9``.

      .. attribute:: max_retries

          Maximum number of retries before giving up. Set to ``None`` to
          let :meth:`retry` retry without limit.

      .. attribute:: default_retry_delay

          Default time in seconds before a retry of the task should be
          executed. Default is a 3 minute delay.

      .. attribute:: ignore_result

          Don't store the status and return value. This means you can't
          use the :class:`celery.result.AsyncResult` to check if the task
          is done, or get its return value. Only use if you need the
          performance and can live without these features. Any exceptions
          raised will store the return value/status as usual.

      .. attribute:: disable_error_emails

          Disable all error e-mails for this task (only applicable if
          ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)

      .. attribute:: serializer

          A string identifying the default serialization method to use.
          Defaults to the ``CELERY_TASK_SERIALIZER`` setting. Can be
          ``pickle``, ``json``, ``yaml``, or any custom serialization
          methods that have been registered with
          :mod:`carrot.serialization.registry`.

      The resulting class is callable, which if called will apply the
      :meth:`run` method.

      Examples

      This is a simple task just logging a message,

          >>> from celery.task import tasks, Task
          >>> class MyTask(Task):
          ...
          ...     def run(self, some_arg=None, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Running MyTask with arg some_arg=%s" %
          ...                     some_arg)
          ...         return 42
          >>> tasks.register(MyTask)

      You can delay the task using the classmethod :meth:`delay`...

          >>> result = MyTask.delay(some_arg="foo")
          >>> result.status # after some time
          'DONE'
          >>> result.result
          42

      ...or using the :func:`delay_task` function, by passing the name of
      the task.

          >>> from celery.task import delay_task
          >>> result = delay_task(MyTask.name, some_arg="foo")

      """
      name = None
      type = "regular"
      exchange = None
      routing_key = None
      immediate = False
      mandatory = False
      priority = None
      ignore_result = False
      disable_error_emails = False
      max_retries = 3
      default_retry_delay = 3 * 60  # seconds (3 minutes)
      serializer = conf.TASK_SERIALIZER
      # Exposed on the class so :meth:`retry` (and subclasses) can refer to
      # it as ``self.MaxRetriesExceededError``.
      MaxRetriesExceededError = MaxRetriesExceededError

      def __init__(self):
          # Give unnamed task classes a name derived from the class itself.
          # This assigns the *class* attribute, so it is shared by all
          # instances and visible through ``cls.name``.
          if not self.__class__.name:
              self.__class__.name = get_full_cls_name(self.__class__)

      def __call__(self, *args, **kwargs):
          """Execute :meth:`run` directly in the current process."""
          return self.run(*args, **kwargs)

      def run(self, *args, **kwargs):
          r"""The body of the task executed by the worker.

          The following standard keyword arguments are reserved and are
          passed by the worker if the function/method supports them:

              * task_id

                  Unique id of the currently executing task.

              * task_name

                  Name of the currently executing task (same as :attr:`name`)

              * task_retries

                  How many times the current task has been retried
                  (an integer starting at ``0``).

              * logfile

                  Name of the worker log file.

              * loglevel

                  The current loglevel, an integer mapping to one of the
                  following values: ``logging.DEBUG``, ``logging.INFO``,
                  ``logging.ERROR``, ``logging.CRITICAL``,
                  ``logging.WARNING``, ``logging.FATAL``.

          Additional standard keyword arguments may be added in the future.
          To take these default arguments, the task can either list the ones
          it wants explicitly or just take an arbitrary list of keyword
          arguments (\*\*kwargs).

          Example using an explicit list of default arguments to take:

          .. code-block:: python

              def run(self, x, y, logfile=None, loglevel=None):
                  self.get_logger(loglevel=loglevel, logfile=logfile)
                  return x * y

          Example taking all default keyword arguments, and any extra
          arguments passed on by the caller:

          .. code-block:: python

              def run(self, x, y, **kwargs): # CORRECT!
                  logger = self.get_logger(**kwargs)
                  adjust = kwargs.get("adjust", 0)
                  return x * y - adjust

          :raises NotImplementedError: if a subclass does not override it.

          """
          raise NotImplementedError("Tasks must define a run method.")

      def get_logger(self, **kwargs):
          """Get process-aware logger object.

          Only the ``logfile`` and ``loglevel`` keyword arguments are used;
          any others are ignored, so the task's own ``**kwargs`` can be
          passed straight through.

          See :func:`celery.log.setup_logger`.

          """
          logfile = kwargs.get("logfile")
          loglevel = kwargs.get("loglevel")
          return setup_logger(loglevel=loglevel, logfile=logfile)

      def get_publisher(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
          """Get a celery task message publisher.

          :rtype: :class:`celery.messaging.TaskPublisher`.

          Please be sure to close the AMQP connection when you're done
          with this object, i.e.:

              >>> publisher = self.get_publisher()
              >>> # do something with publisher
              >>> publisher.connection.close()

          """

          connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
          return TaskPublisher(connection=connection,
                               exchange=self.exchange,
                               routing_key=self.routing_key)

      def get_consumer(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
          """Get a celery task message consumer.

          :rtype: :class:`celery.messaging.TaskConsumer`.

          Please be sure to close the AMQP connection when you're done
          with this object, i.e.:

              >>> consumer = self.get_consumer()
              >>> # do something with consumer
              >>> consumer.connection.close()

          """
          connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
          return TaskConsumer(connection=connection, exchange=self.exchange,
                              routing_key=self.routing_key)

      @classmethod
      def delay(cls, *args, **kwargs):
          r"""Delay this task for execution by the ``celery`` daemon(s).

          :param \*args: positional arguments passed on to the task.

          :param \*\*kwargs: keyword arguments passed on to the task.

          :rtype: :class:`celery.result.AsyncResult`

          This is a shortcut for :meth:`apply_async` without options; see
          :func:`celery.execute.apply_async`.

          """
          return apply_async(cls, args, kwargs)

      @classmethod
      def apply_async(cls, args=None, kwargs=None, **options):
          r"""Delay this task for execution by the ``celery`` daemon(s).

          :param args: positional arguments passed on to the task.

          :param kwargs: keyword arguments passed on to the task.

          :keyword \*\*options: Any keyword arguments to pass on to
              :func:`celery.execute.apply_async`.

          See :func:`celery.execute.apply_async` for more information.

          :rtype: :class:`celery.result.AsyncResult`

          """
          return apply_async(cls, args, kwargs, **options)

      def retry(self, args, kwargs, exc=None, throw=True, **options):
          r"""Retry the task.

          :param args: Positional arguments to retry with.

          :param kwargs: Keyword arguments to retry with. The reserved
              ``task_retries`` and ``task_id`` keys are consumed to compute
              the retry count and to reuse the task id; the dict passed in
              is not modified.

          :keyword exc: Optional exception to raise instead of
              :exc:`celery.exceptions.MaxRetriesExceededError` when the
              max retry limit has been exceeded. It is also passed on to
              :exc:`celery.exceptions.RetryTaskError`.

          :keyword throw: If ``True`` (the default), raise
              :exc:`celery.exceptions.RetryTaskError` after the retry has
              been sent, which tells the worker that the task is to be
              retried. Set to ``False`` to return normally instead.

          :keyword countdown: Time in seconds to delay the retry for.
              Defaults to :attr:`default_retry_delay` when neither
              ``countdown`` nor ``eta`` is given.

          :keyword eta: Explicit time and date to run the retry at (must be a
              :class:`datetime.datetime` instance).

          :keyword \*\*options: Any extra options to pass on to
              :meth:`apply_async`. See :func:`celery.execute.apply_async`.

          :raises celery.exceptions.MaxRetriesExceededError: (or ``exc`` if
              given) when :attr:`max_retries` is not ``None`` and has been
              exceeded.

          :raises celery.exceptions.RetryTaskError: To tell the worker that
              the task has been re-sent for retry. This always happens
              unless the ``throw`` keyword argument has been explicitly set
              to ``False``, or the task was executed eagerly.

          :returns: When the task was executed eagerly (see :meth:`apply`),
              the value of the eager retry. Otherwise ``None`` (only reached
              when ``throw`` is ``False``).

          Example

              >>> class TwitterPostStatusTask(Task):
              ...
              ...     def run(self, username, password, message, **kwargs):
              ...         twitter = Twitter(username, password)
              ...         try:
              ...             twitter.post_status(message)
              ...         except twitter.FailWhale, exc:
              ...             # Retry in 5 minutes.
              ...             self.retry([username, password, message], kwargs,
              ...                        countdown=60 * 5, exc=exc)

          """
          # Work on a copy so the caller's dict is left untouched; the
          # reserved keys popped below must not be re-sent as task kwargs.
          kwargs = dict(kwargs or {})
          options["retries"] = kwargs.pop("task_retries", 0) + 1
          options["task_id"] = kwargs.pop("task_id", None)

          # Only fall back to the default delay when no explicit time was
          # requested, so a caller-supplied ``eta`` is not accompanied by a
          # competing ``countdown``.
          if options.get("countdown") is None and options.get("eta") is None:
              options["countdown"] = self.default_retry_delay

          max_retries = self.max_retries
          if max_retries is not None and options["retries"] > max_retries:
              raise exc or self.MaxRetriesExceededError(
                  "Can't retry %s[%s] args:%s kwargs:%s" % (
                      self.name, options["task_id"], args, kwargs))

          # If task was executed eagerly using apply(),
          # then the retry must also be executed eagerly.
          if kwargs.get("task_is_eager", False):
              result = self.apply(args=args, kwargs=kwargs, **options)
              if isinstance(result, EagerResult):
                  # get() propagates any exceptions.
                  return result.get()
              return result

          self.apply_async(args=args, kwargs=kwargs, **options)

          if throw:
              if options.get("countdown") is not None:
                  message = "Retry in %d seconds." % options["countdown"]
              else:
                  message = "Retry at %s." % (options["eta"], )
              raise RetryTaskError(message, exc)

      def on_retry(self, exc, task_id, args, kwargs):
          """Retry handler.

          This is run by the worker when the task is to be retried.

          :param exc: The exception sent to :meth:`retry`.
          :param task_id: Unique id of the retried task.
          :param args: Original arguments for the retried task.
          :param kwargs: Original keyword arguments for the retried task.

          The return value of this handler is ignored.

          """
          pass

      def on_failure(self, exc, task_id, args, kwargs):
          """Error handler.

          This is run by the worker when the task fails.

          :param exc: The exception raised by the task.
          :param task_id: Unique id of the failed task.
          :param args: Original arguments for the task that failed.
          :param kwargs: Original keyword arguments for the task that failed.

          The return value of this handler is ignored.

          """
          pass

      def on_success(self, retval, task_id, args, kwargs):
          """Success handler.

          This is run by the worker when the task executed successfully.

          :param retval: The return value of the task.
          :param task_id: Unique id of the executed task.
          :param args: Original arguments for the executed task.
          :param kwargs: Original keyword arguments for the executed task.

          The return value of this handler is ignored.

          """
          pass

      @classmethod
      def apply(cls, args=None, kwargs=None, **options):
          """Execute this task at once, by blocking until the task
          has finished executing.

          :param args: positional arguments passed on to the task.

          :param kwargs: keyword arguments passed on to the task.

          :rtype: :class:`celery.result.EagerResult`

          See :func:`celery.execute.apply`.

          """
          return apply(cls, args, kwargs, **options)
  class ExecuteRemoteTask(Task):
      """Execute an arbitrary function or callable object.

      *Note* You probably want :func:`execute_remote` instead; this task is
      an internal component of it.

      The callable must be pickleable, so lambdas and functions defined in
      the REPL (that is, the python shell or ``ipython``) cannot be used.

      .. warning::

          :meth:`run` unpickles its first argument, and unpickling can
          execute arbitrary code. Only accept messages for this task from
          trusted producers.

      """
      name = "celery.execute_remote"

      def run(self, ser_callable, fargs, fkwargs, **kwargs):
          """Unpickle ``ser_callable`` and call it.

          :param ser_callable: A pickled function or callable object.
          :param fargs: Positional arguments to apply to the function.
          :param fkwargs: Keyword arguments to apply to the function.
          :returns: Whatever the unpickled callable returns.

          The worker's default keyword arguments (``**kwargs``) are
          accepted and ignored.

          """
          # SECURITY: pickle.loads on untrusted data can run arbitrary code.
          target = pickle.loads(ser_callable)
          return target(*fargs, **fkwargs)


  tasks.register(ExecuteRemoteTask)
  class AsynchronousMapTask(Task):
      """Internal task backing :func:`dmap_async` and
      :meth:`TaskSet.map_async`.

      Unpickles a function and maps it over a list of argument tuples using
      :meth:`TaskSet.map`, waiting for all of the results.

      """
      name = "celery.map_async"

      def run(self, serfunc, args, **kwargs):
          """The method run by ``celeryd``.

          :param serfunc: A pickled function.
          :param args: Sequence whose items are the positional arguments
              for each call of the function.
          :keyword timeout: Optional timeout (in seconds) forwarded to
              :meth:`TaskSet.map`.
          :returns: The list of results collected by :meth:`TaskSet.map`.

          """
          # SECURITY: pickle.loads on untrusted data can run arbitrary code.
          func = pickle.loads(serfunc)
          return TaskSet.map(func, args, timeout=kwargs.get("timeout"))


  tasks.register(AsynchronousMapTask)
  class TaskSet(object):
      """A task containing several subtasks, making it possible
      to track how many, or when all of the tasks have been completed.

      :param task: The task class or name.
          Can either be a fully qualified task name, or a task class.

      :param args: A list of args, kwargs pairs.
          e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``

      .. attribute:: task

          The task class, or the object found in the task registry when a
          task name was given.

      .. attribute:: task_name

          The name of the task.

      .. attribute:: arguments

          The arguments, as passed to the task set constructor.

      .. attribute:: total

          Total number of tasks in this task set.

      Example

          >>> from djangofeeds.tasks import RefreshFeedTask
          >>> taskset = TaskSet(RefreshFeedTask, args=[
          ...         ([], {"feed_url": "http://cnn.com/rss"}),
          ...         ([], {"feed_url": "http://bbc.com/rss"}),
          ...         ([], {"feed_url": "http://xkcd.com/rss"})
          ... ])

          >>> taskset_result = taskset.run()
          >>> list_of_return_values = taskset_result.join()

      """

      def __init__(self, task, args):
          # Anything with a ``name`` attribute is used as the task itself;
          # otherwise ``task`` is treated as a name and looked up in the
          # task registry.
          try:
              task_name = task.name
              task_obj = task
          except AttributeError:
              task_name = task
              task_obj = tasks[task_name]
          self.task = task_obj
          self.task_name = task_name
          self.arguments = args
          self.total = len(args)

      def run(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
          """Run all tasks in the taskset.

          :keyword connect_timeout: Timeout used when connecting to the
              broker (not used when ``CELERY_ALWAYS_EAGER`` is enabled).

          :returns: A :class:`celery.result.TaskSetResult` instance.

          The broker connection and publisher opened here are always closed
          before returning, even if publishing one of the subtasks fails.

          Example

              >>> ts = TaskSet(RefreshFeedTask, args=[
              ...         (["http://foo.com/rss"], {}),
              ...         (["http://bar.com/rss"], {}),
              ... ])
              >>> result = ts.run()
              >>> result.taskset_id
              'd2c9b261-8eff-4bfb-8459-1e1b72063514'
              >>> result.subtask_ids
              ['b4996460-d959-49c8-aeb9-39c530dcde25',
              '598d2d18-ab86-45ca-8b4f-0779f5d6a3cb']
              >>> result.waiting()
              True
              >>> time.sleep(10)
              >>> result.ready()
              True
              >>> result.successful()
              True
              >>> result.failed()
              False
              >>> result.join()
              [True, True]

          """
          taskset_id = gen_unique_id()

          # Looked up at call time (not at import) so the current value of
          # ``celery.conf.ALWAYS_EAGER`` is honoured.
          from celery.conf import ALWAYS_EAGER
          if ALWAYS_EAGER:
              subtasks = [apply(self.task, args, kwargs)
                          for args, kwargs in self.arguments]
              return TaskSetResult(taskset_id, subtasks)

          conn = DjangoBrokerConnection(connect_timeout=connect_timeout)
          try:
              publisher = TaskPublisher(connection=conn,
                                        exchange=self.task.exchange)
              try:
                  # One publisher is shared by every subtask so they all go
                  # out over a single connection.
                  subtasks = [apply_async(self.task, args, kwargs,
                                          taskset_id=taskset_id,
                                          publisher=publisher)
                              for args, kwargs in self.arguments]
              finally:
                  publisher.close()
          finally:
              conn.close()
          return TaskSetResult(taskset_id, subtasks)

      def join(self, timeout=None):
          """Run the taskset and gather the results for all of its tasks,
          returning them in a list in the same order as :attr:`arguments`.

          Note that this calls :meth:`run`, so every call dispatches all of
          the subtasks again. To wait for a taskset you have already run,
          call ``join()`` on the :class:`celery.result.TaskSetResult`
          returned by :meth:`run` instead.

          :keyword timeout: The time in seconds, how long
              it will wait for results, before the operation times out.

          :raises TimeoutError: if ``timeout`` is not ``None``
              and the operation takes longer than ``timeout`` seconds.

          If any of the tasks raises an exception, the exception
          will be reraised by :meth:`join`.

          :returns: list of return values for all tasks in the taskset.

          """
          return self.run().join(timeout=timeout)

      @classmethod
      def remote_execute(cls, func, args):
          """Build a taskset that applies ``func`` to each item of ``args``
          on the celery server(s), using :class:`ExecuteRemoteTask`.

          :param func: A pickleable function or callable object.
          :param args: Sequence whose items are the positional arguments
              for each call of ``func``.
          :returns: A new, not yet run, :class:`TaskSet`.

          """
          pickled = pickle.dumps(func)
          arguments = [[[pickled, arg, {}], {}] for arg in args]
          return cls(ExecuteRemoteTask, arguments)

      @classmethod
      def map(cls, func, args, timeout=None):
          """Distribute processing of the arguments and collect the results.

          Blocks until all results are available (see :meth:`join`).

          :raises TimeoutError: if ``timeout`` is not ``None`` and the
              operation takes longer than ``timeout`` seconds.
          :returns: list of return values, in the order of ``args``.

          """
          remote_task = cls.remote_execute(func, args)
          return remote_task.join(timeout=timeout)

      @classmethod
      def map_async(cls, func, args, timeout=None):
          """Distribute processing of the arguments and collect the results
          asynchronously, by way of :class:`AsynchronousMapTask`.

          :returns: :class:`celery.result.AsyncResult` instance.

          """
          serfunc = pickle.dumps(func)
          return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  class PeriodicTask(Task):
      """A periodic task is a task that behaves like a :manpage:`cron` job.

      .. attribute:: run_every

          Defines how often the task is run (its interval). It can be
          either a :class:`datetime.timedelta` object or a number (``int``
          or ``float``) specifying the time in seconds; a number is
          converted to a :class:`datetime.timedelta` on the class when it
          is instantiated. Defaults to one day.

      :raises NotImplementedError: if :attr:`run_every` is set to a false
          value (such as ``None`` or ``0``).

      You have to register the periodic task in the task registry.

      Example

          >>> from celery.task import tasks, PeriodicTask
          >>> from datetime import timedelta
          >>> class MyPeriodicTask(PeriodicTask):
          ...     name = "my_periodic_task"
          ...     run_every = timedelta(seconds=30)
          ...
          ...     def run(self, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Running MyPeriodicTask")
          >>> tasks.register(MyPeriodicTask)

      """
      run_every = timedelta(days=1)
      type = "periodic"

      def __init__(self):
          if not self.run_every:
              raise NotImplementedError(
                  "Periodic tasks must have a run_every attribute")

          # If run_every is a plain number of seconds (int or float),
          # convert it to a timedelta. Operate on the original class
          # attribute so anyone accessing it directly gets the right value.
          if isinstance(self.__class__.run_every, (int, float)):
              self.__class__.run_every = timedelta(seconds=self.run_every)

          super(PeriodicTask, self).__init__()