test_queuing_server.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. /*************************************************************************
  2. * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #include <gtest/gtest.h>
  21. #include <atomic>
  22. #include <chrono>
  23. #include <functional>
  24. #include <future>
  25. #include <memory>
  26. #include <mutex>
  27. #include <thread>
  28. #include "queuing_server.hpp"
  29. namespace cnstream {
  30. class QueuingServerTest {
  31. public:
  32. explicit QueuingServerTest(QueuingServer* server) : server_(server) {}
  33. int GetTicketSize() {
  34. std::lock_guard<std::mutex> lk(server_->mtx_);
  35. return static_cast<int>(server_->tickets_q_.size());
  36. }
  37. void SetReserved_(bool reserve) { server_->reserved_ = reserve; }
  38. bool GetReserved_() { return server_->reserved_; }
  39. int GetTickets_reserved_time() { return static_cast<int>(server_->tickets_q_.back().reserved_time); }
  40. int GetPreviousTickets_reserved_time(QueuingTicketRoot* qtr) { return static_cast<int>(qtr->reserved_time); }
  41. QueuingTicketRoot& GetCurrentQueueBack() { return server_->tickets_q_.back(); }
  42. int Get_shared_with_no_wait(QueuingTicket* pticket) {
  43. std::future_status status = pticket->wait_for(std::chrono::seconds(1));
  44. // if has not set_value
  45. if (status == std::future_status::timeout) {
  46. std::cout << "Timeout" << std::endl;
  47. return 1;
  48. }
  49. return 0;
  50. }
  51. private:
  52. QueuingServer* server_ = nullptr;
  53. }; // class QueuingServerTest
  54. TEST(Inferencer, QueuingServer_PickUpTicket) {
  55. std::shared_ptr<QueuingServer> qserver = std::make_shared<QueuingServer>();
  56. QueuingServerTest qserver_test(qserver.get());
  57. QueuingTicket ticket1;
  58. EXPECT_NO_THROW(ticket1 = qserver->PickUpTicket(true));
  59. EXPECT_EQ(1, qserver_test.GetTicketSize());
  60. // queue has only one ticket,call at once
  61. EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket1), 0);
  62. QueuingTicket ticket2 = qserver->PickUpTicket(true); // create another new ticket
  63. QueuingTicket ticket3 = qserver->PickUpTicket();
  64. EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 2) << "error, should not set the reserved_time";
  65. // still an old ticket,should be called(shared_future multi call)
  66. EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket3), 0);
  67. QueuingTicket ticket4 = qserver->PickUpTicket();
  68. EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 0);
  69. // a new ticket, should be not called
  70. EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket4), 1);
  71. }
  72. TEST(Inferencer, QueuingServer_PickUpNewTicket) {
  73. std::shared_ptr<QueuingServer> qserver = std::make_shared<QueuingServer>();
  74. QueuingServerTest qserver_test(qserver.get());
  75. QueuingTicket ticket1 = qserver->PickUpNewTicket(true);
  76. EXPECT_EQ(1, qserver_test.GetTicketSize());
  77. EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 1);
  78. QueuingTicketRoot& root1 = qserver_test.GetCurrentQueueBack();
  79. // create another new ticket
  80. QueuingTicket ticket2 = qserver->PickUpNewTicket();
  81. // last time reserve but this time create a new ticket
  82. EXPECT_EQ(0, qserver_test.GetPreviousTickets_reserved_time(&root1));
  83. }
  84. TEST(Inferencer, QueuingServer_DeallingDone) {
  85. std::shared_ptr<QueuingServer> qserver = std::make_shared<QueuingServer>();
  86. QueuingServerTest qserver_test(qserver.get());
  87. /* no tickets, no throw */
  88. EXPECT_NO_THROW(qserver->DeallingDone());
  89. /* one tickets, no throw , ticket size from 1 to 0 */
  90. QueuingTicket ticket1 = qserver->PickUpTicket(false);
  91. ASSERT_EQ(1, qserver_test.GetTicketSize());
  92. QueuingTicket ticket2 = qserver->PickUpTicket(false); // a new ticket
  93. QueuingTicket ticket3 = qserver->PickUpTicket(true); // a new ticket
  94. QueuingTicket ticket4 = qserver->PickUpTicket(true); // an old ticket
  95. QueuingTicket ticket5 = qserver->PickUpTicket(false); // still an old ticket
  96. int wait_time = 100;
  97. std::atomic<bool> DeallingDoneCall(false);
  98. std::function<double()> wait_Dealling([&]() -> double {
  99. DeallingDoneCall.store(true);
  100. auto stime = std::chrono::steady_clock::now();
  101. std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
  102. qserver->DeallingDone();
  103. auto etime = std::chrono::steady_clock::now();
  104. std::chrono::duration<double, std::milli> diff = etime - stime;
  105. return diff.count();
  106. });
  107. std::future<double> task_future = std::async(std::launch::async, wait_Dealling);
  108. while (!DeallingDoneCall.load()) {
  109. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  110. }
  111. double real_wait_time = task_future.get();
  112. EXPECT_GE(real_wait_time, static_cast<double>(wait_time));
  113. // ticket1 poped, and ticket2 set_value
  114. EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket2), 0);
  115. qserver->DeallingDone();
  116. qserver->DeallingDone();
  117. EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 1);
  118. EXPECT_EQ(1, qserver_test.GetTicketSize());
  119. }
  120. TEST(Inferencer, QueuingServer_WaitByTicket) {
  121. QueuingServer qserver;
  122. QueuingServerTest qserver_test(&qserver);
  123. QueuingTicket ticket1 = qserver.PickUpTicket();
  124. // ticket1 called immediately
  125. QueuingTicket ticket = qserver.PickUpTicket();
  126. /* check wait time */
  127. int wait_time = 100; // ms
  128. std::atomic<bool> task_run(false);
  129. std::function<double()> wait_task([&]() -> double {
  130. task_run.store(true);
  131. auto stime = std::chrono::steady_clock::now();
  132. qserver.WaitByTicket(&ticket);
  133. auto etime = std::chrono::steady_clock::now();
  134. std::chrono::duration<double, std::milli> diff = etime - stime;
  135. return diff.count();
  136. });
  137. std::future<double> task_future = std::async(std::launch::async, wait_task);
  138. while (!task_run.load()) {
  139. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  140. }
  141. std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
  142. qserver.DeallingDone();
  143. double real_wait_time = task_future.get();
  144. EXPECT_GE(real_wait_time, static_cast<double>(wait_time));
  145. }
  146. } // namespace cnstream