.. _cookbook-tasks:

================
 Task Cookbook
================

.. contents::
    :local:

.. _cookbook-task-serial:

Ensuring a task is only executed one at a time
==============================================

You can accomplish this by using a lock.

In this example we'll be using the cache framework to set a lock that is
accessible to all workers.

It's part of an imaginary RSS feed importer called ``djangofeeds``.
The task takes a feed URL as a single argument, and imports that feed into
a Django model called ``Feed``. We ensure that it's not possible for two or
more workers to import the same feed at the same time by setting a cache key
consisting of the MD5 checksum of the feed URL.

The cache key expires after some time in case something unexpected happens
(you never know, right?).

.. code-block:: python

    from celery import shared_task
    from celery.utils.log import get_task_logger
    from django.core.cache import cache
    from hashlib import md5
    from djangofeeds.models import Feed

    logger = get_task_logger(__name__)

    LOCK_EXPIRE = 60 * 5  # Lock expires in 5 minutes

    @shared_task(bind=True)
    def import_feed(self, feed_url):
        # The cache key consists of the task name and the MD5 digest
        # of the feed URL. md5() requires bytes, so encode the URL first.
        feed_url_hexdigest = md5(feed_url.encode('utf-8')).hexdigest()
        lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)

        # cache.add fails if the key already exists
        acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        release_lock = lambda: cache.delete(lock_id)

        logger.debug('Importing feed: %s', feed_url)
        if acquire_lock():
            try:
                feed = Feed.objects.import_feed(feed_url)
            finally:
                release_lock()
            return feed.url

        logger.debug(
            'Feed %s is already being imported by another worker', feed_url)

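
One limitation of the pattern above is that if the import runs longer than
``LOCK_EXPIRE``, the cache key may already have expired and been taken by
another worker by the time the lock is released. The following is a minimal
sketch, not part of the original recipe, of one way to guard against that
by wrapping acquire and release in a context manager. The name
``cache_lock``, its ``owner`` argument, and the 3-second safety margin are
assumptions made for illustration; ``cache`` is assumed to be any object
with Django-style ``add(key, value, timeout)`` and ``delete(key)`` methods.

.. code-block:: python

    import time
    from contextlib import contextmanager

    LOCK_EXPIRE = 60 * 5  # seconds; same value as in the task above


    @contextmanager
    def cache_lock(cache, lock_id, owner, expire=LOCK_EXPIRE):
        """Yield True if the lock was acquired, False otherwise.

        Hypothetical helper: ``owner`` is any value identifying the caller.
        """
        # Stop trusting the lock slightly before the cache entry expires
        # (the 3-second margin is an arbitrary assumption).
        deadline = time.monotonic() + expire - 3
        acquired = cache.add(lock_id, owner, expire)
        try:
            yield acquired
        finally:
            # Only delete a lock we acquired and that cannot have expired
            # yet; otherwise we might remove another worker's lock.
            if acquired and time.monotonic() < deadline:
                cache.delete(lock_id)

Inside a task, the import would then run in a
``with cache_lock(cache, lock_id, owner) as acquired:`` block and proceed
only when ``acquired`` is true.
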
Note that in order for this to work correctly you need to be using a cache
backend that supports an atomic ``.add`` operation. ``memcached`` is known
to work well for this purpose.
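
To illustrate what "atomic" means here, the sketch below uses a hypothetical
in-memory stand-in for a cache backend (``InMemoryCache`` and
``race_for_lock`` are names invented for this example and are not part of
Django or Celery). Several threads call ``add()`` on the same key at the
same moment; because the check and the write happen as a single step, only
one caller should succeed.

.. code-block:: python

    import threading


    class InMemoryCache:
        """Hypothetical stand-in for a cache backend with an atomic add()."""

        def __init__(self):
            self._data = {}
            self._guard = threading.Lock()

        def add(self, key, value):
            # The existence check and the write happen under one lock,
            # so exactly one caller can create a given key.
            with self._guard:
                if key in self._data:
                    return False
                self._data[key] = value
                return True


    def race_for_lock(cache, key, n_workers=8):
        """Let n_workers threads call add() at once; return the winners."""
        winners = []
        barrier = threading.Barrier(n_workers)

        def worker(worker_id):
            barrier.wait()  # release all threads at the same moment
            if cache.add(key, worker_id):
                winners.append(worker_id)

        threads = [threading.Thread(target=worker, args=(i,))
                   for i in range(n_workers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        return winners

If ``add()`` were instead implemented as a separate "get" followed by a
"set", two workers could both see the key as missing and both believe they
hold the lock, which is why the backend's atomicity matters.
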