123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- #include <gtest/gtest.h>
- #include <atomic>
- #include <chrono>
- #include <functional>
- #include <future>
- #include <memory>
- #include <mutex>
- #include <thread>
- #include "queuing_server.hpp"
- namespace cnstream {
- class QueuingServerTest {
- public:
- explicit QueuingServerTest(QueuingServer* server) : server_(server) {}
- int GetTicketSize() {
- std::lock_guard<std::mutex> lk(server_->mtx_);
- return static_cast<int>(server_->tickets_q_.size());
- }
- void SetReserved_(bool reserve) { server_->reserved_ = reserve; }
- bool GetReserved_() { return server_->reserved_; }
- int GetTickets_reserved_time() { return static_cast<int>(server_->tickets_q_.back().reserved_time); }
- int GetPreviousTickets_reserved_time(QueuingTicketRoot* qtr) { return static_cast<int>(qtr->reserved_time); }
- QueuingTicketRoot& GetCurrentQueueBack() { return server_->tickets_q_.back(); }
- int Get_shared_with_no_wait(QueuingTicket* pticket) {
- std::future_status status = pticket->wait_for(std::chrono::seconds(1));
-
- if (status == std::future_status::timeout) {
- std::cout << "Timeout" << std::endl;
- return 1;
- }
- return 0;
- }
- private:
- QueuingServer* server_ = nullptr;
- };
- TEST(Inferencer, QueuingServer_PickUpTicket) {
- std::shared_ptr<QueuingServer> qserver = std::make_shared<QueuingServer>();
- QueuingServerTest qserver_test(qserver.get());
- QueuingTicket ticket1;
- EXPECT_NO_THROW(ticket1 = qserver->PickUpTicket(true));
- EXPECT_EQ(1, qserver_test.GetTicketSize());
-
- EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket1), 0);
- QueuingTicket ticket2 = qserver->PickUpTicket(true);
- QueuingTicket ticket3 = qserver->PickUpTicket();
- EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 2) << "error, should not set the reserved_time";
-
- EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket3), 0);
- QueuingTicket ticket4 = qserver->PickUpTicket();
- EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 0);
-
- EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket4), 1);
- }
- TEST(Inferencer, QueuingServer_PickUpNewTicket) {
- std::shared_ptr<QueuingServer> qserver = std::make_shared<QueuingServer>();
- QueuingServerTest qserver_test(qserver.get());
- QueuingTicket ticket1 = qserver->PickUpNewTicket(true);
- EXPECT_EQ(1, qserver_test.GetTicketSize());
- EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 1);
- QueuingTicketRoot& root1 = qserver_test.GetCurrentQueueBack();
-
- QueuingTicket ticket2 = qserver->PickUpNewTicket();
-
- EXPECT_EQ(0, qserver_test.GetPreviousTickets_reserved_time(&root1));
- }
- TEST(Inferencer, QueuingServer_DeallingDone) {
- std::shared_ptr<QueuingServer> qserver = std::make_shared<QueuingServer>();
- QueuingServerTest qserver_test(qserver.get());
-
- EXPECT_NO_THROW(qserver->DeallingDone());
-
- QueuingTicket ticket1 = qserver->PickUpTicket(false);
- ASSERT_EQ(1, qserver_test.GetTicketSize());
- QueuingTicket ticket2 = qserver->PickUpTicket(false);
- QueuingTicket ticket3 = qserver->PickUpTicket(true);
- QueuingTicket ticket4 = qserver->PickUpTicket(true);
- QueuingTicket ticket5 = qserver->PickUpTicket(false);
- int wait_time = 100;
- std::atomic<bool> DeallingDoneCall(false);
- std::function<double()> wait_Dealling([&]() -> double {
- DeallingDoneCall.store(true);
- auto stime = std::chrono::steady_clock::now();
- std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
- qserver->DeallingDone();
- auto etime = std::chrono::steady_clock::now();
- std::chrono::duration<double, std::milli> diff = etime - stime;
- return diff.count();
- });
- std::future<double> task_future = std::async(std::launch::async, wait_Dealling);
- while (!DeallingDoneCall.load()) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- double real_wait_time = task_future.get();
- EXPECT_GE(real_wait_time, static_cast<double>(wait_time));
-
- EXPECT_EQ(qserver_test.Get_shared_with_no_wait(&ticket2), 0);
- qserver->DeallingDone();
- qserver->DeallingDone();
- EXPECT_EQ(qserver_test.GetTickets_reserved_time(), 1);
- EXPECT_EQ(1, qserver_test.GetTicketSize());
- }
- TEST(Inferencer, QueuingServer_WaitByTicket) {
- QueuingServer qserver;
- QueuingServerTest qserver_test(&qserver);
- QueuingTicket ticket1 = qserver.PickUpTicket();
-
- QueuingTicket ticket = qserver.PickUpTicket();
-
- int wait_time = 100;
- std::atomic<bool> task_run(false);
- std::function<double()> wait_task([&]() -> double {
- task_run.store(true);
- auto stime = std::chrono::steady_clock::now();
- qserver.WaitByTicket(&ticket);
- auto etime = std::chrono::steady_clock::now();
- std::chrono::duration<double, std::milli> diff = etime - stime;
- return diff.count();
- });
- std::future<double> task_future = std::async(std::launch::async, wait_task);
- while (!task_run.load()) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
- qserver.DeallingDone();
- double real_wait_time = task_future.get();
- EXPECT_GE(real_wait_time, static_cast<double>(wait_time));
- }
- }
|