// test_request.cpp — request-path unit tests for the CNIS infer server.
  1. /*************************************************************************
  2. * Copyright (C) [2020] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #include <glog/logging.h>
  21. #include <gtest/gtest.h>
  22. #include <opencv2/opencv.hpp>
  23. #include <chrono>
  24. #include <fstream>
  25. #include <future>
  26. #include <iostream>
  27. #include <list>
  28. #include <memory>
  29. #include <string>
  30. #include <thread>
  31. #include <utility>
  32. #include <vector>
  33. #include "cnis/infer_server.h"
  34. #include "cnis/processor.h"
  35. #include "fixture.h"
  36. #ifdef CNIS_WITH_CONTRIB
  37. #include "cnis/contrib/opencv_frame.h"
  38. #include "cnis/contrib/video_helper.h"
  39. #ifndef CNIS_USE_MAGICMIND
  40. #include "device/mlu_context.h"
  41. #endif
using infer_server::Buffer;

namespace infer_server {

using video::PixelFmt;
using video::PreprocessorMLU;
using video::PreprocessType;

#ifdef CNIS_USE_MAGICMIND
// MLU370 (MagicMind) model plus the preprocessing constants its input expects.
static constexpr const char* model_url =
    "http://video.cambricon.com/models/MLU370/resnet50_nhwc_tfu_0.5_int8_fp16.model";
static const std::vector<float> preproc_mean{104.f, 117.f, 123.f};
static const std::vector<float> preproc_std{1.f, 1.f, 1.f};
static constexpr bool preproc_normalize{false};
static constexpr bool keep_aspect_ratio{true};
static constexpr int pad_value{128};
static constexpr bool transpose{false};
#else
// MLU270 offline model used when MagicMind support is not compiled in.
constexpr const char* model_url =
    "http://video.cambricon.com/models/MLU270/Primary_Detector/ssd/resnet34_ssd.cambricon";
#endif

// Test image path, resolved relative to the executable dir (see GetExePath()).
constexpr const char* image_path = "../../../tests/data/500x500.jpg";
  61. class TestObserver : public Observer {
  62. public:
  63. TestObserver(std::promise<Status>& get_response) : get_response_(get_response) {} // NOLINT
  64. void Response(Status status, PackagePtr data, any user_data) noexcept override {
  65. std::lock_guard<std::mutex> lk(response_mut_);
  66. response_list_.emplace_back(std::move(data));
  67. udata_list_.emplace_back(std::move(user_data));
  68. if (first_response_) {
  69. get_response_.set_value(status);
  70. first_response_ = false;
  71. }
  72. }
  73. PackagePtr GetPackage() {
  74. std::lock_guard<std::mutex> lk(response_mut_);
  75. if (response_list_.empty()) {
  76. return nullptr;
  77. }
  78. auto response = std::move(response_list_.front());
  79. response_list_.pop_front();
  80. return response;
  81. }
  82. std::pair<PackagePtr, any> GetResponse() {
  83. std::lock_guard<std::mutex> lk(response_mut_);
  84. if (response_list_.empty()) {
  85. return {nullptr, nullptr};
  86. }
  87. auto response = std::move(response_list_.front());
  88. auto udata = std::move(udata_list_.front());
  89. response_list_.pop_front();
  90. udata_list_.pop_front();
  91. return std::make_pair(response, udata);
  92. }
  93. uint32_t ResponseNum() {
  94. std::lock_guard<std::mutex> lk(response_mut_);
  95. return response_list_.size();
  96. }
  97. private:
  98. std::promise<Status>& get_response_;
  99. std::list<PackagePtr> response_list_;
  100. std::list<any> udata_list_;
  101. std::mutex response_mut_;
  102. bool first_response_{true};
  103. };
  104. auto g_empty_postproc_func = [](InferData*, const ModelIO&, const ModelInfo& m) { return true; };
  105. auto g_empty_preproc_func = [](ModelIO*, const InferData&, const ModelInfo&) { return true; };
  106. class InferServerRequestTest : public InferServerTestAPI {
  107. protected:
  108. void SetUp() override {
  109. SetMluContext();
  110. model_ = server_->LoadModel(model_url);
  111. if (!model_) {
  112. std::cerr << "load model failed";
  113. std::terminate();
  114. }
  115. preproc_mlu_ = PreprocessorMLU::Create();
  116. preproc_host_ = PreprocessorHost::Create();
  117. preproc_host_->SetParams<PreprocessorHost::ProcessFunction>("process_function", g_empty_preproc_func);
  118. empty_preproc_host_ = PreprocessorHost::Create();
  119. postproc_ = Postprocessor::Create();
  120. postproc_->SetParams<Postprocessor::ProcessFunction>("process_function", g_empty_postproc_func);
  121. observer_ = std::make_shared<TestObserver>(get_response_);
  122. }
  123. void TearDown() override {}
  124. video::VideoFrame ConvertToVideoFrame(uint8_t* img_nv12, size_t w, size_t h) {
  125. size_t frame_size = w * h;
  126. Buffer y_memory(frame_size, 0);
  127. Buffer uv_memory(frame_size / 2, 0);
  128. y_memory.CopyFrom(img_nv12, frame_size);
  129. uv_memory.CopyFrom(img_nv12 + frame_size, frame_size / 2);
  130. video::VideoFrame frame;
  131. frame.plane[0] = y_memory;
  132. frame.plane[1] = uv_memory;
  133. frame.stride[0] = 1;
  134. frame.stride[1] = 1;
  135. frame.width = w;
  136. frame.height = h;
  137. frame.plane_num = 2;
  138. frame.format = PixelFmt::NV12;
  139. return frame;
  140. }
  141. PackagePtr PrepareInput(const std::string& image_path, size_t data_num) {
  142. PackagePtr in = std::make_shared<Package>();
  143. cv::Mat img = cv::imread(GetExePath() + image_path);
  144. size_t frame_size = img.cols * img.rows;
  145. uint8_t* img_nv12 = new uint8_t[frame_size * 3 / 2];
  146. cvt_bgr_to_yuv420sp(img, 1, PixelFmt::NV12, img_nv12);
  147. for (size_t i = 0; i < data_num; ++i) {
  148. in->data.emplace_back(new InferData);
  149. in->data[i]->Set(ConvertToVideoFrame(img_nv12, img.cols, img.rows));
  150. }
  151. delete[] img_nv12;
  152. return in;
  153. }
  154. PackagePtr PrepareOpenCVInput(const std::string& image_path, size_t data_num) {
  155. PackagePtr in = std::make_shared<Package>();
  156. std::cout << "image path: " << image_path << std::endl;
  157. cv::Mat img = cv::imread(GetExePath() + image_path);
  158. for (size_t i = 0; i < data_num; ++i) {
  159. video::OpencvFrame cvframe;
  160. cvframe.img = img;
  161. cvframe.fmt = PixelFmt::BGR24;
  162. in->data.emplace_back(new InferData);
  163. in->data[i]->Set(std::move(cvframe));
  164. }
  165. return in;
  166. }
  167. Session_t PrepareSession(const std::string& name, std::shared_ptr<Processor> preproc,
  168. std::shared_ptr<Processor> postproc, size_t batch_timeout, BatchStrategy strategy,
  169. std::shared_ptr<Observer> observer, bool cncv_used = false) {
  170. SessionDesc desc;
  171. desc.name = name;
  172. desc.model = model_;
  173. desc.strategy = strategy;
  174. desc.preproc = std::move(preproc);
  175. desc.postproc = std::move(postproc);
  176. desc.batch_timeout = batch_timeout;
  177. desc.host_input_layout = {DataType::FLOAT32, DimOrder::NHWC};
  178. desc.engine_num = 2;
  179. desc.show_perf = true;
  180. desc.priority = 0;
  181. desc.host_output_layout = {infer_server::DataType::FLOAT32, infer_server::DimOrder::NCHW};
  182. #ifdef CNIS_USE_MAGICMIND
  183. desc.preproc->SetParams("dst_format", PixelFmt::RGB24, "preprocess_type", PreprocessType::CNCV_PREPROC, "mean",
  184. preproc_mean, "std", preproc_std, "normalize", preproc_normalize);
  185. #else
  186. edk::MluContext context(device_id_);
  187. auto version = context.GetCoreVersion();
  188. if (version != edk::CoreVersion::MLU220 && version != edk::CoreVersion::MLU270) {
  189. std::cerr << "Unsupport core version" << static_cast<int>(version) << std::endl;
  190. return nullptr;
  191. }
  192. auto p_type = cncv_used ? PreprocessType::CNCV_PREPROC : PreprocessType::RESIZE_CONVERT;
  193. if (version == edk::CoreVersion::MLU220) p_type = PreprocessType::SCALER;
  194. desc.preproc->SetParams("dst_format", PixelFmt::ARGB, "preprocess_type", p_type);
  195. #endif
  196. if (observer) {
  197. return server_->CreateSession(desc, observer);
  198. } else {
  199. return server_->CreateSyncSession(desc);
  200. }
  201. }
  202. void WaitAsyncDone() {
  203. auto f = get_response_.get_future();
  204. ASSERT_NE(f.wait_for(std::chrono::seconds(30)), std::future_status::timeout) << "wait for response timeout";
  205. EXPECT_EQ(f.get(), Status::SUCCESS);
  206. }
  207. ModelPtr model_;
  208. std::shared_ptr<Processor> preproc_mlu_;
  209. std::shared_ptr<Processor> preproc_host_;
  210. std::shared_ptr<Processor> empty_preproc_host_;
  211. std::shared_ptr<Processor> postproc_;
  212. std::shared_ptr<TestObserver> observer_;
  213. std::promise<Status> get_response_;
  214. };
  215. class MyPostprocessor : public ProcessorForkable<MyPostprocessor> {
  216. public:
  217. MyPostprocessor() : ProcessorForkable<MyPostprocessor>("MyPostprocessor") {}
  218. ~MyPostprocessor() {}
  219. Status Process(PackagePtr pack) noexcept override {
  220. if (!pack->predict_io || !pack->predict_io->HasValue()) return Status::ERROR_BACKEND;
  221. if (!SetCurrentDevice(dev_id_)) return Status::ERROR_BACKEND;
  222. auto output = pack->predict_io->Get<ModelIO>();
  223. auto& shape = output.shapes[0];
  224. for (uint32_t idx = 0; idx < pack->data.size(); ++idx) {
  225. auto buf_size = output.buffers[0].MemorySize() / shape[0];
  226. Buffer buf(buf_size, dev_id_);
  227. buf.CopyFrom(output.buffers[0](idx * shape.DataCount()), buf_size);
  228. pack->data[idx]->Set(buf);
  229. }
  230. return Status::SUCCESS;
  231. }
  232. Status Init() noexcept override {
  233. constexpr const char* params[] = {"model_info", "device_id"};
  234. for (auto p : params) {
  235. if (!HaveParam(p)) {
  236. LOG(ERROR) << p << " has not been set!";
  237. return Status::INVALID_PARAM;
  238. }
  239. }
  240. try {
  241. model_ = GetParam<ModelPtr>("model_info");
  242. dev_id_ = GetParam<int>("device_id");
  243. } catch (bad_any_cast&) {
  244. LOG(ERROR) << "Unmatched data type";
  245. return Status::WRONG_TYPE;
  246. }
  247. return Status::SUCCESS;
  248. }
  249. private:
  250. ModelPtr model_;
  251. int dev_id_;
  252. };
// An empty package and an empty VideoFrame request must still each receive a
// response, alongside a regular 10-frame request.
TEST_F(InferServerRequestTest, EmptyPackage) {
  Session_t session =
      PrepareSession("empty package process", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, observer_);
  ASSERT_NE(session, nullptr);
  EXPECT_EQ(model_, server_->GetModel(session));
  constexpr const char* tag = "EmptyPackage";
  auto in = PrepareInput(image_path, 10);
  in->tag = tag;
  ASSERT_TRUE(server_->Request(session, std::move(in), nullptr));
  // Zero-size package: must be accepted and answered.
  in = Package::Create(0, tag);
  ASSERT_TRUE(server_->Request(session, std::move(in), nullptr));
  // Default-constructed (empty) frame through the video helper API.
  video::VideoInferServer vs(device_id_);
  ASSERT_TRUE(vs.Request(session, video::VideoFrame{}, {}, tag, nullptr));
  server_->WaitTaskDone(session, tag);
  WaitAsyncDone();
  EXPECT_EQ(observer_->ResponseNum(), 3u);  // exactly one response per request above
  server_->DestroySession(session);
}
  271. TEST_F(InferServerRequestTest, DynamicBatch) {
  272. Session_t session =
  273. PrepareSession("dynamic batch process", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, observer_);
  274. ASSERT_NE(session, nullptr);
  275. auto in = PrepareInput(image_path, 10);
  276. server_->Request(session, std::move(in), nullptr);
  277. WaitAsyncDone();
  278. server_->DestroySession(session);
  279. }
  280. TEST_F(InferServerRequestTest, SkipPostproc) {
  281. // no process_function param
  282. postproc_->PopParam<Postprocessor::ProcessFunction>("process_function");
  283. Session_t session =
  284. PrepareSession("skip postproc process", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, observer_);
  285. ASSERT_NE(session, nullptr);
  286. constexpr size_t data_number = 10;
  287. auto in = PrepareInput(image_path, data_number);
  288. server_->Request(session, std::move(in), nullptr);
  289. WaitAsyncDone();
  290. auto response = observer_->GetPackage();
  291. EXPECT_NO_THROW(response->data[0]->Get<ModelIO>());
  292. EXPECT_EQ(response->data.size(), data_number);
  293. server_->DestroySession(session);
  294. // no postprocessor
  295. session = PrepareSession("skip postproc process", preproc_mlu_, nullptr, 5, BatchStrategy::DYNAMIC, nullptr);
  296. ASSERT_NE(session, nullptr);
  297. in = PrepareInput(image_path, data_number);
  298. response.reset(new Package);
  299. Status status;
  300. server_->RequestSync(session, std::move(in), &status, response, 1000);
  301. EXPECT_NO_THROW(response->data[0]->Get<ModelIO>());
  302. EXPECT_EQ(response->data.size(), data_number);
  303. server_->DestroySession(session);
  304. }
  305. TEST_F(InferServerRequestTest, StaticBatch) {
  306. Session_t session =
  307. PrepareSession("static batch process", preproc_mlu_, postproc_, 5, BatchStrategy::STATIC, observer_);
  308. ASSERT_NE(session, nullptr);
  309. constexpr size_t data_number = 10;
  310. auto in = PrepareInput(image_path, data_number);
  311. server_->Request(session, std::move(in), nullptr);
  312. WaitAsyncDone();
  313. auto response = observer_->GetPackage();
  314. EXPECT_EQ(response->data.size(), data_number);
  315. server_->DestroySession(session);
  316. }
// A request whose package entries carry no payload must fail, and the session
// must still be destroyable promptly afterwards (no hang on teardown).
TEST_F(InferServerRequestTest, ProcessFailed) {
  std::string tag = "process failed";
  Session_t session = PrepareSession(tag, preproc_mlu_, postproc_, 5, BatchStrategy::STATIC, nullptr);
  ASSERT_NE(session, nullptr);
  constexpr size_t data_number = 1;
  // Package has data_number entries but no data was ever Set() -> processing error.
  auto in = Package::Create(data_number, "");
  Status s;
  PackagePtr out = Package::Create(0);
  server_->RequestSync(session, std::move(in), &s, out);
  server_->WaitTaskDone(session, tag);
  EXPECT_NE(s, Status::SUCCESS);
  // DestroySession must return within 1 s even after a failed request.
  auto fut = std::async(std::launch::async, [this, session]() { server_->DestroySession(session); });
  EXPECT_NE(std::future_status::timeout, fut.wait_for(std::chrono::seconds(1)));
}
// Host-side (CPU) preprocessing path with OpenCV input frames.
TEST_F(InferServerRequestTest, DynamicBatchPreprocessHost) {
#ifdef CNIS_USE_MAGICMIND
  // MagicMind models need a real OpenCV preproc matching the model input
  // (mean/std/normalize constants defined at the top of the file).
  preproc_host_->SetParams<PreprocessorHost::ProcessFunction>(
      "process_function", video::OpencvPreproc::GetFunction(PixelFmt::RGB24, preproc_mean, preproc_std,
                                                            preproc_normalize, keep_aspect_ratio, pad_value,
                                                            transpose));
#endif
  Session_t session = PrepareSession("dynamic batch process with preprocess host", preproc_host_, postproc_, 5,
                                     BatchStrategy::DYNAMIC, observer_);
  ASSERT_NE(session, nullptr);
  constexpr size_t data_number = 4;
  auto in = PrepareOpenCVInput(image_path, data_number);
  server_->Request(session, std::move(in), nullptr);
  WaitAsyncDone();
  auto response = observer_->GetPackage();
  ASSERT_TRUE(response);
  EXPECT_EQ(response->data.size(), data_number);
  server_->DestroySession(session);
}
  350. TEST_F(InferServerRequestTest, InputContinuousData) {
  351. int dev_id = device_id_;
  352. Session_t session =
  353. PrepareSession("continuous data input", empty_preproc_host_, postproc_, 5, BatchStrategy::STATIC, nullptr);
  354. ASSERT_NE(session, nullptr);
  355. size_t data_size = 12;
  356. auto in = Package::Create(data_size);
  357. ModelIO input;
  358. input.buffers.reserve(model_->InputNum());
  359. input.shapes.reserve(model_->InputNum());
  360. for (uint32_t idx = 0; idx < model_->InputNum(); ++idx) {
  361. size_t len = model_->InputShape(idx).BatchDataCount() * GetTypeSize(model_->InputLayout(idx).dtype);
  362. input.buffers.emplace_back(len, dev_id);
  363. (void)input.buffers[idx].MutableData();
  364. input.shapes.emplace_back(model_->InputShape(idx));
  365. input.shapes[idx][0] = data_size;
  366. }
  367. in->predict_io.reset(new InferData);
  368. in->predict_io->Set(input);
  369. Status status;
  370. PackagePtr output = std::make_shared<Package>();
  371. server_->RequestSync(session, std::move(in), &status, output);
  372. EXPECT_EQ(output->data.size(), data_size);
  373. server_->DestroySession(session);
  374. }
  375. TEST_F(InferServerRequestTest, DynamicBatchSync) {
  376. auto postproc = std::make_shared<Postprocessor>();
  377. postproc->SetParams<Postprocessor::ProcessFunction>("process_function", g_empty_postproc_func);
  378. Session_t session = PrepareSession("dynamic batch sync", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, nullptr);
  379. ASSERT_NE(session, nullptr);
  380. auto in = PrepareInput(image_path, 10);
  381. Status status;
  382. auto out = std::make_shared<Package>();
  383. EXPECT_TRUE(server_->RequestSync(session, in, &status, out));
  384. EXPECT_EQ(out->data.size(), in->data.size());
  385. EXPECT_EQ(status, Status::SUCCESS);
  386. server_->DestroySession(session);
  387. }
// A 5 ms timeout is far too short for 10-frame inference, so the synchronous
// request must report TIMEOUT and deliver no output data.
TEST_F(InferServerRequestTest, DynamicBatchSyncTimeout) {
  Session_t session =
      PrepareSession("dynamic batch sync timeout", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, nullptr);
  ASSERT_NE(session, nullptr);
  auto in = PrepareInput(image_path, 10);
  Status status;
  auto out = std::make_shared<Package>();
  // Last argument is the timeout in milliseconds.
  EXPECT_TRUE(server_->RequestSync(session, std::move(in), &status, out, 5));
  EXPECT_EQ(out->data.size(), 0u);
  EXPECT_EQ(status, Status::TIMEOUT);
  server_->DestroySession(session);
}
// Custom postprocessor (MyPostprocessor) producing device-side Buffers:
// every result must reside on the MLU and own its memory.
TEST_F(InferServerRequestTest, OutputMluData) {
  std::shared_ptr<Processor> postproc = MyPostprocessor::Create();
  Session_t session = PrepareSession("output mlu data", preproc_mlu_, postproc, 5, BatchStrategy::DYNAMIC, nullptr);
  ASSERT_NE(session, nullptr);
  auto in = PrepareInput(image_path, 10);
  Status status;
  auto out = std::make_shared<Package>();
  // `in` is intentionally NOT moved: its size is compared with the output below.
  EXPECT_TRUE(server_->RequestSync(session, in, &status, out));
  ASSERT_EQ(out->data.size(), in->data.size());
  ASSERT_EQ(status, Status::SUCCESS);
  for (auto& it : out->data) {
    Buffer buf;
    ASSERT_NO_THROW(buf = it->Get<Buffer>());
    EXPECT_TRUE(buf.OnMlu());
    EXPECT_TRUE(buf.OwnMemory());
  }
  // Release the device buffers before tearing the session down.
  out->data.clear();
  server_->DestroySession(session);
}
// Two sessions on the same server used concurrently; both requests carry a
// copy of one pre-built device-side ModelIO.
TEST_F(InferServerRequestTest, ParallelInfer) {
  int dev_id = device_id_;
  auto my_postproc = MyPostprocessor::Create();
  auto another_empty_preproc_host = PreprocessorHost::Create();
  Session_t session1 =
      PrepareSession("continuous data input 1", empty_preproc_host_, postproc_, 5, BatchStrategy::STATIC, nullptr);
  ASSERT_NE(session1, nullptr);
  Session_t session2 = PrepareSession("continuous data input 2", another_empty_preproc_host, my_postproc, 5,
                                      BatchStrategy::STATIC, nullptr);
  ASSERT_NE(session2, nullptr);
  size_t data_size = 1;
  // Build one input matching the model's input shapes/layouts.
  ModelIO input;
  input.buffers.reserve(model_->InputNum());
  for (uint32_t idx = 0; idx < model_->InputNum(); ++idx) {
    size_t len = model_->InputShape(idx).BatchDataCount() * GetTypeSize(model_->InputLayout(idx).dtype);
    input.buffers.emplace_back(len, dev_id);
    (void)input.buffers[idx].MutableData();  // force allocation
    input.shapes.emplace_back(model_->InputShape(idx));
    input.shapes[idx][0] = data_size;  // overwrite the batch dimension
  }
  auto in1 = Package::Create(data_size);
  in1->predict_io.reset(new InferData);
  in1->predict_io->Set(input);
  Status status1;
  PackagePtr output1 = std::make_shared<Package>();
  auto in2 = Package::Create(data_size);
  in2->predict_io.reset(new InferData);
  in2->predict_io->Set(input);
  Status status2;
  PackagePtr output2 = std::make_shared<Package>();
  // Issue both requests in parallel. Capturing locals by reference is safe
  // here because both futures are joined (.get()) before scope exit.
  auto fut1 = std::async(std::launch::async, [this, session1, &in1, &status1, &output1]() {
    return server_->RequestSync(session1, std::move(in1), &status1, output1);
  });
  auto fut2 = std::async(std::launch::async, [this, session2, &in2, &status2, &output2]() {
    return server_->RequestSync(session2, std::move(in2), &status2, output2);
  });
  ASSERT_TRUE(fut1.get());
  ASSERT_TRUE(fut2.get());
  ASSERT_EQ(status1, Status::SUCCESS);
  ASSERT_EQ(status2, Status::SUCCESS);
  EXPECT_EQ(output1->data.size(), data_size);
  EXPECT_EQ(output2->data.size(), data_size);
  server_->DestroySession(session1);
  server_->DestroySession(session2);
}
// Responses must arrive in request order; the request index is round-tripped
// through the user-data channel and checked for strict monotonicity.
TEST_F(InferServerRequestTest, ResponseOrder) {
  Session_t session = PrepareSession("response order", preproc_host_, nullptr, 200, BatchStrategy::DYNAMIC, observer_);
  ASSERT_NE(session, nullptr);
  constexpr int test_number = 1000;
  constexpr const char* tag = "test response order";
  for (int idx = 0; idx < test_number; ++idx) {
    auto in = Package::Create(4);
    in->tag = tag;
    ASSERT_TRUE(server_->Request(session, std::move(in), idx));  // idx travels as user data
  }
  server_->WaitTaskDone(session, tag);
  int response_number = 0;
  // WaitTaskDone above guarantees every response has been delivered, so this
  // busy-poll terminates; `continue` only skips a momentarily empty queue.
  while (response_number < test_number) {
    auto response = observer_->GetResponse();
    if (!response.first) continue;
    EXPECT_EQ(response.first->data.size(), 4u);
    EXPECT_EQ(any_cast<int>(response.second), response_number++);
  }
  server_->DestroySession(session);
}
  484. TEST_F(InferServerRequestTest, MultiSessionProcessDynamic) {
  485. InferServer s(device_id_);
  486. cv::Mat img = cv::imread(GetExePath() + image_path);
  487. uint8_t* img_nv12 = new uint8_t[img.cols * img.rows * 3 / 2];
  488. cvt_bgr_to_yuv420sp(img, 1, PixelFmt::NV12, img_nv12);
  489. std::vector<std::thread> ts;
  490. auto proc_func = [this, img_nv12, &img](int id) {
  491. std::promise<Status> get_response;
  492. auto postproc = std::make_shared<Postprocessor>();
  493. postproc->SetParams<Postprocessor::ProcessFunction>("process_function", g_empty_postproc_func);
  494. Session_t session =
  495. PrepareSession("multisession dynamic batch [" + std::to_string(id), std::make_shared<PreprocessorMLU>(),
  496. postproc, 200, BatchStrategy::DYNAMIC, std::make_shared<TestObserver>(get_response));
  497. ASSERT_NE(session, nullptr);
  498. PackagePtr in = std::make_shared<Package>();
  499. for (int i = 0; i < 10; ++i) {
  500. in->data.push_back(std::make_shared<InferData>());
  501. in->data[i]->Set(ConvertToVideoFrame(img_nv12, img.cols, img.rows));
  502. }
  503. server_->Request(session, std::move(in), nullptr);
  504. auto f = get_response.get_future();
  505. ASSERT_NE(f.wait_for(std::chrono::seconds(1)), std::future_status::timeout) << "wait for response timeout";
  506. EXPECT_EQ(f.get(), Status::SUCCESS);
  507. server_->DestroySession(session);
  508. };
  509. for (int i = 0; i < 10; i++) {
  510. ts.emplace_back(proc_func, i);
  511. }
  512. for (int i = 0; i < 10; ++i) {
  513. if (ts[i].joinable()) {
  514. ts[i].join();
  515. }
  516. }
  517. delete[] img_nv12;
  518. }
// Ten threads sharing a single session, each issuing a one-frame sync request.
TEST_F(InferServerRequestTest, MultiThreadProcessDynamic) {
  // NOTE(review): `s` appears unused — presumably keeps an extra server/device
  // reference alive; confirm before removing.
  InferServer s(device_id_);
  cv::Mat img = cv::imread(GetExePath() + image_path);
  uint8_t* img_nv12 = new uint8_t[img.cols * img.rows * 3 / 2];
  cvt_bgr_to_yuv420sp(img, 1, PixelFmt::NV12, img_nv12);
  std::vector<std::thread> ts;
  auto postproc = std::make_shared<Postprocessor>();
  postproc->SetParams<Postprocessor::ProcessFunction>("process_function", g_empty_postproc_func);
  Session_t session = PrepareSession("multithread dynamic batch", std::make_shared<PreprocessorMLU>(), postproc, 200,
                                     BatchStrategy::DYNAMIC, nullptr);
  ASSERT_NE(session, nullptr);
  // Capturing img/img_nv12 is safe: every thread is joined before they die.
  auto proc_func = [this, img_nv12, &img, session](int id) {
    PackagePtr in = std::make_shared<Package>();
    for (int i = 0; i < 1; ++i) {
      in->data.push_back(std::make_shared<InferData>());
      in->data[i]->Set(ConvertToVideoFrame(img_nv12, img.cols, img.rows));
    }
    in->tag = std::to_string(id);  // distinct tag per thread
    Status status;
    auto out = std::make_shared<Package>();
    // 10 s timeout: ten concurrent requests share one session's engines.
    ASSERT_TRUE(server_->RequestSync(session, std::move(in), &status, out, 10000));
    EXPECT_EQ(status, Status::SUCCESS);
    EXPECT_EQ(out->data.size(), 1u);
  };
  for (int i = 0; i < 10; i++) {
    ts.emplace_back(proc_func, i);
  }
  for (int i = 0; i < 10; ++i) {
    if (ts[i].joinable()) {
      ts[i].join();
    }
  }
  server_->DestroySession(session);
  delete[] img_nv12;
}
  554. #ifdef CNIS_HAVE_CNCV
// Same as DynamicBatch, but with the CNCV preprocessing backend enabled
// (last PrepareSession argument).
TEST_F(InferServerRequestTest, DynamicBatch_CNCV) {
  Session_t session =
      PrepareSession("dynamic batch process", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, observer_, true);
  ASSERT_NE(session, nullptr);
  auto in = PrepareInput(image_path, 10);
  server_->Request(session, std::move(in), nullptr);
  WaitAsyncDone();
  server_->DestroySession(session);
}
  564. TEST_F(InferServerRequestTest, StaticBatch_CNCV) {
  565. Session_t session =
  566. PrepareSession("static batch process", preproc_mlu_, postproc_, 5, BatchStrategy::STATIC, observer_, true);
  567. ASSERT_NE(session, nullptr);
  568. constexpr size_t data_number = 10;
  569. auto in = PrepareInput(image_path, data_number);
  570. server_->Request(session, std::move(in), nullptr);
  571. WaitAsyncDone();
  572. auto response = observer_->GetPackage();
  573. EXPECT_EQ(response->data.size(), data_number);
  574. server_->DestroySession(session);
  575. }
// Same as ProcessFailed, but with the CNCV preprocessing backend enabled.
TEST_F(InferServerRequestTest, ProcessFailed_CNCV) {
  std::string tag = "process failed";
  Session_t session = PrepareSession(tag, preproc_mlu_, postproc_, 5, BatchStrategy::STATIC, nullptr, true);
  ASSERT_NE(session, nullptr);
  constexpr size_t data_number = 1;
  // Entries without payload -> processing must fail.
  auto in = Package::Create(data_number, "");
  Status s;
  PackagePtr out = Package::Create(0);
  server_->RequestSync(session, std::move(in), &s, out);
  server_->WaitTaskDone(session, tag);
  EXPECT_NE(s, Status::SUCCESS);
  // DestroySession must still return within 1 s.
  auto fut = std::async(std::launch::async, [this, session]() { server_->DestroySession(session); });
  EXPECT_NE(std::future_status::timeout, fut.wait_for(std::chrono::seconds(1)));
}
// Same as DynamicBatchSyncTimeout, but with the CNCV backend: a 5 ms timeout
// must yield TIMEOUT with no output data.
TEST_F(InferServerRequestTest, DynamicBatchSyncTimeout_CNCV) {
  Session_t session =
      PrepareSession("dynamic batch sync timeout", preproc_mlu_, postproc_, 5, BatchStrategy::DYNAMIC, nullptr, true);
  ASSERT_NE(session, nullptr);
  auto in = PrepareInput(image_path, 10);
  Status status;
  auto out = std::make_shared<Package>();
  EXPECT_TRUE(server_->RequestSync(session, std::move(in), &status, out, 5));
  EXPECT_EQ(out->data.size(), 0u);
  EXPECT_EQ(status, Status::TIMEOUT);
  server_->DestroySession(session);
}
  602. #endif // CNIS_HAVE_CNCV
  603. } // namespace infer_server
  604. #endif // CNIS_WITH_CONTRIB