datastructures.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. """
  2. Custom Datastructures
  3. """
  4. from UserList import UserList
  5. class PositionQueue(UserList):
  6. """A positional queue of a specific length, with slots that are either
  7. filled or unfilled. When all of the positions are filled, the queue
  8. is considered :meth:`full`.
  9. :param length: see :attr:`length`.
  10. .. attribute:: length
  11. The number of items required for the queue to be considered full.
  12. """
  13. class UnfilledPosition(object):
  14. """Describes an unfilled slot."""
  15. def __init__(self, position):
  16. self.position = position
  17. def __init__(self, length):
  18. self.length = length
  19. self.data = map(self.UnfilledPosition, xrange(length))
  20. def full(self):
  21. """Returns ``True`` if all of the slots has been filled."""
  22. return len(self) >= self.length
  23. def __len__(self):
  24. """``len(self)`` -> number of slots filled with real values."""
  25. return len(self.filled)
  26. @property
  27. def filled(self):
  28. """Returns the filled slots as a list."""
  29. return filter(lambda v: not isinstance(v, self.UnfilledPosition),
  30. self.data)
  31. class TaskProcessQueue(UserList):
  32. """Queue of running child processes, which starts waiting for the
  33. processes to finish when the queue limit has been reached.
  34. :param limit: see :attr:`limit` attribute.
  35. :param logger: see :attr:`logger` attribute.
  36. :param done_msg: see :attr:`done_msg` attribute.
  37. .. attribute:: limit
  38. The number of processes that can run simultaneously until
  39. we start collecting results.
  40. .. attribute:: logger
  41. The logger used to print the :attr:`done_msg`.
  42. .. attribute:: done_msg
  43. Message logged when a tasks result has been collected.
  44. The message is logged with loglevel :const:`logging.INFO`.
  45. """
  46. def __init__(self, limit, logger=None, done_msg=None):
  47. self.limit = limit
  48. self.logger = logger
  49. self.done_msg = done_msg
  50. self.data = []
  51. def add(self, result, task_name, task_id):
  52. """Add a process to the queue.
  53. If the queue is full, it will start to collect return values from
  54. the tasks executed. When all return values has been collected,
  55. it deletes the current queue and is ready to accept new processes.
  56. :param result: A :class:`multiprocessing.AsyncResult` instance, as
  57. returned by :meth:`multiprocessing.Pool.apply_async`.
  58. :param task_name: Name of the task executed.
  59. :param task_id: Id of the task executed.
  60. """
  61. self.data.append([result, task_name, task_id])
  62. if self.data and len(self.data) >= self.limit:
  63. for result, task_name, task_id in self.data:
  64. ret_value = result.get()
  65. if self.done_msg and self.logger:
  66. self.logger.info(self.done_msg % {
  67. "name": task_name,
  68. "id": task_id,
  69. "return_value": ret_value})
  70. self.data = []