strategy.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. from celery.utils import chunks
  2. def even_time_distribution(task, size, time_window, iterable, **apply_kwargs):
  3. """With an iterator yielding task args, kwargs tuples, evenly distribute
  4. the processing of its tasks throughout the time window available.
  5. :param task: The kind of task (a :class:`celery.task.base.Task`.)
  6. :param size: Total number of elements the iterator gives.
  7. :param time_window: Total time available, in minutes.
  8. :param iterable: Iterable yielding task args, kwargs tuples.
  9. :param \*\*apply_kwargs: Additional keyword arguments to be passed on to
  10. :func:`celery.execute.apply_async`.
  11. Example
  12. >>> class RefreshAllFeeds(Task):
  13. ...
  14. ... def run(self, **kwargs):
  15. ... feeds = Feed.objects.all()
  16. ... total = feeds.count()
  17. ...
  18. ... time_window = REFRESH_FEEDS_EVERY_INTERVAL_MINUTES
  19. ...
  20. ... def iter_feed_task_args(iterable):
  21. ... for feed in iterable:
  22. ... yield ([feed.feed_url], {}) # args, kwargs tuple
  23. ...
  24. ... it = iter_feed_task_args(feeds.iterator())
  25. ...
  26. ... even_time_distribution(RefreshFeedTask, total,
  27. ... time_window, it)
  28. """
  29. bucketsize = size / time_window
  30. buckets = chunks(iterable, int(bucketsize))
  31. connection = task.establish_connection()
  32. try:
  33. for bucket_count, bucket in enumerate(buckets):
  34. # Skew the countdown for items in this bucket by one.
  35. seconds_eta = (60 * bucket_count if bucket_count else None)
  36. for args, kwargs in bucket:
  37. task.apply_async(args=args, kwargs=kwargs,
  38. connection=connection,
  39. countdown=seconds_eta,
  40. **apply_kwargs)
  41. finally:
  42. connection.close()