RingBuffer.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef UTIL_RINGBUFFER_H_
  11. #define UTIL_RINGBUFFER_H_
  12. #include <assert.h>
  13. #include <atomic>
  14. #include <memory>
  15. #include <mutex>
  16. #include <unordered_map>
  17. #include <condition_variable>
  18. #include <functional>
  19. #include "List.h"
  20. #include "Poller/EventPoller.h"
  21. // Lower bound for a ring's maximum cached-item count: smaller requested sizes are raised to this value
  22. #define RING_MIN_SIZE 32
  // Declares a std::lock_guard named `lck` that holds `mtx` until the end of the enclosing scope
  23. #define LOCK_GUARD(mtx) std::lock_guard<decltype(mtx)> lck(mtx)
  24. namespace toolkit {
  25. using ReaderInfo = std::shared_ptr<void>;
  26. template <typename T>
  27. class RingDelegate {
  28. public:
  29. using Ptr = std::shared_ptr<RingDelegate>;
  30. RingDelegate() = default;
  31. virtual ~RingDelegate() = default;
  32. virtual void onWrite(T in, bool is_key = true) = 0;
  33. };
  34. template <typename T>
  35. class _RingStorage;
  36. template <typename T>
  37. class _RingReaderDispatcher;
  38. /**
  39. * 环形缓存读取器
  40. * 该对象的事件触发都会在绑定的poller线程中执行
  41. * 所以把锁去掉了
  42. * 对该对象的一切操作都应该在poller线程中执行
  43. */
  44. template <typename T>
  45. class _RingReader {
  46. public:
  47. using Ptr = std::shared_ptr<_RingReader>;
  48. friend class _RingReaderDispatcher<T>;
  49. _RingReader(std::shared_ptr<_RingStorage<T>> storage) { _storage = std::move(storage); }
  50. ~_RingReader() = default;
  51. void setReadCB(std::function<void(const T &)> cb) {
  52. if (!cb) {
  53. _read_cb = [](const T &) {};
  54. } else {
  55. _read_cb = std::move(cb);
  56. flushGop();
  57. }
  58. }
  59. void setDetachCB(std::function<void()> cb) {
  60. _detach_cb = cb ? std::move(cb) : []() {};
  61. }
  62. void setGetInfoCB(std::function<ReaderInfo()> cb) {
  63. _get_info = cb ? std::move(cb) : []() { return ReaderInfo(); };
  64. }
  65. private:
  66. void onRead(const T &data, bool /*is_key*/) { _read_cb(data); }
  67. void onDetach() const { _detach_cb(); }
  68. void flushGop() {
  69. if (!_storage) {
  70. return;
  71. }
  72. _storage->getCache().for_each([this](const List<std::pair<bool, T>> &lst) {
  73. lst.for_each([this](const std::pair<bool, T> &pr) { onRead(pr.second, pr.first); });
  74. });
  75. }
  76. ReaderInfo getInfo() { return _get_info(); }
  77. private:
  78. std::shared_ptr<_RingStorage<T>> _storage;
  79. std::function<void(void)> _detach_cb = []() {};
  80. std::function<void(const T &)> _read_cb = [](const T &) {};
  81. std::function<ReaderInfo()> _get_info = []() { return ReaderInfo(); };
  82. };
  83. template <typename T>
  84. class _RingStorage {
  85. public:
  86. using Ptr = std::shared_ptr<_RingStorage>;
  87. using GopType = List<List<std::pair<bool, T>>>;
  88. _RingStorage(size_t max_size, size_t max_gop_size) {
  89. // gop缓存个数不能小于32
  90. if (max_size < RING_MIN_SIZE) {
  91. max_size = RING_MIN_SIZE;
  92. }
  93. _max_size = max_size;
  94. _max_gop_size = max_gop_size;
  95. clearCache();
  96. }
  97. ~_RingStorage() = default;
  98. /**
  99. * 写入环形缓存数据
  100. * @param in 数据
  101. * @param is_key 是否为关键帧
  102. * @return 是否触发重置环形缓存大小
  103. */
  104. void write(T in, bool is_key = true) {
  105. if (is_key) {
  106. _have_idr = true;
  107. _started = true;
  108. if (!_data_cache.back().empty()) {
  109. //当前gop列队还没收到任意缓存
  110. _data_cache.emplace_back();
  111. }
  112. if (_data_cache.size() > _max_gop_size) {
  113. // GOP个数超过限制,那么移除最早的GOP
  114. popFrontGop();
  115. }
  116. }
  117. if (!_have_idr && _started) {
  118. //缓存中没有关键帧,那么gop缓存无效
  119. return;
  120. }
  121. _data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));
  122. if (++_size > _max_size) {
  123. // GOP缓存溢出
  124. while (_data_cache.size() > 1) {
  125. //先尝试清除老的GOP缓存
  126. popFrontGop();
  127. }
  128. if (_size > _max_size) {
  129. //还是大于最大缓冲限制,那么清空所有GOP
  130. clearCache();
  131. }
  132. }
  133. }
  134. Ptr clone() const {
  135. Ptr ret(new _RingStorage());
  136. ret->_size = _size;
  137. ret->_have_idr = _have_idr;
  138. ret->_started = _started;
  139. ret->_max_size = _max_size;
  140. ret->_max_gop_size = _max_gop_size;
  141. ret->_data_cache = _data_cache;
  142. return ret;
  143. }
  144. const GopType &getCache() const { return _data_cache; }
  145. void clearCache() {
  146. _size = 0;
  147. _have_idr = false;
  148. _data_cache.clear();
  149. _data_cache.emplace_back();
  150. }
  151. private:
  152. _RingStorage() = default;
  153. void popFrontGop() {
  154. if (!_data_cache.empty()) {
  155. _size -= _data_cache.front().size();
  156. _data_cache.pop_front();
  157. if (_data_cache.empty()) {
  158. _data_cache.emplace_back();
  159. }
  160. }
  161. }
  162. private:
  163. bool _started = false;
  164. bool _have_idr;
  165. size_t _size;
  166. size_t _max_size;
  167. size_t _max_gop_size;
  168. GopType _data_cache;
  169. };
  170. template <typename T>
  171. class RingBuffer;
  172. /**
  173. * 环形缓存事件派发器,只能一个poller线程操作它
  174. * @tparam T
  175. */
  176. template <typename T>
  177. class _RingReaderDispatcher : public std::enable_shared_from_this<_RingReaderDispatcher<T>> {
  178. public:
  179. using Ptr = std::shared_ptr<_RingReaderDispatcher>;
  180. using RingReader = _RingReader<T>;
  181. using RingStorage = _RingStorage<T>;
  182. using onChangeInfoCB = std::function<ReaderInfo(ReaderInfo &&info)>;
  183. friend class RingBuffer<T>;
  184. ~_RingReaderDispatcher() {
  185. decltype(_reader_map) reader_map;
  186. reader_map.swap(_reader_map);
  187. for (auto &pr : reader_map) {
  188. auto reader = pr.second.lock();
  189. if (reader) {
  190. reader->onDetach();
  191. }
  192. }
  193. }
  194. private:
  195. _RingReaderDispatcher(
  196. const typename RingStorage::Ptr &storage, std::function<void(int, bool)> onSizeChanged) {
  197. _reader_size = 0;
  198. _storage = storage;
  199. _on_size_changed = std::move(onSizeChanged);
  200. assert(_on_size_changed);
  201. }
  202. void write(T in, bool is_key = true) {
  203. for (auto it = _reader_map.begin(); it != _reader_map.end();) {
  204. auto reader = it->second.lock();
  205. if (!reader) {
  206. it = _reader_map.erase(it);
  207. --_reader_size;
  208. onSizeChanged(false);
  209. continue;
  210. }
  211. reader->onRead(in, is_key);
  212. ++it;
  213. }
  214. _storage->write(std::move(in), is_key);
  215. }
  216. std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
  217. if (!poller->isCurrentThread()) {
  218. throw std::runtime_error("You can attach RingBuffer only in it's poller thread");
  219. }
  220. std::weak_ptr<_RingReaderDispatcher> weak_self = this->shared_from_this();
  221. auto on_dealloc = [weak_self, poller](RingReader *ptr) {
  222. poller->async([weak_self, ptr]() {
  223. auto strong_self = weak_self.lock();
  224. if (strong_self && strong_self->_reader_map.erase(ptr)) {
  225. --strong_self->_reader_size;
  226. strong_self->onSizeChanged(false);
  227. }
  228. delete ptr;
  229. });
  230. };
  231. std::shared_ptr<RingReader> reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);
  232. _reader_map[reader.get()] = reader;
  233. ++_reader_size;
  234. onSizeChanged(true);
  235. return reader;
  236. }
  237. void onSizeChanged(bool add_flag) { _on_size_changed(_reader_size, add_flag); }
  238. void clearCache() {
  239. if (_reader_size == 0) {
  240. _storage->clearCache();
  241. }
  242. }
  243. std::list<ReaderInfo> getInfoList(const onChangeInfoCB &on_change) {
  244. std::list<ReaderInfo> ret;
  245. for (auto &pr : _reader_map) {
  246. auto reader = pr.second.lock();
  247. if (!reader) {
  248. continue;
  249. }
  250. auto info = reader->getInfo();
  251. if (!info) {
  252. continue;
  253. }
  254. ret.emplace_back(on_change(std::move(info)));
  255. }
  256. return ret;
  257. }
  258. private:
  259. std::atomic_int _reader_size;
  260. std::function<void(int, bool)> _on_size_changed;
  261. typename RingStorage::Ptr _storage;
  262. std::unordered_map<void *, std::weak_ptr<RingReader>> _reader_map;
  263. };
  264. template <typename T>
  265. class RingBuffer : public std::enable_shared_from_this<RingBuffer<T>> {
  266. public:
  267. using Ptr = std::shared_ptr<RingBuffer>;
  268. using RingReader = _RingReader<T>;
  269. using RingStorage = _RingStorage<T>;
  270. using RingReaderDispatcher = _RingReaderDispatcher<T>;
  271. using onReaderChanged = std::function<void(int size)>;
  272. using onGetInfoCB = std::function<void(std::list<ReaderInfo> &info_list)>;
  273. RingBuffer(size_t max_size = 1024, onReaderChanged cb = nullptr, size_t max_gop_size = 1) {
  274. _storage = std::make_shared<RingStorage>(max_size, max_gop_size);
  275. _on_reader_changed = cb ? std::move(cb) : [](int size) {};
  276. //先触发无人观看
  277. _on_reader_changed(0);
  278. }
  279. ~RingBuffer() = default;
  280. void write(T in, bool is_key = true) {
  281. if (_delegate) {
  282. _delegate->onWrite(std::move(in), is_key);
  283. return;
  284. }
  285. LOCK_GUARD(_mtx_map);
  286. for (auto &pr : _dispatcher_map) {
  287. auto &second = pr.second;
  288. //切换线程后触发onRead事件
  289. pr.first->async([second, in, is_key]() { second->write(std::move(const_cast<T &>(in)), is_key); }, false);
  290. }
  291. _storage->write(std::move(in), is_key);
  292. }
  293. void setDelegate(const typename RingDelegate<T>::Ptr &delegate) { _delegate = delegate; }
  294. std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache = true) {
  295. typename RingReaderDispatcher::Ptr dispatcher;
  296. {
  297. LOCK_GUARD(_mtx_map);
  298. auto &ref = _dispatcher_map[poller];
  299. if (!ref) {
  300. std::weak_ptr<RingBuffer> weak_self = this->shared_from_this();
  301. auto onSizeChanged = [weak_self, poller](int size, bool add_flag) {
  302. if (auto strong_self = weak_self.lock()) {
  303. strong_self->onSizeChanged(poller, size, add_flag);
  304. }
  305. };
  306. auto onDealloc = [poller](RingReaderDispatcher *ptr) { poller->async([ptr]() { delete ptr; }); };
  307. ref.reset(new RingReaderDispatcher(_storage->clone(), std::move(onSizeChanged)), std::move(onDealloc));
  308. }
  309. dispatcher = ref;
  310. }
  311. return dispatcher->attach(poller, use_cache);
  312. }
  313. int readerCount() { return _total_count; }
  314. void clearCache() {
  315. LOCK_GUARD(_mtx_map);
  316. _storage->clearCache();
  317. for (auto &pr : _dispatcher_map) {
  318. auto &second = pr.second;
  319. //切换线程后清空缓存
  320. pr.first->async([second]() { second->clearCache(); }, false);
  321. }
  322. }
  323. void getInfoList(const onGetInfoCB &cb, const typename RingReaderDispatcher::onChangeInfoCB &on_change = nullptr) {
  324. if (!cb) {
  325. return;
  326. }
  327. if (!on_change) {
  328. const_cast<typename RingReaderDispatcher::onChangeInfoCB &>(on_change) = [](ReaderInfo &&info) { return std::move(info); };
  329. }
  330. LOCK_GUARD(_mtx_map);
  331. auto info_vec = std::make_shared<std::vector<std::list<ReaderInfo>>>();
  332. // 1、最少确保一个元素
  333. info_vec->resize(_dispatcher_map.empty() ? 1 : _dispatcher_map.size());
  334. std::shared_ptr<void> on_finished(nullptr, [cb, info_vec](void *) mutable {
  335. // 2、防止这里为空
  336. auto &lst = *info_vec->begin();
  337. for (auto &item : *info_vec) {
  338. if (&lst != &item) {
  339. lst.insert(lst.end(), item.begin(), item.end());
  340. }
  341. }
  342. cb(lst);
  343. });
  344. auto i = 0U;
  345. for (auto &pr : _dispatcher_map) {
  346. auto &second = pr.second;
  347. pr.first->async([second, info_vec, on_finished, i, on_change]() { (*info_vec)[i] = second->getInfoList(on_change); });
  348. ++i;
  349. }
  350. }
  351. private:
  352. void onSizeChanged(const EventPoller::Ptr &poller, int size, bool add_flag) {
  353. if (size == 0) {
  354. LOCK_GUARD(_mtx_map);
  355. _dispatcher_map.erase(poller);
  356. }
  357. if (add_flag) {
  358. ++_total_count;
  359. } else {
  360. --_total_count;
  361. }
  362. _on_reader_changed(_total_count);
  363. }
  364. private:
  365. struct HashOfPtr {
  366. std::size_t operator()(const EventPoller::Ptr &key) const { return (std::size_t)key.get(); }
  367. };
  368. private:
  369. std::mutex _mtx_map;
  370. std::atomic_int _total_count { 0 };
  371. typename RingStorage::Ptr _storage;
  372. typename RingDelegate<T>::Ptr _delegate;
  373. onReaderChanged _on_reader_changed;
  374. std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
  375. };
  376. } /* namespace toolkit */
  377. #endif /* UTIL_RINGBUFFER_H_ */