cnstream_eventbus.hpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. /*************************************************************************
  2. * Copyright (C) [2019] by Cambricon, Inc. All rights reserved
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * The above copyright notice and this permission notice shall be included in
  11. * all copies or substantial portions of the Software.
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  13. * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. *************************************************************************/
  20. #ifndef CNSTREAM_EVENT_BUS_HPP_
  21. #define CNSTREAM_EVENT_BUS_HPP_
  22. /**
  23. * @file cnstream_eventbus.hpp
  24. *
  25. * This file contains a declaration of the EventBus class.
  26. */
  27. #include <atomic>
  28. #include <functional>
  29. #include <list>
  30. #include <mutex>
  31. #include <string>
  32. #include <thread>
  33. #include <utility>
  34. #include "cnstream_common.hpp"
  35. #include "util/cnstream_queue.hpp"
  36. namespace cnstream {
  37. class Pipeline;
  38. /*!
  39. * @enum EventType
  40. *
  41. * @brief Enumeration variables describing the type of event.
  42. */
  43. enum class EventType {
  44. EVENT_INVALID, /*!< An invalid event type. */
  45. EVENT_ERROR, /*!< An error event. */
  46. EVENT_WARNING, /*!< A warning event. */
  47. EVENT_EOS, /*!< An EOS event. */
  48. EVENT_STOP, /*!< A stop event. */
  49. EVENT_STREAM_ERROR, /*!< A stream error event. */
  50. EVENT_TYPE_END /*!< Reserved for users custom events. */
  51. };
  52. /**
  53. * @enum EventHandleFlag
  54. *
  55. * @brief Enumeration variables describing the way how bus watchers handle an event.
  56. */
  57. enum class EventHandleFlag {
  58. EVENT_HANDLE_NULL, /*!< The event is not handled. */
  59. EVENT_HANDLE_INTERCEPTION, /*!< The event has been handled and other bus watchers needn't to handle it. */
  60. EVENT_HANDLE_SYNCED, /*!< The event has been handled and other bus watchers are going to handle it. */
  61. EVENT_HANDLE_STOP /*!< The event has been handled and bus watchers stop all other events' processing. */
  62. };
  63. /**
  64. * @struct Event
  65. *
  66. * @brief The Event is a structure describing the event information.
  67. */
  68. struct Event {
  69. EventType type; ///< The event type.
  70. std::string stream_id; ///< The stream that posts this event.
  71. std::string message; ///< More detailed messages describing the event.
  72. std::string module_name; ///< The module that posts this event.
  73. std::thread::id thread_id; ///< The thread ID from which the event is posted.
  74. };
  75. /**
  76. * @brief Defines an alias of bus watcher function.
  77. *
  78. * @param[in] event The event is polled from the event bus.
  79. *
  80. * @return Returns the flag that specifies how the event is handled.
  81. */
  82. using BusWatcher = std::function<EventHandleFlag(const Event &)>;
  83. /**
  84. * @class EventBus
  85. *
  86. * @brief EventBus is a class that transmits events from modules to a pipeline.
  87. */
  88. class EventBus : private NonCopyable {
  89. public:
  90. friend class Pipeline;
  91. /**
  92. * @brief Destructor. A destructor to destruct event bus.
  93. *
  94. * @return No return value.
  95. */
  96. ~EventBus();
  97. /**
  98. * @brief Starts an event bus thread.
  99. *
  100. * @return Returns true if start successfully, otherwise false.
  101. */
  102. bool Start();
  103. /**
  104. * @brief Stops an event bus thread.
  105. *
  106. * @return No return values.
  107. */
  108. void Stop();
  109. /**
  110. * @brief Adds a watcher to the event bus.
  111. *
  112. * @param[in] func The bus watcher to be added.
  113. *
  114. * @return The number of bus watchers that has been added to this event bus.
  115. */
  116. uint32_t AddBusWatch(BusWatcher func);
  117. /**
  118. * @brief Posts an event to a bus.
  119. *
  120. * @param[in] event The event to be posted.
  121. *
  122. * @return Returns true if this function run successfully. Otherwise, returns false.
  123. */
  124. bool PostEvent(Event event);
  125. #ifndef UNIT_TEST
  126. private: // NOLINT
  127. #else
  128. Event PollEventToTest();
  129. #endif
  130. EventBus() = default;
  131. /**
  132. * @brief Polls an event from a bus.
  133. *
  134. * @return Returns the event.
  135. *
  136. * @note This function is blocked until an event availabe or the bus stopped.
  137. */
  138. Event PollEvent();
  139. /**
  140. * @brief Gets all bus watchers from the event bus.
  141. *
  142. * @return A list with pairs of bus watcher and module.
  143. */
  144. const std::list<BusWatcher> &GetBusWatchers() const;
  145. /**
  146. * @brief Removes all bus watchers.
  147. *
  148. * @return No return value.
  149. */
  150. void ClearAllWatchers();
  151. /**
  152. * @brief Checks if the event bus is running.
  153. *
  154. * @return Returns true if the event bus is running. Otherwise, returns false.
  155. */
  156. bool IsRunning();
  157. void EventLoop();
  158. private:
  159. mutable std::mutex watcher_mtx_;
  160. ThreadSafeQueue<Event> queue_;
  161. #ifdef UNIT_TEST
  162. ThreadSafeQueue<Event> test_eventq_;
  163. bool unit_test = true;
  164. #endif
  165. std::list<BusWatcher> bus_watchers_;
  166. std::thread event_thread_;
  167. std::atomic<bool> running_{false};
  168. }; // class EventBus
  169. } // namespace cnstream
  170. #endif // CNSTREAM_EVENT_BUS_HPP_