ThreadPool.h 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef THREADPOOL_H_
  11. #define THREADPOOL_H_
  12. #include <assert.h>
  13. #include <vector>
  14. #include "threadgroup.h"
  15. #include "TaskQueue.h"
  16. #include "TaskExecutor.h"
  17. #include "Util/util.h"
  18. #include "Util/logger.h"
  19. namespace toolkit {
  20. class ThreadPool : public TaskExecutor{
  21. public:
  22. enum Priority {
  23. PRIORITY_LOWEST = 0,
  24. PRIORITY_LOW,
  25. PRIORITY_NORMAL,
  26. PRIORITY_HIGH,
  27. PRIORITY_HIGHEST
  28. };
  29. //num:线程池线程个数
  30. ThreadPool(int num = 1,
  31. Priority priority = PRIORITY_HIGHEST,
  32. bool autoRun = true) :
  33. _thread_num(num), _priority(priority) {
  34. if(autoRun){
  35. start();
  36. }
  37. _logger = Logger::Instance().shared_from_this();
  38. }
  39. ~ThreadPool() {
  40. shutdown();
  41. wait();
  42. }
  43. //把任务打入线程池并异步执行
  44. Task::Ptr async(TaskIn task,bool may_sync = true) override {
  45. if (may_sync && _thread_group.is_this_thread_in()) {
  46. task();
  47. return nullptr;
  48. }
  49. auto ret = std::make_shared<Task>(std::move(task));
  50. _queue.push_task(ret);
  51. return ret;
  52. }
  53. Task::Ptr async_first(TaskIn task,bool may_sync = true) override{
  54. if (may_sync && _thread_group.is_this_thread_in()) {
  55. task();
  56. return nullptr;
  57. }
  58. auto ret = std::make_shared<Task>(std::move(task));
  59. _queue.push_task_first(ret);
  60. return ret;
  61. }
  62. size_t size(){
  63. return _queue.size();
  64. }
  65. static bool setPriority(Priority priority = PRIORITY_NORMAL,
  66. thread::native_handle_type threadId = 0) {
  67. // set priority
  68. #if defined(_WIN32)
  69. static int Priorities[] = { THREAD_PRIORITY_LOWEST, THREAD_PRIORITY_BELOW_NORMAL, THREAD_PRIORITY_NORMAL, THREAD_PRIORITY_ABOVE_NORMAL, THREAD_PRIORITY_HIGHEST };
  70. if (priority != PRIORITY_NORMAL && SetThreadPriority(GetCurrentThread(), Priorities[priority]) == 0) {
  71. return false;
  72. }
  73. return true;
  74. #else
  75. static int Min = sched_get_priority_min(SCHED_OTHER);
  76. if (Min == -1) {
  77. return false;
  78. }
  79. static int Max = sched_get_priority_max(SCHED_OTHER);
  80. if (Max == -1) {
  81. return false;
  82. }
  83. static int Priorities[] = { Min, Min + (Max - Min) / 4, Min
  84. + (Max - Min) / 2, Min + (Max - Min) * 3/ 4, Max };
  85. if (threadId == 0) {
  86. threadId = pthread_self();
  87. }
  88. struct sched_param params;
  89. params.sched_priority = Priorities[priority];
  90. return pthread_setschedparam(threadId, SCHED_OTHER, &params) == 0;
  91. #endif
  92. }
  93. void start() {
  94. if (_thread_num <= 0) {
  95. return;
  96. }
  97. size_t total = _thread_num - _thread_group.size();
  98. for (size_t i = 0; i < total; ++i) {
  99. _thread_group.create_thread(bind(&ThreadPool::run, this));
  100. }
  101. }
  102. private:
  103. void run() {
  104. ThreadPool::setPriority(_priority);
  105. Task::Ptr task;
  106. while (true) {
  107. startSleep();
  108. if (!_queue.get_task(task)) {
  109. //空任务,退出线程
  110. break;
  111. }
  112. sleepWakeUp();
  113. try {
  114. (*task)();
  115. task = nullptr;
  116. } catch (std::exception &ex) {
  117. ErrorL << "ThreadPool执行任务捕获到异常:" << ex.what();
  118. }
  119. }
  120. }
  121. void wait() {
  122. _thread_group.join_all();
  123. }
  124. void shutdown() {
  125. _queue.push_exit(_thread_num);
  126. }
  127. private:
  128. size_t _thread_num;
  129. TaskQueue<Task::Ptr> _queue;
  130. thread_group _thread_group;
  131. Priority _priority;
  132. Logger::Ptr _logger;
  133. };
  134. } /* namespace toolkit */
  135. #endif /* THREADPOOL_H_ */