123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- #ifndef CNSTREAM_SOURCE_HPP_
- #define CNSTREAM_SOURCE_HPP_
- #include <atomic>
- #include <memory>
- #include <string>
- #include <unordered_map>
- #include <utility>
- #include <vector>
- #include "cnstream_common.hpp"
- #include "cnstream_module.hpp"
- namespace cnstream {
- class SourceHandler;
- class SourceModule : public Module {
- public:
-
- explicit SourceModule(const std::string &name) : Module(name) { hasTransmit_.store(1); }
-
- virtual ~SourceModule() { RemoveSources(); }
-
- int AddSource(std::shared_ptr<SourceHandler> handler);
-
- std::shared_ptr<SourceHandler> GetSourceHandler(const std::string &stream_id);
-
- int RemoveSource(std::shared_ptr<SourceHandler> handler, bool force = false);
-
- int RemoveSource(const std::string &stream_id, bool force = false);
-
- int RemoveSources(bool force = false);
- #ifdef UNIT_TEST
- public:
- #else
- protected:
- #endif
- friend class SourceHandler;
-
- uint32_t GetStreamIndex(const std::string &stream_id);
-
- void ReturnStreamIndex(const std::string &stream_id);
-
- bool SendData(std::shared_ptr<CNFrameInfo> data);
- private:
- int Process(std::shared_ptr<CNFrameInfo> data) override {
- (void)data;
- LOGE(CORE) << "As a source module, Process() should not be invoked\n";
- return 0;
- }
- std::mutex mutex_;
- std::unordered_map<std::string , std::shared_ptr<SourceHandler>> source_map_;
- };
- class SourceHandler : private NonCopyable {
- public:
-
- explicit SourceHandler(SourceModule *module, const std::string &stream_id) : module_(module), stream_id_(stream_id) {
- if (module_) {
- stream_index_ = module_->GetStreamIndex(stream_id_);
- }
- }
-
- virtual ~SourceHandler() {
- if (module_) {
- module_->ReturnStreamIndex(stream_id_);
- }
- }
-
- virtual bool Open() = 0;
-
- virtual void Close() = 0;
-
- std::string GetStreamId() const { return stream_id_; }
-
- std::shared_ptr<CNFrameInfo> CreateFrameInfo(bool eos = false, std::shared_ptr<CNFrameInfo> payload = nullptr) {
- std::shared_ptr<CNFrameInfo> data = CNFrameInfo::Create(stream_id_, eos, payload);
- if (data) {
- data->SetStreamIndex(stream_index_);
- }
- return data;
- }
-
- bool SendData(std::shared_ptr<CNFrameInfo> data) {
- if (this->module_) {
- return this->module_->SendData(data);
- }
- return false;
- }
- protected:
- SourceModule *module_ = nullptr;
- mutable std::string stream_id_;
- uint32_t stream_index_ = INVALID_STREAM_IDX;
- };
- }
- #endif
|