threadsafe_queue.h 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. /*************************************************************************
  2. * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #ifndef CXXUTIL_THREADSAFE_QUEUE_H_
  21. #define CXXUTIL_THREADSAFE_QUEUE_H_
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <queue>
#include <utility>
  26. namespace edk {
  /**
   * @brief Queue wrapper whose operations are serialized by an internal mutex
   *
   * The underlying container is a std::queue<T>. Copying is disabled because
   * the class owns a mutex and a condition variable.
   *
   * @tparam T element type stored in the queue
   */
  template <typename T>
  class ThreadSafeQueue {
   public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue& other) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;

    /**
     * @brief Try to pop an element from the queue without blocking
     *
     * @param value reference that receives the popped element
     * @retval true if an element was popped
     * @retval false if the queue was empty
     */
    bool TryPop(T& value);  // NOLINT

    /**
     * @brief Wait until the queue is not empty, then pop an element
     *
     * @param value reference that receives the popped element
     */
    void WaitAndPop(T& value);  // NOLINT

    /**
     * @brief Wait until the queue is not empty and pop an element, or give up
     *        once the timeout is reached
     *
     * @param value reference that receives the popped element
     * @param timeout maximum duration to block for
     * @retval true if an element was popped
     * @retval false if the timeout was reached
     */
    bool WaitAndTryPop(T& value, const std::chrono::microseconds timeout);  // NOLINT

    /**
     * @brief Push a copy of the given element to the end of the queue
     *
     * @param new_value the value of the element to push
     */
    void Push(const T& new_value);

    /**
     * @brief Move the given element to the end of the queue
     *
     * @param new_value the value of the element to push
     */
    void Push(T&& new_value);

    /**
     * @brief Check if the queue has no elements
     *
     * @retval true if the queue is empty
     * @retval false otherwise
     */
    bool Empty() const {
      std::lock_guard<std::mutex> lk(data_m_);
      return q_.empty();
    }

    /**
     * @brief Returns the number of elements in the queue
     *
     * @return The number of elements in the queue, narrowed to 32 bits
     */
    uint32_t Size() const {
      std::lock_guard<std::mutex> lk(data_m_);
      // The container reports size_t; make the narrowing explicit.
      return static_cast<uint32_t>(q_.size());
    }

   private:
    // mutable so that the const observers above can still take the lock.
    mutable std::mutex data_m_;
    std::queue<T> q_;
    std::condition_variable notempty_cond_;
  };
  95. template <typename T>
  96. bool ThreadSafeQueue<T>::TryPop(T& value) { //NOLINT
  97. std::lock_guard<std::mutex> lk(data_m_);
  98. if (q_.empty()) {
  99. return false;
  100. } else {
  101. value = q_.front();
  102. q_.pop();
  103. return true;
  104. }
  105. }
  106. template <typename T>
  107. void ThreadSafeQueue<T>::WaitAndPop(T& value) { //NOLINT
  108. std::unique_lock<std::mutex> lk(data_m_);
  109. notempty_cond_.wait(lk, [&] { return !q_.empty(); });
  110. value = q_.front();
  111. q_.pop();
  112. }
  113. template <typename T>
  114. bool ThreadSafeQueue<T>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time) { //NOLINT
  115. std::unique_lock<std::mutex> lk(data_m_);
  116. if (notempty_cond_.wait_for(lk, rel_time, [&] { return !q_.empty(); })) {
  117. value = q_.front();
  118. q_.pop();
  119. return true;
  120. } else {
  121. return false;
  122. }
  123. }
  124. template <typename T>
  125. void ThreadSafeQueue<T>::Push(const T& new_value) {
  126. std::lock_guard<std::mutex> lk(data_m_);
  127. q_.push(new_value);
  128. notempty_cond_.notify_one();
  129. }
  130. template <typename T>
  131. void ThreadSafeQueue<T>::Push(T&& new_value) {
  132. std::lock_guard<std::mutex> lk(data_m_);
  133. q_.push(std::move(new_value));
  134. notempty_cond_.notify_one();
  135. }
  136. } // namespace edk
  137. #endif // CXXUTIL_THREADSAFE_QUEUE_H_