123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- #ifndef CNSTREAM_EVENT_BUS_HPP_
- #define CNSTREAM_EVENT_BUS_HPP_
- #include <atomic>
- #include <functional>
- #include <list>
- #include <mutex>
- #include <string>
- #include <thread>
- #include <utility>
- #include "cnstream_common.hpp"
- #include "util/cnstream_queue.hpp"
- namespace cnstream {
- class Pipeline;
- enum class EventType {
- EVENT_INVALID,
- EVENT_ERROR,
- EVENT_WARNING,
- EVENT_EOS,
- EVENT_STOP,
- EVENT_STREAM_ERROR,
- EVENT_TYPE_END
- };
- enum class EventHandleFlag {
- EVENT_HANDLE_NULL,
- EVENT_HANDLE_INTERCEPTION,
- EVENT_HANDLE_SYNCED,
- EVENT_HANDLE_STOP
- };
- struct Event {
- EventType type;
- std::string stream_id;
- std::string message;
- std::string module_name;
- std::thread::id thread_id;
- };
- using BusWatcher = std::function<EventHandleFlag(const Event &)>;
- class EventBus : private NonCopyable {
- public:
- friend class Pipeline;
-
- ~EventBus();
-
- bool Start();
-
- void Stop();
-
- uint32_t AddBusWatch(BusWatcher func);
-
- bool PostEvent(Event event);
- #ifndef UNIT_TEST
- private:
- #else
- Event PollEventToTest();
- #endif
- EventBus() = default;
-
- Event PollEvent();
-
- const std::list<BusWatcher> &GetBusWatchers() const;
-
- void ClearAllWatchers();
-
- bool IsRunning();
- void EventLoop();
- private:
- mutable std::mutex watcher_mtx_;
- ThreadSafeQueue<Event> queue_;
- #ifdef UNIT_TEST
- ThreadSafeQueue<Event> test_eventq_;
- bool unit_test = true;
- #endif
- std::list<BusWatcher> bus_watchers_;
- std::thread event_thread_;
- std::atomic<bool> running_{false};
- };
- }
- #endif
|