test_dataset.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. import time
  16. import unittest
  17. import sys
  18. import logging
  19. import random
  20. import copy
  21. # add python path of PadleDetection to sys.path
  22. parent_path = os.path.abspath(os.path.join(__file__, *(['..'] * 4)))
  23. if parent_path not in sys.path:
  24. sys.path.append(parent_path)
  25. from ppdet.data.parallel_map import ParallelMap
  26. from ppdet.utils.check import enable_static_mode
  27. class MemorySource(object):
  28. """ memory data source for testing
  29. """
  30. def __init__(self, samples):
  31. self._epoch = -1
  32. self._pos = -1
  33. self._drained = False
  34. self._samples = samples
  35. def __iter__(self):
  36. return self
  37. def __next__(self):
  38. return self.next()
  39. def next(self):
  40. if self._epoch < 0:
  41. self.reset()
  42. if self._pos >= self.size():
  43. self._drained = True
  44. raise StopIteration("no more data in " + str(self))
  45. else:
  46. sample = copy.deepcopy(self._samples[self._pos])
  47. self._pos += 1
  48. return sample
  49. def reset(self):
  50. if self._epoch < 0:
  51. self._epoch = 0
  52. else:
  53. self._epoch += 1
  54. self._pos = 0
  55. self._drained = False
  56. random.shuffle(self._samples)
  57. def size(self):
  58. return len(self._samples)
  59. def drained(self):
  60. assert self._epoch >= 0, "the first epoch has not started yet"
  61. return self._pos >= self.size()
  62. def epoch_id(self):
  63. return self._epoch
  64. class TestDataset(unittest.TestCase):
  65. """Test cases for ppdet.data.dataset
  66. """
  67. @classmethod
  68. def setUpClass(cls):
  69. """ setup
  70. """
  71. pass
  72. @classmethod
  73. def tearDownClass(cls):
  74. """ tearDownClass """
  75. pass
  76. def test_next(self):
  77. """ test next
  78. """
  79. samples = list(range(10))
  80. mem_sc = MemorySource(samples)
  81. for i, d in enumerate(mem_sc):
  82. self.assertTrue(d in samples)
  83. def test_transform_with_abnormal_worker(self):
  84. """ test dataset transform with abnormally exit process
  85. """
  86. samples = list(range(20))
  87. mem_sc = MemorySource(samples)
  88. def _worker(sample):
  89. if sample == 3:
  90. sys.exit(1)
  91. return 2 * sample
  92. test_worker = ParallelMap(
  93. mem_sc, _worker, worker_num=2, use_process=True, memsize='2M')
  94. ct = 0
  95. for i, d in enumerate(test_worker):
  96. ct += 1
  97. self.assertTrue(d / 2 in samples)
  98. self.assertEqual(len(samples) - 1, ct)
  99. def test_transform_with_delay_worker(self):
  100. """ test dataset transform with delayed process
  101. """
  102. samples = list(range(20))
  103. mem_sc = MemorySource(samples)
  104. def _worker(sample):
  105. if sample == 3:
  106. time.sleep(30)
  107. return 2 * sample
  108. test_worker = ParallelMap(
  109. mem_sc, _worker, worker_num=2, use_process=True, memsize='2M')
  110. ct = 0
  111. for i, d in enumerate(test_worker):
  112. ct += 1
  113. self.assertTrue(d / 2 in samples)
  114. self.assertEqual(len(samples), ct)
  115. if __name__ == '__main__':
  116. enable_static_mode()
  117. logging.basicConfig()
  118. unittest.main()