|
@@ -40,23 +40,21 @@ The cache key expires after some time in case something unexpected happens
|
|
|
feed_url_digest = md5(feed_url).hexdigest()
|
|
|
lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
|
|
|
|
|
|
- is_locked = lambda: str(cache.get(lock_id)) == "true"
|
|
|
- acquire_lock = lambda: cache.set(lock_id, "true", LOCK_EXPIRE)
|
|
|
- # memcache delete is very slow, so we'd rather set a false value
|
|
|
- # with a very low expiry time.
|
|
|
- release_lock = lambda: cache.set(lock_id, "nil", 1)
|
|
|
+ # cache.add fails if if the key already exists
|
|
|
+ acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
|
|
|
+ # memcache delete is very slow, but we have to use it to take
|
|
|
+ # advantage of using add() for atomic locking
|
|
|
+ release_lock = lambda: cache.delete(lock_id)
|
|
|
|
|
|
logger.debug("Importing feed: %s" % feed_url)
|
|
|
- if is_locked():
|
|
|
- logger.debug(
|
|
|
- "Feed %s is already being imported by another worker" % (
|
|
|
- feed_url))
|
|
|
- return
|
|
|
-
|
|
|
- acquire_lock()
|
|
|
- try:
|
|
|
- feed = Feed.objects.import_feed(feed_url)
|
|
|
- finally:
|
|
|
- release_lock()
|
|
|
-
|
|
|
- return feed.url
|
|
|
+ if aquire_lock():
|
|
|
+ try:
|
|
|
+ feed = Feed.objects.import_feed(feed_url)
|
|
|
+ finally:
|
|
|
+ release_lock()
|
|
|
+ return feed.url
|
|
|
+
|
|
|
+ logger.debug(
|
|
|
+ "Feed %s is already being imported by another worker" % (
|
|
|
+ feed_url))
|
|
|
+ return
|