base.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. from carrot.connection import DjangoAMQPConnection
  2. from celery.conf import AMQP_CONNECTION_TIMEOUT
  3. from celery.messaging import TaskPublisher, TaskConsumer
  4. from celery.log import setup_logger
  5. from celery.result import TaskSetResult
  6. from celery.execute import apply_async, delay_task
  7. from datetime import timedelta
  8. import uuid
  9. try:
  10. import cPickle as pickle
  11. except ImportError:
  12. import pickle
  13. class Task(object):
  14. """A task that can be delayed for execution by the ``celery`` daemon.
  15. All subclasses of :class:`Task` must define the :meth:`run` method,
  16. which is the actual method the ``celery`` daemon executes.
  17. The :meth:`run` method supports both positional, and keyword arguments.
  18. .. attribute:: name
  19. *REQUIRED* All subclasses of :class:`Task` has to define the
  20. :attr:`name` attribute. This is the name of the task, registered
  21. in the task registry, and passed to :func:`delay_task`.
  22. .. attribute:: type
  23. The type of task, currently this can be ``regular``, or ``periodic``,
  24. however if you want a periodic task, you should subclass
  25. :class:`PeriodicTask` instead.
  26. .. attribute:: routing_key
  27. Override the global default ``routing_key`` for this task.
  28. .. attribute:: mandatory
  29. If set, the message has mandatory routing. By default the message
  30. is silently dropped by the broker if it can't be routed to a queue.
  31. However - If the message is mandatory, an exception will be raised
  32. instead.
  33. .. attribute:: immediate:
  34. Request immediate delivery. If the message cannot be routed to a
  35. task worker immediately, an exception will be raised. This is
  36. instead of the default behaviour, where the broker will accept and
  37. queue the message, but with no guarantee that the message will ever
  38. be consumed.
  39. .. attribute:: priority:
  40. The message priority. A number from ``0`` to ``9``.
  41. .. attribute:: ignore_result
  42. Don't store the status and return value. This means you can't
  43. use the :class:`celery.result.AsyncResult` to check if the task is
  44. done, or get its return value. Only use if you need the performance
  45. and is able live without these features. Any exceptions raised will
  46. store the return value/status as usual.
  47. .. attribute:: disable_error_emails
  48. Disable all error e-mails for this task (only applicable if
  49. ``settings.SEND_CELERY_ERROR_EMAILS`` is on.)
  50. :raises NotImplementedError: if the :attr:`name` attribute is not set.
  51. The resulting class is callable, which if called will apply the
  52. :meth:`run` method.
  53. Examples
  54. This is a simple task just logging a message,
  55. >>> from celery.task import tasks, Task
  56. >>> class MyTask(Task):
  57. ... name = "mytask"
  58. ...
  59. ... def run(self, some_arg=None, **kwargs):
  60. ... logger = self.get_logger(**kwargs)
  61. ... logger.info("Running MyTask with arg some_arg=%s" %
  62. ... some_arg))
  63. ... return 42
  64. ... tasks.register(MyTask)
  65. You can delay the task using the classmethod :meth:`delay`...
  66. >>> result = MyTask.delay(some_arg="foo")
  67. >>> result.status # after some time
  68. 'DONE'
  69. >>> result.result
  70. 42
  71. ...or using the :func:`delay_task` function, by passing the name of
  72. the task.
  73. >>> from celery.task import delay_task
  74. >>> result = delay_task(MyTask.name, some_arg="foo")
  75. """
  76. name = None
  77. type = "regular"
  78. routing_key = None
  79. immediate = False
  80. mandatory = False
  81. priority = None
  82. ignore_result = False
  83. disable_error_emails = False
  84. def __init__(self):
  85. if not self.name:
  86. raise NotImplementedError("Tasks must define a name attribute.")
  87. def __call__(self, *args, **kwargs):
  88. return self.run(*args, **kwargs)
  89. def run(self, *args, **kwargs):
  90. """*REQUIRED* The actual task.
  91. All subclasses of :class:`Task` must define the run method.
  92. :raises NotImplementedError: by default, so you have to override
  93. this method in your subclass.
  94. """
  95. raise NotImplementedError("Tasks must define a run method.")
  96. def get_logger(self, **kwargs):
  97. """Get process-aware logger object.
  98. See :func:`celery.log.setup_logger`.
  99. """
  100. return setup_logger(**kwargs)
  101. def get_publisher(self):
  102. """Get a celery task message publisher.
  103. :rtype: :class:`celery.messaging.TaskPublisher`.
  104. Please be sure to close the AMQP connection when you're done
  105. with this object, i.e.:
  106. >>> publisher = self.get_publisher()
  107. >>> # do something with publisher
  108. >>> publisher.connection.close()
  109. """
  110. return TaskPublisher(connection=DjangoAMQPConnection(
  111. connect_timeout=AMQP_CONNECTION_TIMEOUT))
  112. def get_consumer(self):
  113. """Get a celery task message consumer.
  114. :rtype: :class:`celery.messaging.TaskConsumer`.
  115. Please be sure to close the AMQP connection when you're done
  116. with this object. i.e.:
  117. >>> consumer = self.get_consumer()
  118. >>> # do something with consumer
  119. >>> consumer.connection.close()
  120. """
  121. return TaskConsumer(connection=DjangoAMQPConnection(
  122. connect_timeout=AMQP_CONNECTION_TIMEOUT))
  123. @classmethod
  124. def delay(cls, *args, **kwargs):
  125. """Delay this task for execution by the ``celery`` daemon(s).
  126. :param \*args: positional arguments passed on to the task.
  127. :param \*\*kwargs: keyword arguments passed on to the task.
  128. :rtype: :class:`celery.result.AsyncResult`
  129. See :func:`delay_task`.
  130. """
  131. return apply_async(cls, args, kwargs)
  132. @classmethod
  133. def apply_async(cls, args=None, kwargs=None, **options):
  134. """Delay this task for execution by the ``celery`` daemon(s).
  135. :param args: positional arguments passed on to the task.
  136. :param kwargs: keyword arguments passed on to the task.
  137. :rtype: :class:`celery.result.AsyncResult`
  138. See :func:`apply_async`.
  139. """
  140. return apply_async(cls, args, kwargs, **options)
  141. class TaskSet(object):
  142. """A task containing several subtasks, making it possible
  143. to track how many, or when all of the tasks has been completed.
  144. :param task: The task class or name.
  145. Can either be a fully qualified task name, or a task class.
  146. :param args: A list of args, kwargs pairs.
  147. e.g. ``[[args1, kwargs1], [args2, kwargs2], ..., [argsN, kwargsN]]``
  148. .. attribute:: task_name
  149. The name of the task.
  150. .. attribute:: arguments
  151. The arguments, as passed to the task set constructor.
  152. .. attribute:: total
  153. Total number of tasks in this task set.
  154. Example
  155. >>> from djangofeeds.tasks import RefreshFeedTask
  156. >>> taskset = TaskSet(RefreshFeedTask, args=[
  157. ... [], {"feed_url": "http://cnn.com/rss"},
  158. ... [], {"feed_url": "http://bbc.com/rss"},
  159. ... [], {"feed_url": "http://xkcd.com/rss"}])
  160. >>> taskset_result = taskset.run()
  161. >>> list_of_return_values = taskset.join()
  162. """
  163. def __init__(self, task, args):
  164. try:
  165. task_name = task.name
  166. task_obj = task
  167. except AttributeError:
  168. task_name = task
  169. task_obj = tasks[task_name]
  170. self.task = task_obj
  171. self.task_name = task_name
  172. self.arguments = args
  173. self.total = len(args)
  174. def run(self, connect_timeout=AMQP_CONNECTION_TIMEOUT):
  175. """Run all tasks in the taskset.
  176. :returns: A :class:`celery.result.TaskSetResult` instance.
  177. Example
  178. >>> ts = TaskSet(RefreshFeedTask, [
  179. ... ["http://foo.com/rss", {}],
  180. ... ["http://bar.com/rss", {}],
  181. ... )
  182. >>> result = ts.run()
  183. >>> result.taskset_id
  184. "d2c9b261-8eff-4bfb-8459-1e1b72063514"
  185. >>> result.subtask_ids
  186. ["b4996460-d959-49c8-aeb9-39c530dcde25",
  187. "598d2d18-ab86-45ca-8b4f-0779f5d6a3cb"]
  188. >>> result.waiting()
  189. True
  190. >>> time.sleep(10)
  191. >>> result.ready()
  192. True
  193. >>> result.successful()
  194. True
  195. >>> result.failed()
  196. False
  197. >>> result.join()
  198. [True, True]
  199. """
  200. taskset_id = str(uuid.uuid4())
  201. conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
  202. publisher = TaskPublisher(connection=conn)
  203. subtasks = [apply_async(self.task, args, kwargs,
  204. taskset_id=taskset_id, publisher=publisher)
  205. for args, kwargs in self.arguments]
  206. publisher.close()
  207. conn.close()
  208. return TaskSetResult(taskset_id, subtasks)
  209. def iterate(self):
  210. """Iterate over the results returned after calling :meth:`run`.
  211. If any of the tasks raises an exception, the exception will
  212. be re-raised.
  213. """
  214. return iter(self.run())
  215. def join(self, timeout=None):
  216. """Gather the results for all of the tasks in the taskset,
  217. and return a list with them ordered by the order of which they
  218. were called.
  219. :keyword timeout: The time in seconds, how long
  220. it will wait for results, before the operation times out.
  221. :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
  222. and the operation takes longer than ``timeout`` seconds.
  223. If any of the tasks raises an exception, the exception
  224. will be reraised by :meth:`join`.
  225. :returns: list of return values for all tasks in the taskset.
  226. """
  227. return self.run().join(timeout=timeout)
  228. @classmethod
  229. def remote_execute(cls, func, args):
  230. """Apply ``args`` to function by distributing the args to the
  231. celery server(s)."""
  232. pickled = pickle.dumps(func)
  233. arguments = [[[pickled, arg, {}], {}] for arg in args]
  234. return cls(ExecuteRemoteTask, arguments)
  235. @classmethod
  236. def map(cls, func, args, timeout=None):
  237. """Distribute processing of the arguments and collect the results."""
  238. remote_task = cls.remote_execute(func, args)
  239. return remote_task.join(timeout=timeout)
  240. @classmethod
  241. def map_async(cls, func, args, timeout=None):
  242. """Distribute processing of the arguments and collect the results
  243. asynchronously.
  244. :returns: :class:`celery.result.AsyncResult` instance.
  245. """
  246. serfunc = pickle.dumps(func)
  247. return AsynchronousMapTask.delay(serfunc, args, timeout=timeout)
  248. class PeriodicTask(Task):
  249. """A periodic task is a task that behaves like a :manpage:`cron` job.
  250. .. attribute:: run_every
  251. *REQUIRED* Defines how often the task is run (its interval),
  252. it can be either a :class:`datetime.timedelta` object or an
  253. integer specifying the time in seconds.
  254. :raises NotImplementedError: if the :attr:`run_every` attribute is
  255. not defined.
  256. You have to register the periodic task in the task registry.
  257. Example
  258. >>> from celery.task import tasks, PeriodicTask
  259. >>> from datetime import timedelta
  260. >>> class MyPeriodicTask(PeriodicTask):
  261. ... name = "my_periodic_task"
  262. ... run_every = timedelta(seconds=30)
  263. ...
  264. ... def run(self, **kwargs):
  265. ... logger = self.get_logger(**kwargs)
  266. ... logger.info("Running MyPeriodicTask")
  267. >>> tasks.register(MyPeriodicTask)
  268. """
  269. run_every = timedelta(days=1)
  270. type = "periodic"
  271. def __init__(self):
  272. if not self.run_every:
  273. raise NotImplementedError(
  274. "Periodic tasks must have a run_every attribute")
  275. # If run_every is a integer, convert it to timedelta seconds.
  276. if isinstance(self.run_every, int):
  277. self.run_every = timedelta(seconds=self.run_every)
  278. super(PeriodicTask, self).__init__()