tasks.rst 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. ================
  2. Creating Tasks
  3. ================
  4. .. contents::
  5. :local:
  6. Ensuring a task is only executed one at a time
  7. ==============================================
  8. You can accomplish this by using a lock.
  9. In this example we'll be using the cache framework to set a lock that is
  10. accessible for all workers.
  11. It's part of an imaginary RSS feed importer called ``djangofeeds``.
  12. The task takes a feed URL as a single argument, and imports that feed into
  13. a Django model called ``Feed``. We ensure that it's not possible for two or
  14. more workers to import the same feed at the same time by setting a cache key
  15. consisting of the md5sum of the feed URL.
  16. The cache key expires after some time in case something unexpected happens
  17. (you never know, right?)
  18. .. code-block:: python
  19. from celery.task import Task
  20. from django.core.cache import cache
  21. from django.utils.hashcompat import md5_constructor as md5
  22. from djangofeeds.models import Feed
  23. LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
  24. class FeedImporter(Task):
  25. name = "feed.import"
  26. def run(self, feed_url, **kwargs):
  27. logger = self.get_logger(**kwargs)
  28. # The cache key consists of the task name and the MD5 digest
  29. # of the feed URL.
  30. feed_url_digest = md5(feed_url).hexdigest()
  31. lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
  32. # cache.add fails if if the key already exists
  33. acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
  34. # memcache delete is very slow, but we have to use it to take
  35. # advantage of using add() for atomic locking
  36. release_lock = lambda: cache.delete(lock_id)
  37. logger.debug("Importing feed: %s" % feed_url)
  38. if aquire_lock():
  39. try:
  40. feed = Feed.objects.import_feed(feed_url)
  41. finally:
  42. release_lock()
  43. return feed.url
  44. logger.debug(
  45. "Feed %s is already being imported by another worker" % (
  46. feed_url))
  47. return