123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- /*
- * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
- *
- * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
- *
- * Use of this source code is governed by MIT license that can be found in the
- * LICENSE file in the root of the source tree. All contributing project authors
- * may be found in the AUTHORS file in the root of the source tree.
- */
- #ifndef ZLTOOLKIT_TASKEXECUTOR_H
- #define ZLTOOLKIT_TASKEXECUTOR_H
- #include <memory>
- #include <functional>
- #include "Util/List.h"
- #include "Util/util.h"
- #include "Util/onceToken.h"
- #include "Util/TimeTicker.h"
- using namespace std;
- namespace toolkit {
- /**
- * cpu负载计算器
- */
- class ThreadLoadCounter {
- public:
- /**
- * 构造函数
- * @param max_size 统计样本数量
- * @param max_usec 统计时间窗口,亦即最近{max_usec}的cpu负载率
- */
- ThreadLoadCounter(uint64_t max_size, uint64_t max_usec);
- ~ThreadLoadCounter() = default;
- /**
- * 线程进入休眠
- */
- void startSleep();
- /**
- * 休眠唤醒,结束休眠
- */
- void sleepWakeUp();
- /**
- * 返回当前线程cpu使用率,范围为 0 ~ 100
- * @return 当前线程cpu使用率
- */
- int load();
- private:
- struct TimeRecord {
- TimeRecord(uint64_t tm, bool slp) {
- _time = tm;
- _sleep = slp;
- }
- uint64_t _time;
- bool _sleep;
- };
- private:
- bool _sleeping = true;
- uint64_t _last_sleep_time;
- uint64_t _last_wake_time;
- uint64_t _max_size;
- uint64_t _max_usec;
- mutex _mtx;
- List<TimeRecord> _time_list;
- };
- class TaskCancelable : public noncopyable {
- public:
- TaskCancelable() = default;
- virtual ~TaskCancelable() = default;
- virtual void cancel() = 0;
- };
- template<class R, class... ArgTypes>
- class TaskCancelableImp;
- template<class R, class... ArgTypes>
- class TaskCancelableImp<R(ArgTypes...)> : public TaskCancelable {
- public:
- using Ptr = std::shared_ptr<TaskCancelableImp>;
- using func_type = function<R(ArgTypes...)>;
- ~TaskCancelableImp() = default;
- template<typename FUNC>
- TaskCancelableImp(FUNC &&task) {
- _strongTask = std::make_shared<func_type>(std::forward<FUNC>(task));
- _weakTask = _strongTask;
- }
- void cancel() override {
- _strongTask = nullptr;
- }
- operator bool() {
- return _strongTask && *_strongTask;
- }
- void operator=(nullptr_t) {
- _strongTask = nullptr;
- }
- R operator()(ArgTypes ...args) const {
- auto strongTask = _weakTask.lock();
- if (strongTask && *strongTask) {
- return (*strongTask)(forward<ArgTypes>(args)...);
- }
- return defaultValue<R>();
- }
- template<typename T>
- static typename std::enable_if<std::is_void<T>::value, void>::type
- defaultValue() {}
- template<typename T>
- static typename std::enable_if<std::is_pointer<T>::value, T>::type
- defaultValue() {
- return nullptr;
- }
- template<typename T>
- static typename std::enable_if<std::is_integral<T>::value, T>::type
- defaultValue() {
- return 0;
- }
- protected:
- std::weak_ptr<func_type> _weakTask;
- std::shared_ptr<func_type> _strongTask;
- };
- using TaskIn = function<void()>;
- using Task = TaskCancelableImp<void()>;
- class TaskExecutorInterface {
- public:
- TaskExecutorInterface() = default;
- virtual ~TaskExecutorInterface() = default;
- /**
- * 异步执行任务
- * @param task 任务
- * @param may_sync 是否允许同步执行该任务
- * @return 任务是否添加成功
- */
- virtual Task::Ptr async(TaskIn task, bool may_sync = true) = 0;
- /**
- * 最高优先级方式异步执行任务
- * @param task 任务
- * @param may_sync 是否允许同步执行该任务
- * @return 任务是否添加成功
- */
- virtual Task::Ptr async_first(TaskIn task, bool may_sync = true);
- /**
- * 同步执行任务
- * @param task
- * @return
- */
- void sync(const TaskIn &task);
- /**
- * 最高优先级方式同步执行任务
- * @param task
- * @return
- */
- void sync_first(const TaskIn &task);
- };
- /**
- * 任务执行器
- */
- class TaskExecutor : public ThreadLoadCounter, public TaskExecutorInterface {
- public:
- using Ptr = shared_ptr<TaskExecutor>;
- /**
- * 构造函数
- * @param max_size cpu负载统计样本数
- * @param max_usec cpu负载统计时间窗口大小
- */
- TaskExecutor(uint64_t max_size = 32, uint64_t max_usec = 2 * 1000 * 1000);
- ~TaskExecutor() = default;
- };
- class TaskExecutorGetter {
- public:
- using Ptr = shared_ptr<TaskExecutorGetter>;
- virtual ~TaskExecutorGetter() = default;
- /**
- * 获取任务执行器
- * @return 任务执行器
- */
- virtual TaskExecutor::Ptr getExecutor() = 0;
- };
- class TaskExecutorGetterImp : public TaskExecutorGetter {
- public:
- TaskExecutorGetterImp() = default;
- ~TaskExecutorGetterImp() = default;
- /**
- * 根据线程负载情况,获取最空闲的任务执行器
- * @return 任务执行器
- */
- TaskExecutor::Ptr getExecutor() override;
- /**
- * 获取所有线程的负载率
- * @return 所有线程的负载率
- */
- vector<int> getExecutorLoad();
- /**
- * 获取所有线程任务执行延时,单位毫秒
- * 通过此函数也可以大概知道线程负载情况
- * @return
- */
- void getExecutorDelay(const function<void(const vector<int> &)> &callback);
- /**
- * 遍历所有线程
- */
- void for_each(const function<void(const TaskExecutor::Ptr &)> &cb);
- protected:
- size_t addPoller(const string &name, size_t size, int priority, bool register_thread);
- protected:
- size_t _thread_pos = 0;
- vector<TaskExecutor::Ptr> _threads;
- };
- }//toolkit
- #endif //ZLTOOLKIT_TASKEXECUTOR_H
|