base.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682
  1. import sys
  2. import warnings
  3. from datetime import datetime, timedelta
  4. from Queue import Queue
  5. from billiard.serialization import pickle
  6. from celery import conf
  7. from celery.log import setup_logger
  8. from celery.utils import gen_unique_id, get_full_cls_name, mexpand
  9. from celery.result import BaseAsyncResult, TaskSetResult, EagerResult
  10. from celery.execute import apply_async, apply
  11. from celery.registry import tasks
  12. from celery.backends import default_backend
  13. from celery.messaging import TaskPublisher, TaskConsumer
  14. from celery.messaging import establish_connection as _establish_connection
  15. from celery.exceptions import MaxRetriesExceededError, RetryTaskError
  class TaskType(type):
      """Metaclass that automatically registers task classes.

      Every class created through this metaclass is added to the task
      registry (:data:`celery.registry.tasks`), unless its namespace sets
      ``abstract`` to a true value or sets ``autoregister`` to a false value.

      The ``abstract`` entry is always removed from the class namespace, so
      a subclass of an abstract task is not abstract unless it says so.

      When the class defines no ``name``, one is generated as
      ``"<defining module name>.<class name>"``.
      """

      def __new__(mcs, name, bases, attrs):
          build = super(TaskType, mcs).__new__
          module_name = attrs["__module__"]

          # The pop is unconditional, so ``abstract`` is stripped even from
          # concrete classes; that is what stops it from being inherited.
          is_abstract = attrs.pop("abstract", None)
          if is_abstract or not attrs.get("autoregister", True):
              return build(mcs, name, bases, attrs)

          if not attrs.get("name"):
              defining_module = sys.modules[module_name]
              attrs["name"] = "%s.%s" % (defining_module.__name__, name)

          # Imports can happen recursively, so this may not be the first
          # time a class with this task name is being created. Only one class
          # may exist per task name, hence the registered one is returned.
          task_name = attrs["name"]
          if task_name not in tasks:
              tasks.register(build(mcs, name, bases, attrs))
          return tasks[task_name].__class__
  class Task(object):
      """A celery task.

      All subclasses of :class:`Task` must define the :meth:`run` method,
      which is the actual method the ``celery`` daemon executes.

      The :meth:`run` method can make use of the default keyword arguments,
      as listed in the :meth:`run` documentation.

      .. attribute:: name

          Name of the task.

      .. attribute:: abstract

          If ``True`` the task is an abstract base class.

      .. attribute:: type

          The type of task, currently this can be ``regular``, or
          ``periodic``, however if you want a periodic task, you should
          subclass :class:`PeriodicTask` instead.

      .. attribute:: routing_key

          Override the global default ``routing_key`` for this task.

      .. attribute:: exchange

          Override the global default ``exchange`` for this task.

      .. attribute:: mandatory

          Mandatory message routing. An exception will be raised if the task
          can't be routed to a queue.

      .. attribute:: immediate

          Request immediate delivery. An exception will be raised if the task
          can't be routed to a worker immediately.

      .. attribute:: priority

          The message priority. A number from ``0`` to ``9``, where ``0`` is
          the highest. Note that RabbitMQ doesn't support priorities yet.

      .. attribute:: max_retries

          Maximum number of retries before giving up. ``None`` means there
          is no limit.

      .. attribute:: default_retry_delay

          Default time in seconds before a retry of the task should be
          executed. Default is a 3 minute delay.

      .. attribute:: rate_limit

          Set the rate limit for this task type. Examples: ``None`` (no rate
          limit), ``"100/s"`` (hundred tasks a second), ``"100/m"`` (hundred
          tasks a minute), ``"100/h"`` (hundred tasks an hour).

      .. attribute:: rate_limit_queue_type

          Type of queue used by the rate limiter for this kind of tasks.
          Default is a :class:`Queue.Queue`, but you can change this to
          a :class:`Queue.LifoQueue` or an invention of your own.

      .. attribute:: ignore_result

          Don't store the return value of this task.

      .. attribute:: disable_error_emails

          Disable all error e-mails for this task (only applicable if
          ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)

      .. attribute:: serializer

          The name of a serializer that has been registered with
          :mod:`carrot.serialization.registry`. Example: ``"json"``.

      .. attribute:: backend

          The result store backend used for this task.

      .. attribute:: autoregister

          If ``True`` the task is automatically registered in the task
          registry, which is the default behaviour.

      The resulting class is callable, which if called will apply the
      :meth:`run` method.
      """
      __metaclass__ = TaskType

      name = None
      abstract = True
      autoregister = True
      type = "regular"
      exchange = None
      routing_key = None
      immediate = False
      mandatory = False
      priority = None
      ignore_result = conf.IGNORE_RESULT
      disable_error_emails = False
      max_retries = 3
      default_retry_delay = 3 * 60
      serializer = conf.TASK_SERIALIZER
      rate_limit = conf.DEFAULT_RATE_LIMIT
      rate_limit_queue_type = Queue
      backend = default_backend

      # Exposed on the class so task code can raise or catch it as
      # ``self.MaxRetriesExceededError`` without importing it.
      MaxRetriesExceededError = MaxRetriesExceededError

      def __call__(self, *args, **kwargs):
          """Calling a task instance runs :meth:`run` in-process."""
          return self.run(*args, **kwargs)

      def run(self, *args, **kwargs):
          r"""The body of the task executed by the worker.

          The following standard keyword arguments are reserved and are
          passed by the worker if the function/method supports them:

              * task_id
              * task_name
              * task_retries
              * logfile
              * loglevel

          Additional standard keyword arguments may be added in the future.
          To take these default arguments, the task can either list the ones
          it wants explicitly or just take an arbitrary list of keyword
          arguments (\*\*kwargs).
          """
          raise NotImplementedError("Tasks must define the run method.")

      @classmethod
      def get_logger(cls, loglevel=None, logfile=None, **kwargs):
          """Get process-aware logger object.

          Extra keyword arguments are accepted and ignored, so a task can
          simply forward its own ``**kwargs``.

          See :func:`celery.log.setup_logger`.
          """
          return setup_logger(loglevel=loglevel, logfile=logfile)

      @classmethod
      def establish_connection(cls,
              connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
          """Establish a connection to the message broker."""
          return _establish_connection(connect_timeout)

      @classmethod
      def get_publisher(cls, connection=None, exchange=None,
              connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
          """Get a celery task message publisher.

          :param connection: Existing broker connection to reuse; a new one
              is established when not given.
          :param exchange: Exchange to publish to; defaults to
              :attr:`exchange`.
          :rtype: :class:`celery.messaging.TaskPublisher`.

          Please be sure to close the AMQP connection when you're done
          with this object, i.e.:

              >>> publisher = self.get_publisher()
              >>> # do something with publisher
              >>> publisher.connection.close()
          """
          if exchange is None:
              exchange = cls.exchange
          connection = connection or cls.establish_connection(connect_timeout)
          return TaskPublisher(connection=connection,
                               exchange=exchange,
                               routing_key=cls.routing_key)

      @classmethod
      def get_consumer(cls, connection=None,
              connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
          """Get a celery task message consumer.

          :rtype: :class:`celery.messaging.TaskConsumer`.

          Please be sure to close the AMQP connection when you're done
          with this object. i.e.:

              >>> consumer = self.get_consumer()
              >>> # do something with consumer
              >>> consumer.connection.close()
          """
          connection = connection or cls.establish_connection(connect_timeout)
          return TaskConsumer(connection=connection, exchange=cls.exchange,
                              routing_key=cls.routing_key)

      @classmethod
      def delay(cls, *args, **kwargs):
          r"""Shortcut to :meth:`apply_async`, with star arguments,
          but doesn't support the extra options.

          :param \*args: positional arguments passed on to the task.
          :param \*\*kwargs: keyword arguments passed on to the task.

          :returns: :class:`celery.result.AsyncResult`
          """
          return cls.apply_async(args, kwargs)

      @classmethod
      def apply_async(cls, args=None, kwargs=None, **options):
          r"""Delay this task for execution by the ``celery`` daemon(s).

          :param args: positional arguments passed on to the task.
          :param kwargs: keyword arguments passed on to the task.
          :keyword \*\*options: Any keyword arguments to pass on to
              :func:`celery.execute.apply_async`.

          See :func:`celery.execute.apply_async` for more information.

          :rtype: :class:`celery.result.AsyncResult`
          """
          return apply_async(cls, args, kwargs, **options)

      @classmethod
      def retry(cls, args, kwargs, exc=None, throw=True, **options):
          r"""Retry the task.

          :param args: Positional arguments to retry with.
          :param kwargs: Keyword arguments to retry with. The reserved
              ``task_id`` and ``task_retries`` keys are consumed here and not
              forwarded as task arguments; the caller's dict is not modified.

          :keyword exc: Optional exception to raise instead of
              :exc:`MaxRetriesExceededError` when the max retry limit has
              been exceeded. It is also attached to the
              :exc:`celery.exceptions.RetryTaskError` that is raised.
          :keyword countdown: Time in seconds to delay the retry for.
              Defaults to :attr:`default_retry_delay` when neither
              ``countdown`` nor ``eta`` is given.
          :keyword eta: Explicit time and date to run the retry at (must be a
              :class:`datetime.datetime` instance).
          :keyword throw: If this is ``False``, do not raise the
              :exc:`celery.exceptions.RetryTaskError` exception,
              that tells the worker that the task is to be retried.
          :keyword \*\*options: Any extra options to pass on to
              :meth:`apply_async`. See :func:`celery.execute.apply_async`.

          :raises celery.exceptions.RetryTaskError: To tell the worker that the
              task has been re-sent for retry. This always happens except if
              the ``throw`` keyword argument has been explicitly set
              to ``False``.
          :raises MaxRetriesExceededError: (or ``exc`` if given) when the
              retry count would exceed :attr:`max_retries`.

          Example

              >>> class TwitterPostStatusTask(Task):
              ...
              ...     def run(self, username, password, message, **kwargs):
              ...         twitter = Twitter(username, password)
              ...         try:
              ...             twitter.post_status(message)
              ...         except twitter.FailWhale, exc:
              ...             # Retry in 5 minutes.
              ...             self.retry([username, password, message], kwargs,
              ...                        countdown=60 * 5, exc=exc)
          """
          # Work on a copy: the reserved keys are stripped from what is sent
          # with the retry, but the caller's dict must stay intact.
          kwargs = dict(kwargs)
          options["retries"] = kwargs.pop("task_retries", 0) + 1
          options["task_id"] = kwargs.pop("task_id", None)

          # Only fall back to the default delay when no explicit schedule was
          # requested. Forcing a countdown next to an explicit ``eta`` would
          # send both to apply_async. NOTE(review): presumably the countdown
          # would then take precedence over the eta; confirm in
          # celery.execute.apply_async.
          if "countdown" not in options and options.get("eta") is None:
              options["countdown"] = cls.default_retry_delay

          max_retries = cls.max_retries
          if max_retries is not None and options["retries"] > max_retries:
              raise exc or cls.MaxRetriesExceededError(
                      "Can't retry %s[%s] args:%s kwargs:%s" % (
                          cls.name, options["task_id"], args, kwargs))

          # If task was executed eagerly using apply(),
          # then the retry must also be executed eagerly.
          if kwargs.get("task_is_eager", False):
              result = cls.apply(args=args, kwargs=kwargs, **options)
              if isinstance(result, EagerResult):
                  return result.get()  # propagates exceptions.
              return result

          cls.apply_async(args=args, kwargs=kwargs, **options)

          if throw:
              countdown = options.get("countdown")
              eta = options.get("eta")
              if countdown is None and eta is not None:
                  message = "Retry at %s." % (eta, )
              else:
                  # ``or 0`` avoids a TypeError when countdown=None was
                  # passed explicitly (i.e. retry as soon as possible).
                  message = "Retry in %d seconds." % (countdown or 0, )
              raise RetryTaskError(message, exc)

      @classmethod
      def apply(cls, args=None, kwargs=None, **options):
          """Execute this task at once, by blocking until the task
          has finished executing.

          :param args: positional arguments passed on to the task.
          :param kwargs: keyword arguments passed on to the task.

          :rtype: :class:`celery.result.EagerResult`

          See :func:`celery.execute.apply`.
          """
          return apply(cls, args, kwargs, **options)

      @classmethod
      def AsyncResult(cls, task_id):
          """Get a result object for ``task_id``, bound to :attr:`backend`.

          :rtype: :class:`celery.result.BaseAsyncResult`
          """
          return BaseAsyncResult(task_id, backend=cls.backend)

      def on_retry(self, exc, task_id, args, kwargs):
          """Retry handler.

          This is run by the worker when the task is to be retried.

          :param exc: The exception sent to :meth:`retry`.
          :param task_id: Unique id of the retried task.
          :param args: Original arguments for the retried task.
          :param kwargs: Original keyword arguments for the retried task.

          The return value of this handler is ignored.
          """
          pass

      def on_failure(self, exc, task_id, args, kwargs):
          """Error handler.

          This is run by the worker when the task fails.

          :param exc: The exception raised by the task.
          :param task_id: Unique id of the failed task.
          :param args: Original arguments for the task that failed.
          :param kwargs: Original keyword arguments for the task that failed.

          The return value of this handler is ignored.
          """
          pass

      def on_success(self, retval, task_id, args, kwargs):
          """Success handler.

          Run by the worker if the task executes successfully.

          :param retval: The return value of the task.
          :param task_id: Unique id of the executed task.
          :param args: Original arguments for the executed task.
          :param kwargs: Original keyword arguments for the executed task.

          The return value of this handler is ignored.
          """
          pass
  class ExecuteRemoteTask(Task):
      """Run an arbitrary pickled function or callable object in a worker.

      *Note* You probably want :func:`execute_remote` instead; this task is
      the internal component it is built on.

      The callable must be pickleable, so lambdas and functions defined in
      the REPL (the python shell, or ``ipython``) cannot be used.
      """
      name = "celery.execute_remote"

      def run(self, ser_callable, fargs, fkwargs, **kwargs):
          """Unpickle ``ser_callable`` and call it.

          :param ser_callable: A pickled function or callable object.
          :param fargs: Positional arguments to apply to the function.
          :param fkwargs: Keyword arguments to apply to the function.

          :returns: Whatever the callable returns.
          """
          # NOTE(review): unpickling runs arbitrary code by design. This is
          # only safe if every producer that can publish to this task's
          # queue is trusted.
          target = pickle.loads(ser_callable)
          return target(*fargs, **fkwargs)
  class AsynchronousMapTask(Task):
      """Task used internally by :func:`dmap_async` and
      :meth:`TaskSet.map_async`.

      It unpickles the callable and runs :meth:`TaskSet.map` inside a worker,
      so that the whole map operation is itself asynchronous.
      """
      name = "celery.map_async"

      def run(self, ser_callable, args, timeout=None, **kwargs):
          """Run :meth:`TaskSet.map` for a pickled callable.

          See :meth:`TaskSet.map_async`.

          :param ser_callable: A pickled function or callable object.
          :param args: Sequence whose items are each applied to the callable
              as its positional arguments.
          :keyword timeout: Passed on to the ``join`` of the resulting
              task set result.

          :returns: The list of return values, one per item in ``args``.
          """
          # NOTE(review): unpickling runs arbitrary code by design; only safe
          # when every producer publishing to this queue is trusted.
          return TaskSet.map(pickle.loads(ser_callable), args, timeout=timeout)
  class TaskSet(object):
      """A task containing several subtasks, making it possible
      to track how many, or when all of the tasks have been completed.

      :param task: The task class or name.
          Can either be a fully qualified task name, or a task class.

      :param args: A list of args, kwargs pairs.
          e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``.
          Each item may also carry a third element, a dict of options
          forwarded to :func:`celery.execute.apply_async` (and to
          :func:`celery.execute.apply` when run eagerly).

      .. attribute:: task_name

          The name of the task.

      .. attribute:: arguments

          The arguments, as passed to the task set constructor.

      .. attribute:: total

          Total number of tasks in this task set.

      Example

          >>> from djangofeeds.tasks import RefreshFeedTask
          >>> taskset = TaskSet(RefreshFeedTask, args=[
          ...                 ([], {"feed_url": "http://cnn.com/rss"}),
          ...                 ([], {"feed_url": "http://bbc.com/rss"}),
          ...                 ([], {"feed_url": "http://xkcd.com/rss"})
          ... ])

          >>> taskset_result = taskset.apply_async()
          >>> list_of_return_values = taskset_result.join()
      """

      def __init__(self, task, args):
          try:
              task_name = task.name
              task_obj = task
          except AttributeError:
              # ``task`` is a task name rather than a task class.
              task_name = task
              task_obj = tasks[task_name]

          # Always use the registered task instance.
          task_obj = tasks[task_obj.name]

          self.task = task_obj
          self.task_name = task_name
          self.arguments = args
          self.total = len(args)

      def run(self, *args, **kwargs):
          """Deprecated alias to :meth:`apply_async`"""
          warnings.warn(PendingDeprecationWarning(
              "TaskSet.run will be deprecated in favor of TaskSet.apply_async "
              "in celery v1.2.0"))
          return self.apply_async(*args, **kwargs)

      def apply_async(self, connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
          """Run all tasks in the taskset.

          When ``conf.ALWAYS_EAGER`` is set the tasks are executed locally
          via :meth:`apply` instead.

          :returns: A :class:`celery.result.TaskSetResult` instance.

          Example

              >>> ts = TaskSet(RefreshFeedTask, args=[
              ...         (["http://foo.com/rss"], {}),
              ...         (["http://bar.com/rss"], {}),
              ... ])
              >>> result = ts.apply_async()
              >>> result.taskset_id
              "d2c9b261-8eff-4bfb-8459-1e1b72063514"
              >>> result.subtask_ids
              ["b4996460-d959-49c8-aeb9-39c530dcde25",
              "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
              >>> result.waiting()
              True
              >>> time.sleep(10)
              >>> result.ready()
              True
              >>> result.successful()
              True
              >>> result.failed()
              False
              >>> result.join()
              [True, True]
          """
          if conf.ALWAYS_EAGER:
              return self.apply()

          taskset_id = gen_unique_id()
          conn = self.task.establish_connection(connect_timeout=connect_timeout)
          # Nested try/finally blocks ensure the connection is closed even if
          # creating the publisher fails, not only if publishing fails.
          try:
              publisher = self.task.get_publisher(connection=conn)
              try:
                  subtasks = [self.apply_part(arglist, taskset_id, publisher)
                                  for arglist in self.arguments]
              finally:
                  publisher.close()
          finally:
              conn.close()
          return TaskSetResult(taskset_id, subtasks)

      def apply_part(self, arglist, taskset_id, publisher):
          """Publish one subtask from an ``(args, kwargs[, options])`` item."""
          args, kwargs, opts = mexpand(arglist, 3, default={})
          return apply_async(self.task, args, kwargs,
                             taskset_id=taskset_id, publisher=publisher, **opts)

      def apply(self):
          """Execute all subtasks locally and block until they are done.

          Items are unpacked the same way as in :meth:`apply_part`, so
          ``(args, kwargs, options)`` triples are accepted here too.

          :returns: A :class:`celery.result.TaskSetResult` instance.
          """
          taskset_id = gen_unique_id()
          subtasks = []
          for arglist in self.arguments:
              args, kwargs, opts = mexpand(arglist, 3, default={})
              subtasks.append(apply(self.task, args, kwargs, **opts))
          return TaskSetResult(taskset_id, subtasks)

      @classmethod
      def remote_execute(cls, func, args):
          """Apply ``args`` to function by distributing the args to the
          celery server(s).

          Each item of ``args`` becomes the positional arguments of one call.
          """
          pickled = pickle.dumps(func)
          arguments = [[[pickled, arg, {}], {}] for arg in args]
          return cls(ExecuteRemoteTask, arguments)

      @classmethod
      def map(cls, func, args, timeout=None):
          """Distribute processing of the arguments and collect the results."""
          remote_task = cls.remote_execute(func, args)
          # Call apply_async directly; ``run`` is a deprecated alias that
          # would emit a PendingDeprecationWarning on every map.
          return remote_task.apply_async().join(timeout=timeout)

      @classmethod
      def map_async(cls, func, args, timeout=None):
          """Distribute processing of the arguments and collect the results
          asynchronously.

          :keyword timeout: Passed on to :meth:`map` inside the worker.

          :returns: :class:`celery.result.AsyncResult` instance.
          """
          serfunc = pickle.dumps(func)
          return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  class PeriodicTask(Task):
      """A periodic task is a task that behaves like a :manpage:`cron` job.

      Results of periodic tasks are not stored by default.

      .. attribute:: run_every

          *REQUIRED* Defines how often the task is run (its interval),
          it can be either a :class:`datetime.timedelta` object or a
          number specifying the time in seconds.

      .. attribute:: relative

          If set to ``True``, run times are relative to the time when the
          server was started. This was the previous behaviour, periodic tasks
          are now scheduled by the clock.

      :raises NotImplementedError: if the :attr:`run_every` attribute is
          not defined.

      Example

          >>> from celery.task import tasks, PeriodicTask
          >>> from datetime import timedelta
          >>> class MyPeriodicTask(PeriodicTask):
          ...     run_every = timedelta(seconds=30)
          ...
          ...     def run(self, **kwargs):
          ...         logger = self.get_logger(**kwargs)
          ...         logger.info("Running MyPeriodicTask")
      """
      abstract = True
      ignore_result = True
      type = "periodic"
      relative = False

      def __init__(self):
          import numbers

          if not hasattr(self, "run_every"):
              raise NotImplementedError(
                      "Periodic tasks must have a run_every attribute")

          # If run_every is a number of seconds (int, long or float), convert
          # it to a timedelta. Operate on the original class attribute so
          # anyone accessing it directly gets the right value.
          if isinstance(self.__class__.run_every, numbers.Real):
              self.__class__.run_every = timedelta(seconds=self.run_every)
          super(PeriodicTask, self).__init__()

      def remaining_estimate(self, last_run_at):
          """Returns when the periodic task should run next as a timedelta.

          Unless :attr:`relative` is set, the next run time is aligned to the
          clock with :meth:`delta_resolution`. The result is measured from
          ``datetime.now()`` and is negative when the task is overdue.
          """
          next_run_at = last_run_at + self.run_every
          if not self.relative:
              next_run_at = self.delta_resolution(next_run_at, self.run_every)
          return next_run_at - datetime.now()

      def timedelta_seconds(self, delta):
          """Convert :class:`datetime.timedelta` to seconds.

          Negative timedeltas are clamped to ``0``.
          """
          if delta.days < 0:
              return 0
          return delta.days * 86400 + delta.seconds + (delta.microseconds / 1e6)

      def is_due(self, last_run_at):
          """Returns tuple of two items ``(is_due, next_time_to_run)``,
          where next time to run is in seconds.

          e.g.

          * ``(True, 20)``, means the task should be run now, and the next
              time to run is in 20 seconds.

          * ``(False, 12)``, means the task should be run in 12 seconds.

          You can override this to decide the interval at runtime,
          but keep in mind the value of ``CELERYBEAT_MAX_LOOP_INTERVAL``, which
          decides the maximum number of seconds celerybeat can sleep between
          re-checking the periodic task intervals. So if you dynamically change
          the next run at value, and the max interval is set to 5 minutes, it
          will take 5 minutes for the change to take effect, so you may
          consider lowering the value of ``CELERYBEAT_MAX_LOOP_INTERVAL`` if
          responsiveness is of importance to you.
          """
          rem_delta = self.remaining_estimate(last_run_at)
          rem = self.timedelta_seconds(rem_delta)
          # timedelta_seconds clamps overdue (negative) deltas to 0.
          if rem == 0:
              return True, self.timedelta_seconds(self.run_every)
          return False, rem

      def delta_resolution(self, dt, delta):
          """Truncate ``dt`` to the coarsest unit that ``delta`` spans.

          A ``delta`` of at least a day truncates ``dt`` to the day, at least
          an hour to the hour, and at least a minute to the minute. Shorter
          deltas keep ``dt`` down to the second (microseconds are dropped).

          :param dt: :class:`datetime.datetime` to truncate.
          :param delta: :class:`datetime.timedelta` interval.
          :returns: A new :class:`datetime.datetime`.
          """
          seconds = self.timedelta_seconds(delta)
          args = dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second
          # Check from coarsest to finest and require at least one whole
          # unit. Testing the truthiness of a float quotient would match
          # every positive delta and always end at minute resolution.
          for res, unit_seconds in ((3, 86400), (4, 3600), (5, 60)):
              if seconds >= unit_seconds:
                  return datetime(*args[:res])
          return datetime(*args)