RingBuffer.h 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef UTIL_RINGBUFFER_H_
  11. #define UTIL_RINGBUFFER_H_
  12. #include <atomic>
  13. #include <memory>
  14. #include <mutex>
  15. #include <unordered_map>
  16. #include <condition_variable>
  17. #include <functional>
  18. #include "List.h"
  19. #include "Poller/EventPoller.h"
  20. using namespace std;
  21. //GOP缓存最大长度下限值
  22. #define RING_MIN_SIZE 32
  23. #define LOCK_GUARD(mtx) lock_guard<decltype(mtx)> lck(mtx)
  24. namespace toolkit {
  25. template<typename T>
  26. class RingDelegate {
  27. public:
  28. typedef std::shared_ptr<RingDelegate> Ptr;
  29. RingDelegate() {}
  30. virtual ~RingDelegate() {}
  31. virtual void onWrite(T in, bool is_key = true) = 0;
  32. };
  33. template<typename T>
  34. class _RingStorage;
  35. template<typename T>
  36. class _RingReaderDispatcher;
  37. /**
  38. * 环形缓存读取器
  39. * 该对象的事件触发都会在绑定的poller线程中执行
  40. * 所以把锁去掉了
  41. * 对该对象的一切操作都应该在poller线程中执行
  42. * @tparam T
  43. */
  44. template<typename T>
  45. class _RingReader {
  46. public:
  47. typedef std::shared_ptr<_RingReader> Ptr;
  48. friend class _RingReaderDispatcher<T>;
  49. _RingReader(const std::shared_ptr<_RingStorage<T> > &storage, bool use_cache) {
  50. _storage = storage;
  51. _use_cache = use_cache;
  52. }
  53. ~_RingReader() {}
  54. void setReadCB(const function<void(const T &)> &cb) {
  55. if (!cb) {
  56. _read_cb = [](const T &) {};
  57. } else {
  58. _read_cb = cb;
  59. flushGop();
  60. }
  61. }
  62. void setDetachCB(const function<void()> &cb) {
  63. if (!cb) {
  64. _detach_cb = []() {};
  65. } else {
  66. _detach_cb = cb;
  67. }
  68. }
  69. private:
  70. void onRead(const T &data, bool is_key) {
  71. _read_cb(data);
  72. }
  73. void onDetach() const {
  74. _detach_cb();
  75. }
  76. void flushGop() {
  77. if (!_use_cache) {
  78. return;
  79. }
  80. _storage->getCache().for_each([&](const pair<bool, T> &pr) {
  81. onRead(pr.second, pr.first);
  82. });
  83. }
  84. private:
  85. bool _use_cache;
  86. shared_ptr<_RingStorage<T> > _storage;
  87. function<void(void)> _detach_cb = []() {};
  88. function<void(const T &)> _read_cb = [](const T &) {};
  89. };
  90. template<typename T>
  91. class _RingStorage {
  92. public:
  93. typedef std::shared_ptr<_RingStorage> Ptr;
  94. _RingStorage(int max_size) {
  95. //gop缓存个数不能小于32
  96. if(max_size < RING_MIN_SIZE){
  97. max_size = RING_MIN_SIZE;
  98. }
  99. _max_size = max_size;
  100. }
  101. ~_RingStorage() {}
  102. /**
  103. * 写入环形缓存数据
  104. * @param in 数据
  105. * @param is_key 是否为关键帧
  106. * @return 是否触发重置环形缓存大小
  107. */
  108. void write(T in, bool is_key = true) {
  109. if (is_key) {
  110. //遇到I帧,那么移除老数据
  111. _size = 0;
  112. _have_idr = true;
  113. _data_cache.clear();
  114. }
  115. if (!_have_idr) {
  116. //缓存中没有关键帧,那么gop缓存无效
  117. return;
  118. }
  119. _data_cache.emplace_back(std::make_pair(is_key, std::move(in)));
  120. if (++_size > _max_size) {
  121. //GOP缓存溢出,清空关老数据
  122. _size = 0;
  123. _have_idr = false;
  124. _data_cache.clear();
  125. }
  126. }
  127. Ptr clone() const {
  128. Ptr ret(new _RingStorage());
  129. ret->_size = _size;
  130. ret->_have_idr = _have_idr;
  131. ret->_max_size = _max_size;
  132. ret->_data_cache = _data_cache;
  133. return ret;
  134. }
  135. const List<pair<bool, T> > &getCache() const {
  136. return _data_cache;
  137. }
  138. void clearCache(){
  139. _size = 0;
  140. _data_cache.clear();
  141. }
  142. private:
  143. _RingStorage() = default;
  144. private:
  145. bool _have_idr = false;
  146. int _size = 0;
  147. int _max_size;
  148. List<pair<bool, T> > _data_cache;
  149. };
  150. template<typename T>
  151. class RingBuffer;
  152. /**
  153. * 环形缓存事件派发器,只能一个poller线程操作它
  154. * @tparam T
  155. */
  156. template<typename T>
  157. class _RingReaderDispatcher : public enable_shared_from_this<_RingReaderDispatcher<T> > {
  158. public:
  159. typedef std::shared_ptr<_RingReaderDispatcher> Ptr;
  160. typedef _RingReader<T> RingReader;
  161. typedef _RingStorage<T> RingStorage;
  162. friend class RingBuffer<T>;
  163. ~_RingReaderDispatcher() {
  164. decltype(_reader_map) reader_map;
  165. reader_map.swap(_reader_map);
  166. for (auto &pr : reader_map) {
  167. auto reader = pr.second.lock();
  168. if (reader) {
  169. reader->onDetach();
  170. }
  171. }
  172. }
  173. private:
  174. _RingReaderDispatcher(const typename RingStorage::Ptr &storage, const function<void(int, bool)> &onSizeChanged) {
  175. _storage = storage;
  176. _reader_size = 0;
  177. _on_size_changed = onSizeChanged;
  178. }
  179. void write(T in, bool is_key = true) {
  180. for (auto it = _reader_map.begin(); it != _reader_map.end();) {
  181. auto reader = it->second.lock();
  182. if (!reader) {
  183. it = _reader_map.erase(it);
  184. --_reader_size;
  185. onSizeChanged(false);
  186. continue;
  187. }
  188. reader->onRead(in, is_key);
  189. ++it;
  190. }
  191. _storage->write(std::move(in), is_key);
  192. }
  193. std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
  194. if (!poller->isCurrentThread()) {
  195. throw std::runtime_error("必须在绑定的poller线程中执行attach操作");
  196. }
  197. weak_ptr<_RingReaderDispatcher> weakSelf = this->shared_from_this();
  198. auto on_dealloc = [weakSelf, poller](RingReader *ptr) {
  199. poller->async([weakSelf, ptr]() {
  200. auto strongSelf = weakSelf.lock();
  201. if (strongSelf && strongSelf->_reader_map.erase(ptr)) {
  202. --strongSelf->_reader_size;
  203. strongSelf->onSizeChanged(false);
  204. }
  205. delete ptr;
  206. });
  207. };
  208. std::shared_ptr<RingReader> reader(new RingReader(_storage, use_cache), on_dealloc);
  209. _reader_map[reader.get()] = std::move(reader);
  210. ++_reader_size;
  211. onSizeChanged(true);
  212. return reader;
  213. }
  214. void onSizeChanged(bool add_flag) {
  215. _on_size_changed(_reader_size, add_flag);
  216. }
  217. void clearCache(){
  218. if(_reader_size == 0){
  219. _storage->clearCache();
  220. }
  221. }
  222. private:
  223. atomic_int _reader_size;
  224. function<void(int, bool)> _on_size_changed;
  225. typename RingStorage::Ptr _storage;
  226. unordered_map<void *, std::weak_ptr<RingReader> > _reader_map;
  227. };
  228. template<typename T>
  229. class RingBuffer : public enable_shared_from_this<RingBuffer<T> > {
  230. public:
  231. typedef std::shared_ptr<RingBuffer> Ptr;
  232. typedef _RingReader<T> RingReader;
  233. typedef _RingStorage<T> RingStorage;
  234. typedef _RingReaderDispatcher<T> RingReaderDispatcher;
  235. typedef function<void(int size)> onReaderChanged;
  236. RingBuffer(int max_size = 1024, const onReaderChanged &cb = nullptr) {
  237. _on_reader_changed = cb;
  238. _storage = std::make_shared<RingStorage>(max_size);
  239. }
  240. ~RingBuffer() {}
  241. void write(T in, bool is_key = true) {
  242. if (_delegate) {
  243. _delegate->onWrite(std::move(in), is_key);
  244. return;
  245. }
  246. LOCK_GUARD(_mtx_map);
  247. for (auto &pr : _dispatcher_map) {
  248. auto &second = pr.second;
  249. //切换线程后触发onRead事件
  250. pr.first->async([second, in, is_key]() {
  251. second->write(std::move(const_cast<T &>(in)), is_key);
  252. }, false);
  253. }
  254. _storage->write(std::move(in), is_key);
  255. }
  256. void setDelegate(const typename RingDelegate<T>::Ptr &delegate) {
  257. _delegate = delegate;
  258. }
  259. std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache = true) {
  260. typename RingReaderDispatcher::Ptr dispatcher;
  261. {
  262. LOCK_GUARD(_mtx_map);
  263. auto &ref = _dispatcher_map[poller];
  264. if (!ref) {
  265. weak_ptr<RingBuffer> weakSelf = this->shared_from_this();
  266. auto onSizeChanged = [weakSelf, poller](int size, bool add_flag) {
  267. auto strongSelf = weakSelf.lock();
  268. if (!strongSelf) {
  269. return;
  270. }
  271. strongSelf->onSizeChanged(poller, size, add_flag);
  272. };
  273. auto onDealloc = [poller](RingReaderDispatcher *ptr) {
  274. poller->async([ptr]() {
  275. delete ptr;
  276. });
  277. };
  278. ref.reset(new RingReaderDispatcher(_storage->clone(), std::move(onSizeChanged)), std::move(onDealloc));
  279. }
  280. dispatcher = ref;
  281. }
  282. return dispatcher->attach(poller, use_cache);
  283. }
  284. int readerCount() {
  285. return _total_count;
  286. }
  287. void clearCache(){
  288. LOCK_GUARD(_mtx_map);
  289. _storage->clearCache();
  290. for (auto &pr : _dispatcher_map) {
  291. auto &second = pr.second;
  292. //切换线程后清空缓存
  293. pr.first->async([second]() {
  294. second->clearCache();
  295. }, false);
  296. }
  297. }
  298. private:
  299. void onSizeChanged(const EventPoller::Ptr &poller, int size, bool add_flag) {
  300. if (size == 0) {
  301. LOCK_GUARD(_mtx_map);
  302. _dispatcher_map.erase(poller);
  303. }
  304. if (add_flag) {
  305. ++_total_count;
  306. } else {
  307. --_total_count;
  308. }
  309. if (_on_reader_changed) {
  310. _on_reader_changed(_total_count);
  311. }
  312. }
  313. private:
  314. struct HashOfPtr {
  315. std::size_t operator()(const EventPoller::Ptr &key) const {
  316. return (std::size_t) key.get();
  317. }
  318. };
  319. private:
  320. mutex _mtx_map;
  321. atomic_int _total_count {0};
  322. typename RingStorage::Ptr _storage;
  323. typename RingDelegate<T>::Ptr _delegate;
  324. onReaderChanged _on_reader_changed;
  325. unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
  326. };
  327. } /* namespace toolkit */
  328. #endif /* UTIL_RINGBUFFER_H_ */