SeqAn3  3.2.0-rc.1
The Modern C++ library for sequence analysis.
execution_handler_parallel.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <concepts>
16 #include <functional>
17 #include <ranges>
18 #include <thread>
19 #include <type_traits>
20 #include <vector>
21 
25 
26 namespace seqan3::detail
27 {
28 
54 class execution_handler_parallel
55 {
56 private:
58  using task_type = std::function<void()>;
59 
60 public:
73  execution_handler_parallel(size_t const thread_count) : state{std::make_unique<internal_state>()}
74  {
75  auto * q = &(state->queue);
76  for (size_t i = 0; i < thread_count; ++i)
77  {
78  state->thread_pool.emplace_back(
79  [q]()
80  {
81  for (;;)
82  {
83  task_type task;
84  if (q->wait_pop(task) == contrib::queue_op_status::closed)
85  return;
86 
87  task();
88  }
89  });
90  }
91  }
92 
107  execution_handler_parallel() : execution_handler_parallel{1u}
108  {}
109 
110  execution_handler_parallel(execution_handler_parallel const &) = delete;
111  execution_handler_parallel(execution_handler_parallel &&) = default;
112  execution_handler_parallel & operator=(execution_handler_parallel const &) = delete;
113  execution_handler_parallel & operator=(execution_handler_parallel &&) = default;
114  ~execution_handler_parallel() = default;
115 
117 
136  template <std::copy_constructible algorithm_t, typename algorithm_input_t, std::copy_constructible callback_t>
137  requires std::invocable<algorithm_t, algorithm_input_t, callback_t>
138  && (std::is_lvalue_reference_v<algorithm_input_t> || std::move_constructible<algorithm_input_t>)
139  void execute(algorithm_t && algorithm, algorithm_input_t && input, callback_t && callback)
140  {
141  assert(state != nullptr);
142 
143  // Note: Unfortunately, we can't use std::forward_as_tuple here because a std::function object (`task_type`)
144  // cannot be constructed if the tuple element type is a rvalue-reference.
145  // So we capture the input as a `tuple<algorithm_input_t>` which either is a lvalue reference or has no
146  // reference type according to the reference collapsing rules of forwarding references.
147  // Then we forward the input into the tuple which either just stores the reference or the input is moved into
148  // the tuple. When the task is executed by some thread the stored input will either be forwarded as a
149  // lvalue-reference to the algorithm or the input is moved into the algorithm from the tuple. This is valid
150  // since the task is executed only once by the parallel execution handler.
151  // Here is a discussion about the problem on stackoverflow:
152  // https://stackoverflow.com/questions/26831382/capturing-perfectly-forwarded-variable-in-lambda/
153 
154  // Asynchronously pushes the algorithm job as a task to the queue.
155  // Note: that lambda is mutable, s.t. we can move out the content of input_tpl
156  task_type task =
157  [=, input_tpl = std::tuple<algorithm_input_t>{std::forward<algorithm_input_t>(input)}]() mutable
158  {
159  using forward_input_t = std::tuple_element_t<0, decltype(input_tpl)>;
160  algorithm(std::forward<forward_input_t>(std::get<0>(input_tpl)), std::move(callback));
161  };
162 
163  [[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
164  assert(status == contrib::queue_op_status::success);
165  }
166 
183  template <std::copy_constructible algorithm_t,
184  std::ranges::input_range algorithm_input_range_t,
185  std::copy_constructible callback_t>
186  requires std::invocable<algorithm_t, std::ranges::range_reference_t<algorithm_input_range_t>, callback_t>
187  void bulk_execute(algorithm_t && algorithm, algorithm_input_range_t && input_range, callback_t && callback)
188  {
189  for (auto && input : input_range)
190  execute(algorithm, std::forward<decltype(input)>(input), callback);
191 
192  wait();
193  }
194 
196  void wait()
197  {
198  assert(state != nullptr);
199 
200  state->stop_and_wait();
201  }
202 
203 private:
212  class internal_state
213  {
214  public:
219  internal_state() = default;
220  internal_state(internal_state const &) = delete;
221  internal_state(internal_state &&) = default;
222  internal_state & operator=(internal_state const &) = delete;
223  internal_state & operator=(internal_state &&) = default;
224 
226  ~internal_state()
227  {
228  stop_and_wait();
229  }
231 
240  void stop_and_wait()
241  {
242  queue.close();
243 
244  for (auto & t : thread_pool)
245  {
246  if (t.joinable())
247  t.join();
248  }
249  }
250 
252  std::vector<std::thread> thread_pool{};
254  contrib::fixed_buffer_queue<task_type> queue{10000};
255  };
256 
258  std::unique_ptr<internal_state> state{nullptr};
259 };
260 
261 } // namespace seqan3::detail
Provides various type traits on generic types.
Provides seqan3::buffer_queue.
The <concepts> header from C++20's standard library.
T forward(T... args)
requires requires
The rank_type of the semi-alphabet; defined as the return type of seqan3::to_rank....
Definition: alphabet/concept.hpp:164
T make_unique(T... args)
SeqAn specific customisations in the standard namespace.
The <ranges> header from C++20's standard library.
Provides seqan3::detail::reader_writer_manager.