.. _cookbook-tasks:

================
Task Cookbook
================

.. contents::
    :local:

.. _cookbook-task-serial:

Ensuring a task is only executed one at a time
==============================================

You can accomplish this by using a lock.

In this example we'll be using the cache framework to set a lock that's
accessible to all workers.

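
For the lock to be visible to every worker, all worker processes must share
one cache. Django's default local-memory cache is private to each process, so
it can't serve this purpose. Below is a minimal settings sketch. It assumes
Django 3.2 or later, which ships ``PyMemcacheCache`` and needs the
``pymemcache`` package, and a memcached server at ``127.0.0.1:11211``. The
address is an assumption, not a requirement.

.. code-block:: python

    # settings.py (sketch): point Django's default cache at a shared memcached
    # server so that every worker process sees the same lock keys.
    CACHES = {
        'default': {
            # PyMemcacheCache ships with Django 3.2+ and needs pymemcache.
            'BACKEND': 'django.core.cache.backends.memcached.PyMemcacheCache',
            # Assumed address; use wherever your memcached server runs.
            'LOCATION': '127.0.0.1:11211',
        }
    }
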
This example is part of an imaginary RSS feed importer called ``djangofeeds``.
The task takes a feed URL as its single argument and imports that feed into
a Django model called ``Feed``. We ensure that two or more workers can't
import the same feed at the same time by setting a cache key built from the
task name and the MD5 checksum of the feed URL.

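
The following minimal sketch shows how such a key can be derived. The helper
name ``make_lock_id`` and the example task name are illustrative assumptions.
On Python 3, ``hashlib.md5`` only accepts bytes, so the URL is encoded first.

.. code-block:: python

    from hashlib import md5


    def make_lock_id(task_name, feed_url):
        """Build a per-feed lock key (hypothetical helper, for illustration)."""
        # md5() accepts bytes only on Python 3, so encode the URL first.
        feed_url_hexdigest = md5(feed_url.encode('utf-8')).hexdigest()
        return '{0}-lock-{1}'.format(task_name, feed_url_hexdigest)


    # The same URL always maps to the same key, so every worker contends
    # for the same lock; a different URL yields an independent lock.
    key_a = make_lock_id('djangofeeds.tasks.import_feed', 'http://example.com/rss')
    key_b = make_lock_id('djangofeeds.tasks.import_feed', 'http://example.com/atom')

Because the key depends only on the task name and the URL, imports of
different feeds never block each other.
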
The cache key expires after some time in case something unexpected happens,
and something always will...

For this reason your task's run-time shouldn't exceed the timeout: once the
key expires, another worker can acquire the lock and start importing the
same feed.

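
To see why the run-time matters, here is a toy, single-process stand-in for a
cache with ``add`` and expiry. ``ExpiringCache`` and its injectable clock are
assumptions for illustration, not Django's cache API. In this sketch, the
first worker holds the lock past the timeout, and a second worker's ``add``
then succeeds.

.. code-block:: python

    class ExpiringCache:
        """Toy cache with add()/delete() and per-key expiry (illustrative only)."""

        def __init__(self, clock):
            self._clock = clock      # callable returning the current time in seconds
            self._data = {}          # key -> (value, expires_at)

        def add(self, key, value, timeout):
            """Store key only if it is absent or expired; return True on success."""
            entry = self._data.get(key)
            if entry is not None and entry[1] > self._clock():
                return False         # key is still held by someone else
            self._data[key] = (value, self._clock() + timeout)
            return True

        def delete(self, key):
            self._data.pop(key, None)


    now = [0.0]                      # mutable fake clock, advanced by hand below
    cache = ExpiringCache(clock=lambda: now[0])
    LOCK_EXPIRE = 60 * 10

    first = cache.add('import-lock', 'worker-1', LOCK_EXPIRE)    # worker 1 acquires
    now[0] = 300.0
    blocked = cache.add('import-lock', 'worker-2', LOCK_EXPIRE)  # still held
    now[0] = 601.0                   # worker 1 overran the 10-minute timeout
    overlap = cache.add('import-lock', 'worker-2', LOCK_EXPIRE)  # lock has expired

In this run ``first`` and ``overlap`` are both ``True``. Two workers would be
importing the same feed at once, which is exactly what the lock was meant to
prevent.
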
.. note::

    In order for this to work correctly you need to be using a cache
    backend where the ``.add`` operation is atomic. ``memcached`` is known
    to work well for this purpose.

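
The value of an atomic ``add`` can be illustrated without memcached. In this
sketch, ``AtomicAddCache`` is a hypothetical in-process stand-in. A
``threading.Lock`` turns the "is the key present? if not, store it" check
inside ``add`` into a single step. When several threads race for the same key,
exactly one of them wins. If a backend implemented ``add`` as a separate get
followed by a set, two callers could both see the key as absent.

.. code-block:: python

    import threading


    class AtomicAddCache:
        """In-process stand-in whose add() is atomic (illustrative only)."""

        def __init__(self):
            self._data = {}
            self._mutex = threading.Lock()

        def add(self, key, value):
            # The membership test and the write happen under one lock,
            # so no other thread can slip in between them.
            with self._mutex:
                if key in self._data:
                    return False
                self._data[key] = value
                return True


    cache = AtomicAddCache()
    results = []
    results_mutex = threading.Lock()
    start = threading.Barrier(8)     # release all threads at (nearly) the same time


    def worker(n):
        start.wait()
        acquired = cache.add('import-lock', 'worker-{0}'.format(n))
        with results_mutex:
            results.append(acquired)


    threads = [threading.Thread(target=worker, args=(n,)) for n in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

After all threads finish, ``results`` holds one ``True`` and seven ``False``
values.
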
.. code-block:: python

    import time
    from contextlib import contextmanager
    from hashlib import md5

    from celery import shared_task
    from celery.utils.log import get_task_logger
    from django.core.cache import cache

    from djangofeeds.models import Feed

    logger = get_task_logger(__name__)

    LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes

    @contextmanager
    def memcache_lock(lock_id, oid):
        timeout_at = time.monotonic() + LOCK_EXPIRE - 3
        # cache.add fails if the key already exists
        status = cache.add(lock_id, oid, LOCK_EXPIRE)
        try:
            yield status
        finally:
            # memcache delete is very slow, but we have to use it to take
            # advantage of using add() for atomic locking
            if time.monotonic() < timeout_at and status:
                # don't release the lock if we exceeded the timeout
                # to lessen the chance of releasing an expired lock
                # owned by someone else
                # also don't release the lock if we didn't acquire it
                cache.delete(lock_id)

    @shared_task(bind=True)
    def import_feed(self, feed_url):
        # The cache key consists of the task name and the MD5 digest
        # of the feed URL (md5() needs bytes on Python 3).
        feed_url_hexdigest = md5(feed_url.encode('utf-8')).hexdigest()
        lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
        logger.debug('Importing feed: %s', feed_url)
        with memcache_lock(lock_id, self.app.oid) as acquired:
            if acquired:
                return Feed.objects.import_feed(feed_url).url
        logger.debug(
            'Feed %s is already being imported by another worker', feed_url)
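
The locking logic can be exercised without Django or memcached. The sketch
below copies ``memcache_lock`` but passes the cache in explicitly.
``DictCache`` is a hypothetical stand-in with no expiry, so the sketch only
demonstrates the acquire and release rules. While one caller holds the lock,
a second caller is refused. A caller that failed to acquire the lock does not
delete the holder's key.

.. code-block:: python

    import time
    from contextlib import contextmanager

    LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes


    class DictCache:
        """Minimal stand-in for Django's cache (no expiry; illustrative only)."""

        def __init__(self):
            self.data = {}

        def add(self, key, value, timeout=None):
            # Only store the key if it is not already present.
            if key in self.data:
                return False
            self.data[key] = value
            return True

        def delete(self, key):
            self.data.pop(key, None)


    @contextmanager
    def memcache_lock(cache, lock_id, oid):
        timeout_at = time.monotonic() + LOCK_EXPIRE - 3
        status = cache.add(lock_id, oid, LOCK_EXPIRE)
        try:
            yield status
        finally:
            # Release only if we acquired the lock and are still within the timeout.
            if time.monotonic() < timeout_at and status:
                cache.delete(lock_id)


    cache = DictCache()
    with memcache_lock(cache, 'import-lock', 'worker-1') as outer:
        with memcache_lock(cache, 'import-lock', 'worker-2') as inner:
            pass
        still_held = 'import-lock' in cache.data   # inner exit must not release it
    released = 'import-lock' not in cache.data

Here ``outer`` is ``True`` and ``inner`` is ``False``. The key survives the
inner block and is removed only when the outer block exits.
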