base.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586
  1. import sys
  2. from datetime import timedelta
  3. from Queue import Queue
  4. from carrot.connection import DjangoBrokerConnection
  5. from billiard.serialization import pickle
  6. from celery import conf
  7. from celery.log import setup_logger
  8. from celery.utils import gen_unique_id, get_full_cls_name
  9. from celery.result import TaskSetResult, EagerResult
  10. from celery.execute import apply_async, apply
  11. from celery.registry import tasks
  12. from celery.backends import default_backend
  13. from celery.messaging import TaskPublisher, TaskConsumer
  14. from celery.exceptions import MaxRetriesExceededError, RetryTaskError
  15. class TaskType(type):
  16. """Metaclass for tasks.
  17. Automatically registers the task in the task registry, except
  18. if the ``abstract`` attribute is set.
  19. If no ``name`` attribute is provided, the name is automatically
  20. set to the name of the module it was defined in, and the class name.
  21. """
  22. def __new__(cls, name, bases, attrs):
  23. super_new = super(TaskType, cls).__new__
  24. task_module = attrs["__module__"]
  25. # Abstract class, remove the abstract attribute so
  26. # any class inheriting from this won't be abstract by default.
  27. if attrs.pop("abstract", None):
  28. return super_new(cls, name, bases, attrs)
  29. # Automatically generate missing name.
  30. if not attrs.get("name"):
  31. task_module = sys.modules[task_module]
  32. task_name = ".".join([task_module.__name__, name])
  33. attrs["name"] = task_name
  34. # Because of the way import happens (recursively)
  35. # we may or may not be the first time the task tries to register
  36. # with the framework. There should only be one class for each task
  37. # name, so we always return the registered version.
  38. task_name = attrs["name"]
  39. if task_name not in tasks:
  40. task_cls = super_new(cls, name, bases, attrs)
  41. tasks.register(task_cls)
  42. return tasks[task_name].__class__
  43. class Task(object):
  44. """A celery task.
  45. All subclasses of :class:`Task` must define the :meth:`run` method,
  46. which is the actual method the ``celery`` daemon executes.
  47. The :meth:`run` method can take use of the default keyword arguments,
  48. as listed in the :meth:`run` documentation.
  49. .. attribute:: name
  50. Name of the task.
  51. .. attribute:: abstract
  52. If ``True`` the task is an abstract base class.
  53. .. attribute:: type
  54. The type of task, currently this can be ``regular``, or ``periodic``,
  55. however if you want a periodic task, you should subclass
  56. :class:`PeriodicTask` instead.
  57. .. attribute:: routing_key
  58. Override the global default ``routing_key`` for this task.
  59. .. attribute:: exchange
  60. Override the global default ``exchange`` for this task.
  61. .. attribute:: mandatory
  62. Mandatory message routing. An exception will be raised if the task
  63. can't be routed to a queue.
  64. .. attribute:: immediate:
  65. Request immediate delivery. An exception will be raised if the task
  66. can't be routed to a worker immediately.
  67. .. attribute:: priority:
  68. The message priority. A number from ``0`` to ``9``, where ``0`` is the
  69. highest. Note that RabbitMQ doesn't support priorities yet.
  70. .. attribute:: max_retries
  71. Maximum number of retries before giving up.
  72. .. attribute:: default_retry_delay
  73. Default time in seconds before a retry of the task should be
  74. executed. Default is a 1 minute delay.
  75. .. attribute:: rate_limit
  76. Set the rate limit for this task type, Examples: ``None`` (no rate
  77. limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
  78. tasks a minute), ``"100/h"`` (hundred tasks an hour)
  79. .. attribute:: ignore_result
  80. Don't store the return value of this task.
  81. .. attribute:: disable_error_emails
  82. Disable all error e-mails for this task (only applicable if
  83. ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
  84. .. attribute:: serializer
  85. The name of a serializer that has been registered with
  86. :mod:`carrot.serialization.registry`. Example: ``"json"``.
  87. .. attribute:: backend
  88. The result store backend used for this task.
  89. The resulting class is callable, which if called will apply the
  90. :meth:`run` method.
  91. """
  92. __metaclass__ = TaskType
  93. name = None
  94. abstract = True
  95. type = "regular"
  96. exchange = None
  97. routing_key = None
  98. immediate = False
  99. mandatory = False
  100. priority = None
  101. ignore_result = False
  102. disable_error_emails = False
  103. max_retries = 3
  104. default_retry_delay = 3 * 60
  105. serializer = conf.TASK_SERIALIZER
  106. rate_limit = conf.DEFAULT_RATE_LIMIT
  107. rate_limit_queue_type = Queue
  108. backend = default_backend
  109. MaxRetriesExceededError = MaxRetriesExceededError
  110. def __init__(self):
  111. if not self.__class__.name:
  112. self.__class__.name = get_full_cls_name(self.__class__)
  113. def __call__(self, *args, **kwargs):
  114. return self.run(*args, **kwargs)
  115. def run(self, *args, **kwargs):
  116. """The body of the task executed by the worker.
  117. The following standard keyword arguments are reserved and is passed
  118. by the worker if the function/method supports them:
  119. * task_id
  120. * task_name
  121. * task_retries
  122. * logfile
  123. * loglevel
  124. Additional standard keyword arguments may be added in the future.
  125. To take these default arguments, the task can either list the ones
  126. it wants explicitly or just take an arbitrary list of keyword
  127. arguments (\*\*kwargs).
  128. """
  129. raise NotImplementedError("Tasks must define a run method.")
  130. def get_logger(self, **kwargs):
  131. """Get process-aware logger object.
  132. See :func:`celery.log.setup_logger`.
  133. """
  134. logfile = kwargs.get("logfile")
  135. loglevel = kwargs.get("loglevel")
  136. return setup_logger(loglevel=loglevel, logfile=logfile)
  137. def get_publisher(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
  138. """Get a celery task message publisher.
  139. :rtype: :class:`celery.messaging.TaskPublisher`.
  140. Please be sure to close the AMQP connection when you're done
  141. with this object, i.e.:
  142. >>> publisher = self.get_publisher()
  143. >>> # do something with publisher
  144. >>> publisher.connection.close()
  145. """
  146. connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
  147. return TaskPublisher(connection=connection,
  148. exchange=self.exchange,
  149. routing_key=self.routing_key)
  150. def get_consumer(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
  151. """Get a celery task message consumer.
  152. :rtype: :class:`celery.messaging.TaskConsumer`.
  153. Please be sure to close the AMQP connection when you're done
  154. with this object. i.e.:
  155. >>> consumer = self.get_consumer()
  156. >>> # do something with consumer
  157. >>> consumer.connection.close()
  158. """
  159. connection = DjangoBrokerConnection(connect_timeout=connect_timeout)
  160. return TaskConsumer(connection=connection, exchange=self.exchange,
  161. routing_key=self.routing_key)
  162. @classmethod
  163. def delay(cls, *args, **kwargs):
  164. """Shortcut to :meth:`apply_async` but with star arguments,
  165. and doesn't support the extra options.
  166. :param \*args: positional arguments passed on to the task.
  167. :param \*\*kwargs: keyword arguments passed on to the task.
  168. :rtype: :class:`celery.result.AsyncResult`
  169. """
  170. return apply_async(cls, args, kwargs)
  171. @classmethod
  172. def apply_async(cls, args=None, kwargs=None, **options):
  173. """Delay this task for execution by the ``celery`` daemon(s).
  174. :param args: positional arguments passed on to the task.
  175. :param kwargs: keyword arguments passed on to the task.
  176. :keyword \*\*options: Any keyword arguments to pass on to
  177. :func:`celery.execute.apply_async`.
  178. See :func:`celery.execute.apply_async` for more information.
  179. :rtype: :class:`celery.result.AsyncResult`
  180. """
  181. return apply_async(cls, args, kwargs, **options)
  182. def retry(self, args, kwargs, exc=None, throw=True, **options):
  183. """Retry the task.
  184. :param args: Positional arguments to retry with.
  185. :param kwargs: Keyword arguments to retry with.
  186. :keyword exc: Optional exception to raise instead of
  187. :exc:`MaxRestartsExceededError` when the max restart limit has
  188. been exceeded.
  189. :keyword throw: Do not raise the
  190. :exc:`celery.exceptions.RetryTaskError` exception,
  191. that tells the worker that the task is to be retried.
  192. :keyword countdown: Time in seconds to delay the retry for.
  193. :keyword eta: Explicit time and date to run the retry at (must be a
  194. :class:`datetime.datetime` instance).
  195. :keyword \*\*options: Any extra options to pass on to
  196. meth:`apply_async`. See :func:`celery.execute.apply_async`.
  197. :raises celery.exceptions.RetryTaskError: To tell the worker that the
  198. task has been re-sent for retry. This always happens except if
  199. the ``throw`` keyword argument has been explicitly set
  200. to ``False``.
  201. Example
  202. >>> class TwitterPostStatusTask(Task):
  203. ...
  204. ... def run(self, username, password, message, **kwargs):
  205. ... twitter = Twitter(username, password)
  206. ... try:
  207. ... twitter.post_status(message)
  208. ... except twitter.FailWhale, exc:
  209. ... # Retry in 5 minutes.
  210. ... self.retry([username, password, message], kwargs,
  211. ... countdown=60 * 5, exc=exc)
  212. """
  213. options["retries"] = kwargs.pop("task_retries", 0) + 1
  214. options["task_id"] = kwargs.pop("task_id", None)
  215. options["countdown"] = options.get("countdown",
  216. self.default_retry_delay)
  217. max_exc = exc or self.MaxRetriesExceededError(
  218. "Can't retry %s[%s] args:%s kwargs:%s" % (
  219. self.name, options["task_id"], args, kwargs))
  220. if options["retries"] > self.max_retries:
  221. raise max_exc
  222. # If task was executed eagerly using apply(),
  223. # then the retry must also be executed eagerly.
  224. if kwargs.get("task_is_eager", False):
  225. result = self.apply(args=args, kwargs=kwargs, **options)
  226. if isinstance(result, EagerResult):
  227. # get() propogates any exceptions.
  228. return result.get()
  229. return result
  230. self.apply_async(args=args, kwargs=kwargs, **options)
  231. if throw:
  232. message = "Retry in %d seconds." % options["countdown"]
  233. raise RetryTaskError(message, exc)
  234. def on_retry(self, exc, task_id, args, kwargs):
  235. """Retry handler.
  236. This is run by the worker when the task is to be retried.
  237. :param exc: The exception sent to :meth:`retry`.
  238. :param task_id: Unique id of the retried task.
  239. :param args: Original arguments for the retried task.
  240. :param kwargs: Original keyword arguments for the retried task.
  241. The return value of this handler is ignored.
  242. """
  243. pass
  244. def on_failure(self, exc, task_id, args, kwargs):
  245. """Error handler.
  246. This is run by the worker when the task fails.
  247. :param exc: The exception raised by the task.
  248. :param task_id: Unique id of the failed task.
  249. :param args: Original arguments for the task that failed.
  250. :param kwargs: Original keyword arguments for the task that failed.
  251. The return value of this handler is ignored.
  252. """
  253. pass
  254. def on_success(self, retval, task_id, args, kwargs):
  255. """Success handler.
  256. This is run by the worker when the task executed successfully.
  257. :param retval: The return value of the task.
  258. :param task_id: Unique id of the executed task.
  259. :param args: Original arguments for the executed task.
  260. :param kwargs: Original keyword arguments for the executed task.
  261. The return value of this handler is ignored.
  262. """
  263. pass
  264. @classmethod
  265. def apply(cls, args=None, kwargs=None, **options):
  266. """Execute this task at once, by blocking until the task
  267. has finished executing.
  268. :param args: positional arguments passed on to the task.
  269. :param kwargs: keyword arguments passed on to the task.
  270. :rtype: :class:`celery.result.EagerResult`
  271. See :func:`celery.execute.apply`.
  272. """
  273. return apply(cls, args, kwargs, **options)
  274. class ExecuteRemoteTask(Task):
  275. """Execute an arbitrary function or object.
  276. *Note* You probably want :func:`execute_remote` instead, which this
  277. is an internal component of.
  278. The object must be pickleable, so you can't use lambdas or functions
  279. defined in the REPL (that is the python shell, or ``ipython``).
  280. """
  281. name = "celery.execute_remote"
  282. def run(self, ser_callable, fargs, fkwargs, **kwargs):
  283. """
  284. :param ser_callable: A pickled function or callable object.
  285. :param fargs: Positional arguments to apply to the function.
  286. :param fkwargs: Keyword arguments to apply to the function.
  287. """
  288. callable_ = pickle.loads(ser_callable)
  289. return callable_(*fargs, **fkwargs)
  290. class AsynchronousMapTask(Task):
  291. """Task used internally by :func:`dmap_async` and
  292. :meth:`TaskSet.map_async`. """
  293. name = "celery.map_async"
  294. def run(self, serfunc, args, **kwargs):
  295. """The method run by ``celeryd``."""
  296. timeout = kwargs.get("timeout")
  297. return TaskSet.map(pickle.loads(serfunc), args, timeout=timeout)
  298. class TaskSet(object):
  299. """A task containing several subtasks, making it possible
  300. to track how many, or when all of the tasks has been completed.
  301. :param task: The task class or name.
  302. Can either be a fully qualified task name, or a task class.
  303. :param args: A list of args, kwargs pairs.
  304. e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
  305. .. attribute:: task_name
  306. The name of the task.
  307. .. attribute:: arguments
  308. The arguments, as passed to the task set constructor.
  309. .. attribute:: total
  310. Total number of tasks in this task set.
  311. Example
  312. >>> from djangofeeds.tasks import RefreshFeedTask
  313. >>> taskset = TaskSet(RefreshFeedTask, args=[
  314. ... ([], {"feed_url": "http://cnn.com/rss"}),
  315. ... ([], {"feed_url": "http://bbc.com/rss"}),
  316. ... ([], {"feed_url": "http://xkcd.com/rss"})
  317. ... ])
  318. >>> taskset_result = taskset.run()
  319. >>> list_of_return_values = taskset_result.join()
  320. """
  321. def __init__(self, task, args):
  322. try:
  323. task_name = task.name
  324. task_obj = task
  325. except AttributeError:
  326. task_name = task
  327. task_obj = tasks[task_name]
  328. self.task = task_obj
  329. self.task_name = task_name
  330. self.arguments = args
  331. self.total = len(args)
  332. def run(self, connect_timeout=conf.AMQP_CONNECTION_TIMEOUT):
  333. """Run all tasks in the taskset.
  334. :returns: A :class:`celery.result.TaskSetResult` instance.
  335. Example
  336. >>> ts = TaskSet(RefreshFeedTask, args=[
  337. ... (["http://foo.com/rss"], {}),
  338. ... (["http://bar.com/rss"], {}),
  339. ... ])
  340. >>> result = ts.run()
  341. >>> result.taskset_id
  342. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  343. >>> result.subtask_ids
  344. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  345. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  346. >>> result.waiting()
  347. True
  348. >>> time.sleep(10)
  349. >>> result.ready()
  350. True
  351. >>> result.successful()
  352. True
  353. >>> result.failed()
  354. False
  355. >>> result.join()
  356. [True, True]
  357. """
  358. taskset_id = gen_unique_id()
  359. from celery.conf import ALWAYS_EAGER
  360. if ALWAYS_EAGER:
  361. subtasks = [apply(self.task, args, kwargs)
  362. for args, kwargs in self.arguments]
  363. return TaskSetResult(taskset_id, subtasks)
  364. conn = DjangoBrokerConnection(connect_timeout=connect_timeout)
  365. publisher = TaskPublisher(connection=conn,
  366. exchange=self.task.exchange)
  367. subtasks = [apply_async(self.task, args, kwargs,
  368. taskset_id=taskset_id, publisher=publisher)
  369. for args, kwargs in self.arguments]
  370. publisher.close()
  371. conn.close()
  372. return TaskSetResult(taskset_id, subtasks)
  373. @classmethod
  374. def remote_execute(cls, func, args):
  375. """Apply ``args`` to function by distributing the args to the
  376. celery server(s)."""
  377. pickled = pickle.dumps(func)
  378. arguments = [[[pickled, arg, {}], {}] for arg in args]
  379. return cls(ExecuteRemoteTask, arguments)
  380. @classmethod
  381. def map(cls, func, args, timeout=None):
  382. """Distribute processing of the arguments and collect the results."""
  383. remote_task = cls.remote_execute(func, args)
  384. return remote_task.run().join(timeout=timeout)
  385. @classmethod
  386. def map_async(cls, func, args, timeout=None):
  387. """Distribute processing of the arguments and collect the results
  388. asynchronously.
  389. :returns: :class:`celery.result.AsyncResult` instance.
  390. """
  391. serfunc = pickle.dumps(func)
  392. return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  393. class PeriodicTask(Task):
  394. """A periodic task is a task that behaves like a :manpage:`cron` job.
  395. .. attribute:: run_every
  396. *REQUIRED* Defines how often the task is run (its interval),
  397. it can be either a :class:`datetime.timedelta` object or an
  398. integer specifying the time in seconds.
  399. :raises NotImplementedError: if the :attr:`run_every` attribute is
  400. not defined.
  401. Example
  402. >>> from celery.task import tasks, PeriodicTask
  403. >>> from datetime import timedelta
  404. >>> class MyPeriodicTask(PeriodicTask):
  405. ... name = "my_periodic_task"
  406. ... run_every = timedelta(seconds=30)
  407. ...
  408. ... def run(self, **kwargs):
  409. ... logger = self.get_logger(**kwargs)
  410. ... logger.info("Running MyPeriodicTask")
  411. """
  412. abstract = True
  413. run_every = timedelta(days=1)
  414. ignore_result = True
  415. type = "periodic"
  416. def __init__(self):
  417. if not self.run_every:
  418. raise NotImplementedError(
  419. "Periodic tasks must have a run_every attribute")
  420. # If run_every is a integer, convert it to timedelta seconds.
  421. # Operate on the original class attribute so anyone accessing
  422. # it directly gets the right value.
  423. if isinstance(self.__class__.run_every, int):
  424. self.__class__.run_every = timedelta(seconds=self.run_every)
  425. super(PeriodicTask, self).__init__()