task-cookbook.rst 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. .. _cookbook-tasks:
  2. ================
  3. Task Cookbook
  4. ================
  5. .. contents::
  6. :local:
  7. .. _cookbook-task-serial:
  8. Ensuring a task is only executed one at a time
  9. ==============================================
  10. You can accomplish this by using a lock.
  11. In this example we'll be using the cache framework to set a lock that is
  12. accessible for all workers.
  13. It's part of an imaginary RSS feed importer called `djangofeeds`.
  14. The task takes a feed URL as a single argument, and imports that feed into
  15. a Django model called `Feed`. We ensure that it's not possible for two or
  16. more workers to import the same feed at the same time by setting a cache key
  17. consisting of the MD5 checksum of the feed URL.
  18. The cache key expires after some time in case something unexpected happens,
  19. and something always will...
  20. For this reason your tasks runtime should not exceeed the timeout.
  21. .. code-block:: python
  22. from celery import task
  23. from celery.five import monotonic
  24. from celery.utils.log import get_task_logger
  25. from contextlib import contextmanager
  26. from django.core.cache import cache
  27. from hashlib import md5
  28. from djangofeeds.models import Feed
  29. logger = get_task_logger(__name__)
  30. LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
  31. @contextmanager
  32. def memcache_lock(lock_id, oid):
  33. timeout_at = monotonic() + LOCK_EXPIRE - 3
  34. # cache.add fails if the key already exists
  35. status = cache.add(lock_id, oid, LOCK_EXPIRE)
  36. try:
  37. yield status
  38. finally:
  39. # memcache delete is very slow, but we have to use it to take
  40. # advantage of using add() for atomic locking
  41. if monotonic() < timeout_at:
  42. # do not release the lock if we exceeded the timeout
  43. # to lessen the chance of releasing an expired lock
  44. # owned by someone else.
  45. cache.delete(lock_id)
  46. @task(bind=True)
  47. def import_feed(self, feed_url):
  48. # The cache key consists of the task name and the MD5 digest
  49. # of the feed URL.
  50. feed_url_hexdigest = md5(feed_url).hexdigest()
  51. lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
  52. logger.debug('Importing feed: %s', feed_url)
  53. with memcache_lock(lock_id, self.app.oid) as acquired:
  54. if acquired:
  55. return Feed.objects.import_feed(feed_url).url
  56. logger.debug(
  57. 'Feed %s is already being imported by another worker', feed_url)