123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- ================
- Creating Tasks
- ================
- .. contents::
- :local:
- Ensuring a task is only executed one at a time
- ==============================================
- You can accomplish this by using a lock.
- In this example we'll be using the cache framework to set a lock that is
- accessible for all workers.
- It's part of an imaginary RSS feed importer called ``djangofeeds``.
- The task takes a feed URL as a single argument, and imports that feed into
- a Django model called ``Feed``. We ensure that it's not possible for two or
- more workers to import the same feed at the same time by setting a cache key
- consisting of the md5sum of the feed URL.
- The cache key expires after some time in case something unexpected happens
- (you never know, right?)
- .. code-block:: python
- from celery.task import Task
- from django.core.cache import cache
- from django.utils.hashcompat import md5_constructor as md5
- from djangofeeds.models import Feed
- LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
- class FeedImporter(Task):
- name = "feed.import"
- def run(self, feed_url, **kwargs):
- logger = self.get_logger(**kwargs)
- # The cache key consists of the task name and the MD5 digest
- # of the feed URL.
- feed_url_digest = md5(feed_url).hexdigest()
- lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
- # cache.add fails if if the key already exists
- acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
- # memcache delete is very slow, but we have to use it to take
- # advantage of using add() for atomic locking
- release_lock = lambda: cache.delete(lock_id)
- logger.debug("Importing feed: %s" % feed_url)
- if aquire_lock():
- try:
- feed = Feed.objects.import_feed(feed_url)
- finally:
- release_lock()
- return feed.url
- logger.debug(
- "Feed %s is already being imported by another worker" % (
- feed_url))
- return
|