// test_cache.cpp
  1. /*************************************************************************
  2. * Copyright (C) [2020] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #include <gtest/gtest.h>
  21. #include <future>
  22. #include <memory>
  23. #include <random>
  24. #include <utility>
  25. #include <vector>
  26. #include "core/cache.h"
  27. #include "core/session.h"
  28. namespace infer_server {
  29. namespace {
  30. TEST(InferServerCoreDeathTest, PushNullToCache) {
  31. CacheDynamic cache_d(16, Priority(0), 0);
  32. cache_d.Start();
  33. EXPECT_DEATH(cache_d.Push(nullptr), "");
  34. auto pack = std::make_shared<Package>();
  35. pack->data.emplace_back(new InferData);
  36. EXPECT_DEATH(cache_d.Push(std::move(pack)), "");
  37. cache_d.Stop();
  38. CacheStatic cache_s(16, Priority(0));
  39. cache_s.Start();
  40. EXPECT_DEATH(cache_s.Push(nullptr), "");
  41. pack = std::make_shared<Package>();
  42. pack->data.emplace_back(new InferData);
  43. EXPECT_DEATH(cache_s.Push(std::move(pack)), "");
  44. cache_s.Stop();
  45. }
// No-op response/notifier callbacks: these tests exercise cache batching
// only, not result delivery.
auto empty_response_func = [](Status, PackagePtr) {};
auto empty_notifier_func = [](const RequestControl*) {};
// Batch sizes used by the dynamic (d_) and static (s_) cache tests below.
constexpr uint32_t d_batch_size = 4;
constexpr uint32_t s_batch_size = 8;
constexpr Priority priority(0);
// Passed to CacheDynamic; presumably 0 means "emit a batch without waiting"
// — confirm against the CacheDynamic implementation.
constexpr uint32_t batch_timeout = 0;
  52. TEST(InferServerCore, DynamicCache_StartStop) {
  53. constexpr uint32_t capacity = 10;
  54. std::shared_ptr<CacheBase> cache = std::make_shared<CacheDynamic>(d_batch_size, priority, batch_timeout);
  55. ASSERT_EQ(cache->BatchSize(), d_batch_size);
  56. ASSERT_EQ(cache->GetPriority(), priority);
  57. ASSERT_FALSE(cache->Running());
  58. EXPECT_FALSE(cache->Push(std::make_shared<Package>()));
  59. cache->Start();
  60. ASSERT_TRUE(cache->Running());
  61. ASSERT_TRUE(cache->Push(std::make_shared<Package>()));
  62. std::unique_ptr<RequestControl> ctrl(
  63. new RequestControl(empty_response_func, empty_notifier_func, "", 0, d_batch_size * capacity));
  64. for (size_t idx = 0; idx < capacity; ++idx) {
  65. auto pack = std::make_shared<Package>();
  66. for (size_t b_idx = 0; b_idx < d_batch_size; ++b_idx) {
  67. pack->data.emplace_back(new InferData);
  68. pack->data[b_idx]->ctrl = ctrl.get();
  69. }
  70. ASSERT_TRUE(cache->Push(std::move(pack)));
  71. }
  72. auto out = cache->Pop();
  73. ASSERT_TRUE(out);
  74. ASSERT_EQ(out->data.size(), d_batch_size);
  75. for (auto& it : out->data) {
  76. EXPECT_TRUE(it->ctrl);
  77. }
  78. // clear cache
  79. cache->Stop();
  80. uint32_t out_cnt = 0;
  81. while ((out = cache->Pop())) {
  82. ASSERT_EQ(out->data.size(), d_batch_size);
  83. for (auto& it : out->data) {
  84. EXPECT_TRUE(it->ctrl);
  85. }
  86. ++out_cnt;
  87. }
  88. EXPECT_EQ(out_cnt, capacity - 1);
  89. }
  90. TEST(InferServerCore, DynamicCache_OverBatchSize) {
  91. constexpr uint32_t capacity = 12;
  92. std::shared_ptr<CacheBase> cache = std::make_shared<CacheDynamic>(d_batch_size, priority, batch_timeout);
  93. cache->Start();
  94. ASSERT_TRUE(cache->Running());
  95. std::unique_ptr<RequestControl> ctrl(
  96. new RequestControl(empty_response_func, empty_notifier_func, "", 0, d_batch_size * capacity));
  97. for (size_t idx = 0; idx < d_batch_size; ++idx) {
  98. auto pack = std::make_shared<Package>();
  99. for (size_t b_idx = 0; b_idx < capacity; ++b_idx) {
  100. pack->data.emplace_back(new InferData);
  101. pack->data[b_idx]->ctrl = ctrl.get();
  102. }
  103. ASSERT_TRUE(cache->Push(std::move(pack)));
  104. }
  105. cache->Stop();
  106. uint32_t out_cnt = 0;
  107. while (auto out = cache->Pop()) {
  108. ASSERT_EQ(out->data.size(), d_batch_size);
  109. for (auto& it : out->data) {
  110. EXPECT_TRUE(it->ctrl);
  111. }
  112. ++out_cnt;
  113. }
  114. EXPECT_EQ(out_cnt, capacity);
  115. }
  116. TEST(InferServerCore, StaticCache_StartStop) {
  117. constexpr uint32_t capacity = 10;
  118. constexpr uint32_t data_num = 6;
  119. std::shared_ptr<CacheBase> cache = std::make_shared<CacheStatic>(s_batch_size, priority);
  120. ASSERT_EQ(cache->BatchSize(), s_batch_size);
  121. ASSERT_EQ(cache->GetPriority(), priority);
  122. ASSERT_FALSE(cache->Running());
  123. EXPECT_FALSE(cache->Push(std::make_shared<Package>()));
  124. cache->Start();
  125. ASSERT_TRUE(cache->Running());
  126. EXPECT_TRUE(cache->Push(std::make_shared<Package>()));
  127. std::unique_ptr<RequestControl> ctrl(
  128. new RequestControl(empty_response_func, empty_notifier_func, "", 0, data_num * capacity));
  129. for (size_t idx = 0; idx < capacity; ++idx) {
  130. auto pack = std::make_shared<Package>();
  131. for (size_t b_idx = 0; b_idx < data_num; ++b_idx) {
  132. pack->data.emplace_back(new InferData);
  133. pack->data[b_idx]->ctrl = ctrl.get();
  134. }
  135. ASSERT_TRUE(cache->Push(std::move(pack)));
  136. }
  137. auto out = cache->Pop();
  138. ASSERT_EQ(out->data.size(), data_num);
  139. for (auto& it : out->data) {
  140. EXPECT_TRUE(it->ctrl);
  141. }
  142. }
  143. TEST(InferServerCore, StaticCache_RandomDataNum) {
  144. constexpr uint32_t capacity = 50;
  145. std::shared_ptr<CacheBase> cache = std::make_shared<CacheStatic>(s_batch_size, priority);
  146. cache->Start();
  147. ASSERT_TRUE(cache->Running());
  148. std::unique_ptr<RequestControl> ctrl(
  149. new RequestControl(empty_response_func, empty_notifier_func, "", 0, d_batch_size * capacity));
  150. std::vector<uint32_t> data_num_record;
  151. for (size_t idx = 0; idx < capacity - 5;) {
  152. auto pack = std::make_shared<Package>();
  153. std::random_device rd;
  154. std::mt19937 gen(rd());
  155. std::uniform_int_distribution<int> data_num_dis(1, 4 * s_batch_size);
  156. int data_num = data_num_dis(gen);
  157. for (int b_idx = 0; b_idx < data_num; ++b_idx) {
  158. pack->data.emplace_back(new InferData);
  159. pack->data[b_idx]->ctrl = ctrl.get();
  160. }
  161. ASSERT_TRUE(cache->Push(std::move(pack)));
  162. while (data_num > 0) {
  163. data_num_record.push_back(static_cast<uint32_t>(data_num) < s_batch_size ? data_num : s_batch_size);
  164. data_num -= s_batch_size;
  165. ++idx;
  166. }
  167. }
  168. cache->Stop();
  169. int index = 0;
  170. while (auto out = cache->Pop()) {
  171. ASSERT_EQ(out->data.size(), data_num_record[index]);
  172. ++index;
  173. for (auto& it : out->data) {
  174. EXPECT_TRUE(it->ctrl);
  175. }
  176. }
  177. }
// Three producer threads concurrently push random-size packages into one
// dynamic cache; afterwards one request is discarded and the drained cache
// must contain no data belonging to the discarded request.
TEST(InferServerCore, DynamicCache_ConcurrentAndDiscard) {
  constexpr uint32_t capacity = 50;
  constexpr uint32_t parallel = 3;
  constexpr uint32_t total_data_num = capacity * d_batch_size;
  std::shared_ptr<CacheBase> cache = std::make_shared<CacheDynamic>(d_batch_size, priority, batch_timeout);
  cache->Start();
  ASSERT_TRUE(cache->Running());
  std::vector<std::future<void>> rets;
  std::vector<std::shared_ptr<RequestControl>> ctrls;
  rets.reserve(parallel);
  ctrls.reserve(parallel);
  for (size_t push_idx = 0; push_idx < parallel; ++push_idx) {
    // One RequestControl per producer, tagged with the producer index so the
    // discarded producer can be recognized when popping.
    auto ctrl = new RequestControl(empty_response_func, empty_notifier_func, std::to_string(push_idx), push_idx,
                                   d_batch_size * capacity);
    ctrls.emplace_back(ctrl);
    // The lambda captures the raw ctrl pointer by value; ownership stays in
    // `ctrls`, which outlives every future (all are get() below).
    rets.emplace_back(std::async(std::launch::async, [cache, ctrl]() {
      std::uniform_int_distribution<uint32_t> data_num_dis(1, d_batch_size);
      std::random_device rd;
      std::mt19937 gen(rd());
      // Each producer pushes roughly its share of total_data_num, in
      // packages of 1..d_batch_size items.
      for (size_t idx = 0; idx < total_data_num / parallel - d_batch_size;) {
        auto pack = std::make_shared<Package>();
        uint32_t data_num = data_num_dis(gen);
        idx += data_num;
        for (size_t b_idx = 0; b_idx < data_num; ++b_idx) {
          pack->data.emplace_back(new InferData);
          pack->data[b_idx]->ctrl = ctrl;
        }
        ASSERT_TRUE(cache->Push(std::move(pack)));
      }
    }));
  }
  // Wait until every producer has finished pushing.
  for (auto& it : rets) {
    it.get();
  }
  // Discard one randomly chosen request, stop the cache, then drain it and
  // verify no discarded data escapes.
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_int_distribution<uint32_t> discard_dis(0, parallel - 1);
  uint32_t discard_idx = discard_dis(gen);
  ctrls[discard_idx]->Discard();
  cache->Stop();
  while (auto pack = cache->Pop()) {
    for (auto& it : pack->data) {
      EXPECT_FALSE(it->ctrl->IsDiscarded()) << "discarded data should not be popped out";
      EXPECT_NE(it->ctrl->Tag(), std::to_string(discard_idx));
    }
  }
}
// Same concurrent-push-then-discard scenario as above, but against the
// static cache, with each producer stamping its tag onto its packages.
TEST(InferServerCore, StaticCache_ConcurrentAndDiscard) {
  constexpr uint32_t capacity = 50;
  constexpr uint32_t batch_size = 16;
  constexpr uint32_t parallel = 3;
  std::shared_ptr<CacheBase> cache = std::make_shared<CacheStatic>(batch_size, priority);
  cache->Start();
  ASSERT_TRUE(cache->Running());
  std::vector<std::future<void>> rets;
  std::vector<std::unique_ptr<RequestControl>> ctrls;
  rets.reserve(parallel);
  // NOTE(review): this reserve() is load-bearing, not an optimization — the
  // async lambdas below index into `ctrls` through a reference while the
  // main thread keeps emplacing, so the vector must never reallocate.
  ctrls.reserve(parallel);
  for (size_t push_idx = 0; push_idx < parallel; ++push_idx) {
    ctrls.emplace_back(new RequestControl(empty_response_func, empty_notifier_func, std::to_string(push_idx), 0,
                                          batch_size * capacity));
    // push_idx is captured by value (each producer keeps its own index);
    // ctrls is captured by reference (see reserve note above).
    rets.emplace_back(std::async(std::launch::async, [cache, push_idx, &ctrls]() {
      for (size_t idx = 0; idx < capacity / parallel; ++idx) {
        auto pack = std::make_shared<Package>();
        for (size_t b_idx = 0; b_idx < batch_size; ++b_idx) {
          pack->data.emplace_back(new InferData);
          pack->data[b_idx]->ctrl = ctrls[push_idx].get();
          // NOTE(review): the package tag is re-assigned once per data item;
          // assigning it once per package would suffice.
          pack->tag = ctrls[push_idx]->Tag();
        }
        ASSERT_TRUE(cache->Push(std::move(pack)));
      }
    }));
  }
  // Wait until every producer has finished pushing.
  for (auto& it : rets) {
    it.get();
  }
  // Discard one randomly chosen request, stop the cache, then drain it and
  // verify no discarded data escapes.
  std::random_device rd;
  std::mt19937 gen(rd());
  std::uniform_int_distribution<uint32_t> discard_dis(0, parallel - 1);
  uint32_t discard_idx = discard_dis(gen);
  ctrls[discard_idx]->Discard();
  cache->Stop();
  while (auto pack = cache->Pop()) {
    for (auto& it : pack->data) {
      EXPECT_FALSE(it->ctrl->IsDiscarded());
      EXPECT_NE(it->ctrl->Tag(), std::to_string(discard_idx));
    }
  }
}
  267. } // namespace
  268. } // namespace infer_server