tasks.rst 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. ================
  2. Creating Tasks
  3. ================
  4. Ensuring a task is only executed one at a time
  5. ----------------------------------------------
  6. You can accomplish this by using a lock.
  7. In this example we'll be using the cache framework to set a lock that is
  8. accessible for all workers.
  9. It's part of an imaginary RSS Feed application called ``djangofeeds``.
  10. The task takes a feed URL as a single argument, and imports that feed into
  11. a Django model called ``Feed``. We ensure that it's not possible for two or
  12. more workers to import the same feed at the same time by setting a cache key
  13. consisting of the md5sum of the feed URL.
  14. The cache key expires after some time in case something unexpected happens
  15. (you never know, right?)
  16. .. code-block:: python
  17. from celery.task import Task
  18. from celery.registry import tasks
  19. from django.core.cache import cache
  20. from django.utils.hashcompat import md5_constructor as md5
  21. from djangofeeds.models import Feed
  22. LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
  23. class FeedImporter(Task):
  24. name = "feed.import"
  25. def run(self, feed_url, **kwargs):
  26. logger = self.get_logger(**kwargs)
  27. # The cache key consists of the task name and the MD5 digest
  28. # of the feed URL.
  29. feed_url_digest = md5(feed_url).hexdigest()
  30. lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
  31. is_locked = lambda: str(cache.get(lock_id)) == "true"
  32. acquire_lock = lambda: cache.set(lock_id, "true", LOCK_EXPIRE)
  33. # memcache delete is very slow, so we'd rather set a false value
  34. # with a very low expiry time.
  35. release_lock = lambda: cache.set(lock_id, "nil", 1)
  36. logger.debug("Importing feed: %s" % feed_url)
  37. if is_locked():
  38. logger.debug(
  39. "Feed %s is already being imported by another worker" % (
  40. feed_url))
  41. return
  42. acquire_lock()
  43. try:
  44. feed = Feed.objects.import_feed(feed_url)
  45. finally:
  46. release_lock()
  47. return feed.url
  48. tasks.register(FeedImporter)