TaskExecutor.h 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef ZLTOOLKIT_TASKEXECUTOR_H
  11. #define ZLTOOLKIT_TASKEXECUTOR_H
  12. #include <mutex>
  13. #include <memory>
  14. #include <functional>
  15. #include "Util/List.h"
  16. #include "Util/util.h"
  17. namespace toolkit {
  18. /**
  19. * cpu负载计算器
  20. */
  21. class ThreadLoadCounter {
  22. public:
  23. /**
  24. * 构造函数
  25. * @param max_size 统计样本数量
  26. * @param max_usec 统计时间窗口,亦即最近{max_usec}的cpu负载率
  27. */
  28. ThreadLoadCounter(uint64_t max_size, uint64_t max_usec);
  29. ~ThreadLoadCounter() = default;
  30. /**
  31. * 线程进入休眠
  32. */
  33. void startSleep();
  34. /**
  35. * 休眠唤醒,结束休眠
  36. */
  37. void sleepWakeUp();
  38. /**
  39. * 返回当前线程cpu使用率,范围为 0 ~ 100
  40. * @return 当前线程cpu使用率
  41. */
  42. int load();
  43. private:
  44. struct TimeRecord {
  45. TimeRecord(uint64_t tm, bool slp) {
  46. _time = tm;
  47. _sleep = slp;
  48. }
  49. bool _sleep;
  50. uint64_t _time;
  51. };
  52. private:
  53. bool _sleeping = true;
  54. uint64_t _last_sleep_time;
  55. uint64_t _last_wake_time;
  56. uint64_t _max_size;
  57. uint64_t _max_usec;
  58. std::mutex _mtx;
  59. List<TimeRecord> _time_list;
  60. };
  61. class TaskCancelable : public noncopyable {
  62. public:
  63. TaskCancelable() = default;
  64. virtual ~TaskCancelable() = default;
  65. virtual void cancel() = 0;
  66. };
  67. template<class R, class... ArgTypes>
  68. class TaskCancelableImp;
  69. template<class R, class... ArgTypes>
  70. class TaskCancelableImp<R(ArgTypes...)> : public TaskCancelable {
  71. public:
  72. using Ptr = std::shared_ptr<TaskCancelableImp>;
  73. using func_type = std::function<R(ArgTypes...)>;
  74. ~TaskCancelableImp() = default;
  75. template<typename FUNC>
  76. TaskCancelableImp(FUNC &&task) {
  77. _strongTask = std::make_shared<func_type>(std::forward<FUNC>(task));
  78. _weakTask = _strongTask;
  79. }
  80. void cancel() override {
  81. _strongTask = nullptr;
  82. }
  83. operator bool() {
  84. return _strongTask && *_strongTask;
  85. }
  86. void operator=(std::nullptr_t) {
  87. _strongTask = nullptr;
  88. }
  89. R operator()(ArgTypes ...args) const {
  90. auto strongTask = _weakTask.lock();
  91. if (strongTask && *strongTask) {
  92. return (*strongTask)(std::forward<ArgTypes>(args)...);
  93. }
  94. return defaultValue<R>();
  95. }
  96. template<typename T>
  97. static typename std::enable_if<std::is_void<T>::value, void>::type
  98. defaultValue() {}
  99. template<typename T>
  100. static typename std::enable_if<std::is_pointer<T>::value, T>::type
  101. defaultValue() {
  102. return nullptr;
  103. }
  104. template<typename T>
  105. static typename std::enable_if<std::is_integral<T>::value, T>::type
  106. defaultValue() {
  107. return 0;
  108. }
  109. protected:
  110. std::weak_ptr<func_type> _weakTask;
  111. std::shared_ptr<func_type> _strongTask;
  112. };
  113. using TaskIn = std::function<void()>;
  114. using Task = TaskCancelableImp<void()>;
  115. class TaskExecutorInterface {
  116. public:
  117. TaskExecutorInterface() = default;
  118. virtual ~TaskExecutorInterface() = default;
  119. /**
  120. * 异步执行任务
  121. * @param task 任务
  122. * @param may_sync 是否允许同步执行该任务
  123. * @return 任务是否添加成功
  124. */
  125. virtual Task::Ptr async(TaskIn task, bool may_sync = true) = 0;
  126. /**
  127. * 最高优先级方式异步执行任务
  128. * @param task 任务
  129. * @param may_sync 是否允许同步执行该任务
  130. * @return 任务是否添加成功
  131. */
  132. virtual Task::Ptr async_first(TaskIn task, bool may_sync = true);
  133. /**
  134. * 同步执行任务
  135. * @param task
  136. * @return
  137. */
  138. void sync(const TaskIn &task);
  139. /**
  140. * 最高优先级方式同步执行任务
  141. * @param task
  142. * @return
  143. */
  144. void sync_first(const TaskIn &task);
  145. };
  146. /**
  147. * 任务执行器
  148. */
  149. class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface {
  150. public:
  151. using Ptr = std::shared_ptr<TaskExecutor>;
  152. /**
  153. * 构造函数
  154. * @param max_size cpu负载统计样本数
  155. * @param max_usec cpu负载统计时间窗口大小
  156. */
  157. TaskExecutor(uint64_t max_size = 32, uint64_t max_usec = 2 * 1000 * 1000);
  158. ~TaskExecutor() = default;
  159. };
  160. class TaskExecutorGetter {
  161. public:
  162. using Ptr = std::shared_ptr<TaskExecutorGetter>;
  163. virtual ~TaskExecutorGetter() = default;
  164. /**
  165. * 获取任务执行器
  166. * @return 任务执行器
  167. */
  168. virtual TaskExecutor::Ptr getExecutor() = 0;
  169. /**
  170. * 获取执行器个数
  171. */
  172. virtual size_t getExecutorSize() const = 0;
  173. };
  174. class TaskExecutorGetterImp : public TaskExecutorGetter {
  175. public:
  176. TaskExecutorGetterImp() = default;
  177. ~TaskExecutorGetterImp() = default;
  178. /**
  179. * 根据线程负载情况,获取最空闲的任务执行器
  180. * @return 任务执行器
  181. */
  182. TaskExecutor::Ptr getExecutor() override;
  183. /**
  184. * 获取所有线程的负载率
  185. * @return 所有线程的负载率
  186. */
  187. std::vector<int> getExecutorLoad();
  188. /**
  189. * 获取所有线程任务执行延时,单位毫秒
  190. * 通过此函数也可以大概知道线程负载情况
  191. * @return
  192. */
  193. void getExecutorDelay(const std::function<void(const std::vector<int> &)> &callback);
  194. /**
  195. * 遍历所有线程
  196. */
  197. void for_each(const std::function<void(const TaskExecutor::Ptr &)> &cb);
  198. /**
  199. * 获取线程数
  200. */
  201. size_t getExecutorSize() const override;
  202. protected:
  203. size_t addPoller(const std::string &name, size_t size, int priority, bool register_thread, bool enable_cpu_affinity = true);
  204. protected:
  205. size_t _thread_pos = 0;
  206. std::vector<TaskExecutor::Ptr> _threads;
  207. };
  208. }//toolkit
  209. #endif //ZLTOOLKIT_TASKEXECUTOR_H