| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 | .. _cookbook-tasks:================ Task Cookbook================.. contents::    :local:.. _cookbook-task-serial:Ensuring a task is only executed one at a time==============================================You can accomplish this by using a lock.In this example we'll be using the cache framework to set a lock that'saccessible for all workers.It's part of an imaginary RSS feed importer called `djangofeeds`.The task takes a feed URL as a single argument, and imports that feed intoa Django model called `Feed`. We ensure that it's not possible for two ormore workers to import the same feed at the same time by setting a cache keyconsisting of the MD5 check-sum of the feed URL.The cache key expires after some time in case something unexpected happens,and something always will...For this reason your tasks run-time shouldn't exceed the timeout... note::    In order for this to work correctly you need to be using a cache    backend where the ``.add`` operation is atomic.  ``memcached`` is known    to work well for this purpose... code-block:: python    from celery import task    from celery.five import monotonic    from celery.utils.log import get_task_logger    from contextlib import contextmanager    from django.core.cache import cache    from hashlib import md5    from djangofeeds.models import Feed    logger = get_task_logger(__name__)    LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes    @contextmanager    def memcache_lock(lock_id, oid):        timeout_at = monotonic() + LOCK_EXPIRE - 3        # cache.add fails if the key already exists        status = cache.add(lock_id, oid, LOCK_EXPIRE)        try:            yield status        finally:            # memcache delete is very slow, but we have to use it to take            # advantage of using add() for atomic locking            if monotonic() < timeout_at:                # don't release the lock if we exceeded the timeout                # to lessen the chance of releasing an expired lock                # owned by someone else.                cache.delete(lock_id)    @task(bind=True)    def import_feed(self, feed_url):        # The cache key consists of the task name and the MD5 digest        # of the feed URL.        feed_url_hexdigest = md5(feed_url).hexdigest()        lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)        logger.debug('Importing feed: %s', feed_url)        with memcache_lock(lock_id, self.app.oid) as acquired:            if acquired:                return Feed.objects.import_feed(feed_url).url        logger.debug(            'Feed %s is already being imported by another worker', feed_url)
 |