TaskExecutor.h 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef ZLTOOLKIT_TASKEXECUTOR_H
  11. #define ZLTOOLKIT_TASKEXECUTOR_H
  12. #include <memory>
  13. #include <functional>
  14. #include "Util/List.h"
  15. #include "Util/util.h"
  16. #include "Util/onceToken.h"
  17. #include "Util/TimeTicker.h"
  18. using namespace std;
  19. namespace toolkit {
  20. /**
  21. * cpu负载计算器
  22. */
  23. class ThreadLoadCounter {
  24. public:
  25. /**
  26. * 构造函数
  27. * @param max_size 统计样本数量
  28. * @param max_usec 统计时间窗口,亦即最近{max_usec}的cpu负载率
  29. */
  30. ThreadLoadCounter(uint64_t max_size, uint64_t max_usec);
  31. ~ThreadLoadCounter() = default;
  32. /**
  33. * 线程进入休眠
  34. */
  35. void startSleep();
  36. /**
  37. * 休眠唤醒,结束休眠
  38. */
  39. void sleepWakeUp();
  40. /**
  41. * 返回当前线程cpu使用率,范围为 0 ~ 100
  42. * @return 当前线程cpu使用率
  43. */
  44. int load();
  45. private:
  46. struct TimeRecord {
  47. TimeRecord(uint64_t tm, bool slp) {
  48. _time = tm;
  49. _sleep = slp;
  50. }
  51. uint64_t _time;
  52. bool _sleep;
  53. };
  54. private:
  55. bool _sleeping = true;
  56. uint64_t _last_sleep_time;
  57. uint64_t _last_wake_time;
  58. uint64_t _max_size;
  59. uint64_t _max_usec;
  60. mutex _mtx;
  61. List<TimeRecord> _time_list;
  62. };
  63. class TaskCancelable : public noncopyable {
  64. public:
  65. TaskCancelable() = default;
  66. virtual ~TaskCancelable() = default;
  67. virtual void cancel() = 0;
  68. };
  69. template<class R, class... ArgTypes>
  70. class TaskCancelableImp;
  71. template<class R, class... ArgTypes>
  72. class TaskCancelableImp<R(ArgTypes...)> : public TaskCancelable {
  73. public:
  74. using Ptr = std::shared_ptr<TaskCancelableImp>;
  75. using func_type = function<R(ArgTypes...)>;
  76. ~TaskCancelableImp() = default;
  77. template<typename FUNC>
  78. TaskCancelableImp(FUNC &&task) {
  79. _strongTask = std::make_shared<func_type>(std::forward<FUNC>(task));
  80. _weakTask = _strongTask;
  81. }
  82. void cancel() override {
  83. _strongTask = nullptr;
  84. }
  85. operator bool() {
  86. return _strongTask && *_strongTask;
  87. }
  88. void operator=(nullptr_t) {
  89. _strongTask = nullptr;
  90. }
  91. R operator()(ArgTypes ...args) const {
  92. auto strongTask = _weakTask.lock();
  93. if (strongTask && *strongTask) {
  94. return (*strongTask)(forward<ArgTypes>(args)...);
  95. }
  96. return defaultValue<R>();
  97. }
  98. template<typename T>
  99. static typename std::enable_if<std::is_void<T>::value, void>::type
  100. defaultValue() {}
  101. template<typename T>
  102. static typename std::enable_if<std::is_pointer<T>::value, T>::type
  103. defaultValue() {
  104. return nullptr;
  105. }
  106. template<typename T>
  107. static typename std::enable_if<std::is_integral<T>::value, T>::type
  108. defaultValue() {
  109. return 0;
  110. }
  111. protected:
  112. std::weak_ptr<func_type> _weakTask;
  113. std::shared_ptr<func_type> _strongTask;
  114. };
  115. using TaskIn = function<void()>;
  116. using Task = TaskCancelableImp<void()>;
  117. class TaskExecutorInterface {
  118. public:
  119. TaskExecutorInterface() = default;
  120. virtual ~TaskExecutorInterface() = default;
  121. /**
  122. * 异步执行任务
  123. * @param task 任务
  124. * @param may_sync 是否允许同步执行该任务
  125. * @return 任务是否添加成功
  126. */
  127. virtual Task::Ptr async(TaskIn task, bool may_sync = true) = 0;
  128. /**
  129. * 最高优先级方式异步执行任务
  130. * @param task 任务
  131. * @param may_sync 是否允许同步执行该任务
  132. * @return 任务是否添加成功
  133. */
  134. virtual Task::Ptr async_first(TaskIn task, bool may_sync = true);
  135. /**
  136. * 同步执行任务
  137. * @param task
  138. * @return
  139. */
  140. void sync(const TaskIn &task);
  141. /**
  142. * 最高优先级方式同步执行任务
  143. * @param task
  144. * @return
  145. */
  146. void sync_first(const TaskIn &task);
  147. };
  148. /**
  149. * 任务执行器
  150. */
  151. class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface {
  152. public:
  153. using Ptr = shared_ptr<TaskExecutor>;
  154. /**
  155. * 构造函数
  156. * @param max_size cpu负载统计样本数
  157. * @param max_usec cpu负载统计时间窗口大小
  158. */
  159. TaskExecutor(uint64_t max_size = 32, uint64_t max_usec = 2 * 1000 * 1000);
  160. ~TaskExecutor() = default;
  161. };
  162. class TaskExecutorGetter {
  163. public:
  164. using Ptr = shared_ptr<TaskExecutorGetter>;
  165. virtual ~TaskExecutorGetter() = default;
  166. /**
  167. * 获取任务执行器
  168. * @return 任务执行器
  169. */
  170. virtual TaskExecutor::Ptr getExecutor() = 0;
  171. };
  172. class TaskExecutorGetterImp : public TaskExecutorGetter {
  173. public:
  174. TaskExecutorGetterImp() = default;
  175. ~TaskExecutorGetterImp() = default;
  176. /**
  177. * 根据线程负载情况,获取最空闲的任务执行器
  178. * @return 任务执行器
  179. */
  180. TaskExecutor::Ptr getExecutor() override;
  181. /**
  182. * 获取所有线程的负载率
  183. * @return 所有线程的负载率
  184. */
  185. vector<int> getExecutorLoad();
  186. /**
  187. * 获取所有线程任务执行延时,单位毫秒
  188. * 通过此函数也可以大概知道线程负载情况
  189. * @return
  190. */
  191. void getExecutorDelay(const function<void(const vector<int> &)> &callback);
  192. /**
  193. * 遍历所有线程
  194. */
  195. void for_each(const function<void(const TaskExecutor::Ptr &)> &cb);
  196. protected:
  197. size_t addPoller(const string &name, size_t size, int priority, bool register_thread);
  198. protected:
  199. size_t _thread_pos = 0;
  200. vector<TaskExecutor::Ptr> _threads;
  201. };
  202. }//toolkit
  203. #endif //ZLTOOLKIT_TASKEXECUTOR_H