@@ -23,20 +23,41 @@ a Django model called `Feed`. We ensure that it's not possible for two or
more workers to import the same feed at the same time by setting a cache key
consisting of the MD5 checksum of the feed URL.
-The cache key expires after some time in case something unexpected happens
-(you never know, right?)
+The cache key expires after some time in case something unexpected happens,
+and something always will...
+
+For this reason your task's runtime should not exceed the timeout.
+
.. code-block:: python
from celery import task
+ from celery.five import monotonic
from celery.utils.log import get_task_logger
+ from contextlib import contextmanager
from django.core.cache import cache
from hashlib import md5
from djangofeeds.models import Feed
logger = get_task_logger(__name__)
- LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
+ LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
+
+ @contextmanager
+ def memcache_lock(lock_id, oid):
+ timeout_at = monotonic() + LOCK_EXPIRE - 3
+ # cache.add fails if the key already exists
+ status = cache.add(lock_id, oid, LOCK_EXPIRE)
+ try:
+ yield status
+ finally:
+ # memcache delete is very slow, but we have to use it to take
+ # advantage of using add() for atomic locking
+ if monotonic() < timeout_at:
+ # do not release the lock if we exceeded the timeout
+ # to lessen the chance of releasing an expired lock
+ # owned by someone else.
+ cache.delete(lock_id)
@task(bind=True)
def import_feed(self, feed_url):
@@ -44,20 +65,9 @@ The cache key expires after some time in case something unexpected happens
# of the feed URL.
feed_url_hexdigest = md5(feed_url).hexdigest()
lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
-
- # cache.add fails if the key already exists
- acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)
- # memcache delete is very slow, but we have to use it to take
- # advantage of using add() for atomic locking
- release_lock = lambda: cache.delete(lock_id)
-
logger.debug('Importing feed: %s', feed_url)
- if acquire_lock():
- try:
- feed = Feed.objects.import_feed(feed_url)
- finally:
- release_lock()
- return feed.url
-
+ with memcache_lock(lock_id, self.app.oid) as acquired:
+ if acquired:
+ return Feed.objects.import_feed(feed_url).url
logger.debug(
'Feed %s is already being imported by another worker', feed_url)
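For illustration only (not part of the patch above): a minimal sketch of how the pieces introduced by these hunks can be combined so the task's runtime stays below the lock expiry, as the new paragraph recommends. It reuses ``memcache_lock``, ``LOCK_EXPIRE``, ``logger`` and ``Feed`` as defined above; the ``soft_time_limit`` value and the ``SoftTimeLimitExceeded`` handling are illustrative assumptions, not part of the documented example.

.. code-block:: python

    from celery.exceptions import SoftTimeLimitExceeded

    # Illustrative cap: keep the task's runtime safely below LOCK_EXPIRE so
    # the lock is still ours when memcache_lock releases it in its finally block.
    @task(bind=True, soft_time_limit=LOCK_EXPIRE - 30)
    def import_feed(self, feed_url):
        feed_url_hexdigest = md5(feed_url).hexdigest()
        lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
        try:
            with memcache_lock(lock_id, self.app.oid) as acquired:
                if acquired:
                    # Only the worker whose cache.add() created the key runs the import.
                    return Feed.objects.import_feed(feed_url).url
                logger.debug(
                    'Feed %s is already being imported by another worker', feed_url)
        except SoftTimeLimitExceeded:
            # The import outlived the soft limit; the cache key will simply
            # expire on its own after LOCK_EXPIRE seconds.
            logger.warning('Import of %s hit the soft time limit', feed_url)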