Grok  9.5.0
ThreadPool.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2016-2021 Grok Image Compression Inc.
3  *
4  * This source code is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Affero General Public License, version 3,
6  * as published by the Free Software Foundation.
7  *
8  * This source code is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Affero General Public License for more details.
12  *
13  * You should have received a copy of the GNU Affero General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  *
16  */
17 
#pragma once

#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <type_traits>
#include <vector>

#ifdef __linux__
#include <pthread.h>
#include <sched.h>
#endif

// Fixed-size pool of worker threads that consume std::function<void()> tasks
// from a shared queue. Can be used as an ordinary object or through the
// process-wide singleton (get()/instance()/release()).
//
// Note: when constructed with a thread count of 1, no workers are launched
// (see the constructor) and enqueue() must not be called; callers are
// expected to run work synchronously in that mode.
class ThreadPool
{
  public:
	// Launches `threads` worker threads (none when threads == 1).
	ThreadPool(size_t);
	// Schedules f(args...) on the pool; returns a future for its result.
	// Throws std::runtime_error if the pool has been stopped.
	template<class F, class... Args>
	auto enqueue(F&& f, Args&&... args)
		-> std::future<typename std::invoke_result<F, Args...>::type>;
	// Signals stop, wakes all workers and joins them.
	~ThreadPool();
	// Returns the 0-based index of the pool thread with the given id,
	// or -1 if the id does not belong to this pool.
	int thread_number(std::thread::id id)
	{
		if(id_map.find(id) != id_map.end())
			return (int)id_map[id];
		return -1;
	}
	// Thread count this pool was constructed with.
	size_t num_threads()
	{
		return m_num_threads;
	}

	// Returns the singleton, creating it with a default thread count if
	// it does not exist yet.
	static ThreadPool* get()
	{
		return instance(0);
	}
	// Returns the singleton, creating it on first call with `numthreads`
	// workers (0 means "use hardware_concurrency()").
	static ThreadPool* instance(uint32_t numthreads)
	{
		std::unique_lock<std::mutex> lock(singleton_mutex);
		if(!singleton)
			singleton = new ThreadPool(numthreads ? numthreads : hardware_concurrency());
		return singleton;
	}
	// Destroys the singleton (joining its workers via ~ThreadPool).
	static void release()
	{
		std::unique_lock<std::mutex> lock(singleton_mutex);
		delete singleton;
		singleton = nullptr;
	}
	// Number of logical cores reported by the implementation (may be 0
	// when it cannot be determined).
	static uint32_t hardware_concurrency()
	{
		return std::thread::hardware_concurrency();
	}

  private:
	// need to keep track of threads so we can join them
	std::vector<std::thread> workers;
	// the task queue
	std::queue<std::function<void()>> tasks;

	// synchronization
	std::mutex queue_mutex;
	std::condition_variable condition;
	bool stop;

	// worker thread id -> 0-based worker index
	std::map<std::thread::id, size_t> id_map;
	size_t m_num_threads;

	// Process-wide instance guarded by singleton_mutex; both require
	// out-of-class definitions in a translation unit.
	static ThreadPool* singleton;
	static std::mutex singleton_mutex;
};
91 
// the constructor just launches some amount of workers
// Each worker loops forever: wait on the condition variable until either
// stop is set or a task is available, run the task, repeat. Workers exit
// only when stop is set AND the queue has been drained.
inline ThreadPool::ThreadPool(size_t threads) : stop(false), m_num_threads(threads)
{
	// Single-thread mode: launch no workers at all. enqueue() asserts
	// against this case; callers are expected to run work inline instead.
	if(threads == 1)
		return;

	for(size_t i = 0; i < threads; ++i)
		workers.emplace_back([this] {
			for(;;)
			{
				std::function<void()> task;

				{
					std::unique_lock<std::mutex> lock(this->queue_mutex);
					// Sleep until stopped or work arrives (predicate form
					// also guards against spurious wakeups).
					this->condition.wait(lock,
										 [this] { return this->stop || !this->tasks.empty(); });
					// Drain remaining tasks before exiting on stop.
					if(this->stop && this->tasks.empty())
						return;
					task = std::move(this->tasks.front());
					this->tasks.pop();
				}

				// Run the task outside the lock so other workers can dequeue.
				task();
			}
		});
	// Record each worker's index. NOTE(review): id_map is written here
	// without holding queue_mutex while workers are already running; this
	// is only safe because workers touch id_map solely through tasks, and
	// no task can be enqueued before the constructor returns — confirm no
	// other reader exists.
	size_t thread_count = 0;
	for(std::thread& worker : workers)
	{
		id_map[worker.get_id()] = thread_count;
#ifdef __linux__
		// Create a cpu_set_t object representing a set of CPUs. Clear it and mark
		// only CPU i as set.
		// Note: we assume that the second half of the logical cores
		// are hyper-threaded siblings to the first half
		// NOTE(review): assumes thread_count < number of logical CPUs
		// (and < CPU_SETSIZE) — confirm for pools larger than the machine.
		cpu_set_t cpuset;
		CPU_ZERO(&cpuset);
		CPU_SET(thread_count, &cpuset);
		int rc = pthread_setaffinity_np(worker.native_handle(), sizeof(cpu_set_t), &cpuset);
		if(rc != 0)
		{
			std::cerr << "Error calling pthread_setaffinity_np: " << rc << "\n";
		}
#endif
		thread_count++;
	}
}
138 
139 // add new work item to the pool
140 template<class F, class... Args>
141 auto ThreadPool::enqueue(F&& f, Args&&... args)
142  -> std::future<typename std::invoke_result<F, Args...>::type>
143 {
144  assert(m_num_threads > 1);
145  using return_type = typename std::invoke_result<F, Args...>::type;
146 
147  auto task = std::make_shared<std::packaged_task<return_type()>>(
148  std::bind(std::forward<F>(f), std::forward<Args>(args)...));
149 
150  std::future<return_type> res = task->get_future();
151  {
152  std::unique_lock<std::mutex> lock(queue_mutex);
153 
154  // don't allow enqueueing after stopping the pool
155  if(stop)
156  throw std::runtime_error("enqueue on stopped ThreadPool");
157 
158  tasks.emplace([task]() { (*task)(); });
159  }
160  condition.notify_one();
161  return res;
162 }
163 
164 // the destructor joins all threads
166 {
167  {
168  std::unique_lock<std::mutex> lock(queue_mutex);
169  stop = true;
170  }
171  condition.notify_all();
172  for(std::thread& worker : workers)
173  worker.join();
174 }
Definition: ThreadPool.hpp:34
bool stop
Definition: ThreadPool.hpp:83
static ThreadPool * singleton
Definition: ThreadPool.hpp:88
static uint32_t hardware_concurrency()
Definition: ThreadPool.hpp:69
size_t m_num_threads
Definition: ThreadPool.hpp:86
static void release()
Definition: ThreadPool.hpp:63
auto enqueue(F &&f, Args &&... args) -> std::future< typename std::invoke_result< F, Args... >::type >
Definition: ThreadPool.hpp:141
~ThreadPool()
Definition: ThreadPool.hpp:165
size_t num_threads()
Definition: ThreadPool.hpp:47
std::condition_variable condition
Definition: ThreadPool.hpp:82
int thread_number(std::thread::id id)
Definition: ThreadPool.hpp:41
static std::mutex singleton_mutex
Definition: ThreadPool.hpp:89
std::vector< std::thread > workers
Definition: ThreadPool.hpp:76
std::map< std::thread::id, size_t > id_map
Definition: ThreadPool.hpp:85
static ThreadPool * instance(uint32_t numthreads)
Definition: ThreadPool.hpp:56
ThreadPool(size_t)
Definition: ThreadPool.hpp:93
std::mutex queue_mutex
Definition: ThreadPool.hpp:81
std::queue< std::function< void()> > tasks
Definition: ThreadPool.hpp:78
static ThreadPool * get()
Definition: ThreadPool.hpp:52