datastructures.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. """
  2. Custom Datastructures
  3. """
  4. from UserList import UserList
  5. class PositionQueue(UserList):
  6. """A positional queue of a specific length, with slots that are either
  7. filled or unfilled. When all of the positions are filled, the queue
  8. is considered :meth:`full`.
  9. :param length: The number of items required for the queue to be filled.
  10. """
  11. class UnfilledPosition(object):
  12. """Describes an unfilled slot."""
  13. def __init__(self, position):
  14. self.position = position
  15. def __init__(self, length):
  16. self.length = length
  17. self.data = map(self.UnfilledPosition, xrange(length))
  18. def full(self):
  19. """Returns ``True`` if all of the slots has been filled."""
  20. return len(self) >= self.length
  21. def __len__(self):
  22. """``len(self)`` -> number of slots filled with real values."""
  23. return len(self.filled)
  24. @property
  25. def filled(self):
  26. """Returns the filled slots as a list."""
  27. return filter(lambda v: not isinstance(v, self.UnfilledPosition),
  28. self.data)
  29. class TaskProcessQueue(UserList):
  30. """Queue of running child processes, which starts waiting for the
  31. processes to finish when the queue limit is reached.
  32. :param limit: see :attr:`limit` attribute.
  33. :param logger: see :attr:`logger` attribute.
  34. :param done_msg: see :attr:`done_msg` attribute.
  35. .. attribute:: limit
  36. The number of processes that can run simultaneously until
  37. we start collecting results.
  38. .. attribute:: logger
  39. The logger used to print the :attr:`done_msg`.
  40. .. attribute:: done_msg
  41. Message logged when a tasks result has been collected.
  42. The message is logged with loglevel :const:`logging.INFO`.
  43. """
  44. def __init__(self, limit, logger=None, done_msg=None):
  45. self.limit = limit
  46. self.logger = logger
  47. self.done_msg = done_msg
  48. self.data = []
  49. def add(self, result, task_name, task_id):
  50. """Add a process to the queue.
  51. If the queue is full, it will start to collect return values from
  52. the tasks executed. When all return values has been collected,
  53. it deletes the current queue and is ready to accept new processes.
  54. :param result: A :class:`multiprocessing.AsyncResult` instance, as
  55. returned by :meth:`multiprocessing.Pool.apply_async`.
  56. :param task_name: Name of the task executed.
  57. :param task_id: Id of the task executed.
  58. """
  59. self.data.append([result, task_name, task_id])
  60. if self.data and len(self.data) >= self.limit:
  61. for result, task_name, task_id in self.data:
  62. ret_value = result.get()
  63. if self.done_msg and self.logger:
  64. self.logger.info(self.done_msg % {
  65. "name": task_name,
  66. "id": task_id,
  67. "return_value": ret_value})
  68. self.data = []