SeqAn3  3.2.0
The Modern C++ library for sequence analysis.
async_input_buffer.hpp
Go to the documentation of this file.
1 // -----------------------------------------------------------------------------------------------------
2 // Copyright (c) 2006-2022, Knut Reinert & Freie Universität Berlin
3 // Copyright (c) 2016-2022, Knut Reinert & MPI für molekulare Genetik
4 // This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License
5 // shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md
6 // -----------------------------------------------------------------------------------------------------
7 
13 #pragma once
14 
15 #include <concepts>
16 #include <iterator>
17 #include <memory>
18 #include <ranges>
19 #include <thread>
20 
23 
24 //-----------------------------------------------------------------------------
25 // This is the path a value takes when using this view:
26 // urange
27 // → async_input_buffer_view.buffer [size n]
28 // → iterator.cached_value [size 1]
29 // → user
30 //-----------------------------------------------------------------------------
31 
32 namespace seqan3::detail
33 {
34 
40 template <std::ranges::range urng_t>
41 class async_input_buffer_view : public std::ranges::view_interface<async_input_buffer_view<urng_t>>
42 {
43 private:
44  static_assert(std::ranges::input_range<urng_t>,
45  "The range parameter to async_input_buffer_view must be at least a std::ranges::input_range.");
46  static_assert(std::ranges::view<urng_t>,
47  "The range parameter to async_input_buffer_view must model std::ranges::view.");
48  static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
49  "The range parameter to async_input_buffer_view must have a value_type that is std::movable.");
50  static_assert(
51  std::constructible_from<std::ranges::range_value_t<urng_t>,
52  std::remove_reference_t<std::ranges::range_reference_t<urng_t>> &&>,
53  "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved "
54  "value of its reference type.");
55 
57  using urng_iterator_type = std::ranges::iterator_t<urng_t>;
58 
60  struct state
61  {
63  urng_t urange;
64 
66  contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> buffer;
67 
69  std::thread producer;
70  };
71 
73  std::shared_ptr<state> state_ptr = nullptr;
74 
76  class iterator;
77 
78 public:
82  async_input_buffer_view() = default;
83  async_input_buffer_view(async_input_buffer_view const &) = default;
84  async_input_buffer_view(async_input_buffer_view &&) = default;
85  async_input_buffer_view & operator=(async_input_buffer_view const &) = default;
86  async_input_buffer_view & operator=(async_input_buffer_view &&) = default;
87  ~async_input_buffer_view() = default;
88 
90  async_input_buffer_view(urng_t _urng, size_t const buffer_size)
91  {
92  auto deleter = [](state * p)
93  {
94  if (p != nullptr)
95  {
96  p->buffer.close();
97  p->producer.join();
98  delete p;
99  }
100  };
101 
102  state_ptr = std::shared_ptr<state>(
103  new state{std::move(_urng),
104  contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>>{buffer_size},
105  std::thread{}}, // thread is set/started below, needs rest of state
106  deleter);
107 
108  auto runner = [&state = *state_ptr]()
109  {
110  for (auto && val : state.urange)
111  if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed)
112  break;
113 
114  state.buffer.close();
115  };
116 
117  state_ptr->producer = std::thread{runner};
118  }
119 
121  template <typename other_urng_t>
122  requires (!std::same_as<std::remove_cvref_t<other_urng_t>, async_input_buffer_view>)
123  && // prevent recursive instantiation
124  std::ranges::viewable_range<other_urng_t>
125  && std::constructible_from<urng_t, std::ranges::ref_view<std::remove_reference_t<other_urng_t>>>
126  async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) :
127  async_input_buffer_view{std::views::all(_urng), buffer_size}
128  {}
130 
145  iterator begin()
146  {
147  assert(state_ptr != nullptr);
148  return {state_ptr->buffer};
149  }
150 
152  iterator begin() const = delete;
153 
155  std::default_sentinel_t end()
156  {
157  return std::default_sentinel;
158  }
159 
161  std::default_sentinel_t end() const = delete;
163 };
164 
166 template <std::ranges::range urng_t>
167 class async_input_buffer_view<urng_t>::iterator
168 {
170  using sentinel_type = std::default_sentinel_t;
171 
173  contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> * buffer_ptr = nullptr;
174 
176  mutable std::ranges::range_value_t<urng_t> cached_value;
177 
179  bool at_end = false;
180 
181 public:
186  using difference_type = std::iter_difference_t<urng_iterator_type>;
188  using value_type = std::iter_value_t<urng_iterator_type>;
190  using pointer = detail::iter_pointer_t<urng_iterator_type>;
192  using reference = std::iter_reference_t<urng_iterator_type>;
194  using iterator_category = std::input_iterator_tag;
196  using iterator_concept = iterator_category;
198 
203  iterator() = default;
204  //TODO: delete:
205  iterator(iterator const & rhs) = default;
206  iterator(iterator && rhs) = default;
207  //TODO: delete:
208  iterator & operator=(iterator const & rhs) = default;
209  iterator & operator=(iterator && rhs) = default;
210  ~iterator() noexcept = default;
211 
213  iterator(contrib::fixed_buffer_queue<std::ranges::range_value_t<urng_t>> & buffer) noexcept : buffer_ptr{&buffer}
214  {
215  ++(*this); // cache first value
216  }
218 
223  reference operator*() const noexcept
224  {
225  return cached_value;
226  }
227 
229  pointer operator->() const noexcept
230  {
231  return std::addressof(cached_value);
232  }
234 
239  iterator & operator++() noexcept
240  {
241  if (at_end) // TODO unlikely
242  return *this;
243 
244  assert(buffer_ptr != nullptr);
245 
246  if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed)
247  at_end = true;
248 
249  return *this;
250  }
251 
253  void operator++(int) noexcept
254  {
255  ++(*this);
256  }
258 
263  friend constexpr bool operator==(iterator const & lhs, std::default_sentinel_t const &) noexcept
264  {
265  return lhs.at_end;
266  }
267 
269  friend constexpr bool operator==(std::default_sentinel_t const &, iterator const & rhs) noexcept
270  {
271  return rhs == std::default_sentinel_t{};
272  }
273 
275  friend constexpr bool operator!=(iterator const & lhs, std::default_sentinel_t const &) noexcept
276  {
277  return !(lhs == std::default_sentinel_t{});
278  }
279 
281  friend constexpr bool operator!=(std::default_sentinel_t const &, iterator const & rhs) noexcept
282  {
283  return rhs != std::default_sentinel_t{};
284  }
286 };
287 
294 template <std::ranges::viewable_range urng_t>
295 async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view<std::views::all_t<urng_t>>;
297 
298 // ============================================================================
299 // async_input_buffer_fn (adaptor definition)
300 // ============================================================================
301 
303 struct async_input_buffer_fn
304 {
306  constexpr auto operator()(size_t const buffer_size) const
307  {
308  return detail::adaptor_from_functor{*this, buffer_size};
309  }
310 
316  template <std::ranges::range urng_t>
317  constexpr auto operator()(urng_t && urange, size_t const buffer_size) const
318  {
319  static_assert(std::ranges::input_range<urng_t>,
320  "The range parameter to views::async_input_buffer must be at least a std::ranges::input_range.");
321  static_assert(std::ranges::viewable_range<urng_t>,
322  "The range parameter to views::async_input_buffer cannot be a temporary of a non-view range.");
323  static_assert(std::movable<std::ranges::range_value_t<urng_t>>,
324  "The range parameter to views::async_input_buffer must have a value_type that is std::movable.");
325  static_assert(
326  std::constructible_from<std::ranges::range_value_t<urng_t>,
327  std::remove_reference_t<std::ranges::range_reference_t<urng_t>> &&>,
328  "The range parameter to views::async_input_buffer must have a value_type that is constructible by a moved "
329  "value of its reference type.");
330 
331  if (buffer_size == 0)
332  throw std::invalid_argument{"The buffer_size parameter to views::async_input_buffer must be > 0."};
333 
334  return detail::async_input_buffer_view{std::forward<urng_t>(urange), buffer_size};
335  }
336 };
337 
338 } // namespace seqan3::detail
339 
340 //-----------------------------------------------------------------------------
341 // View shortcut for functor.
342 //-----------------------------------------------------------------------------
343 
344 namespace seqan3::views
345 {
481 inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{};
482 } // namespace seqan3::views
Provides seqan3::detail::adaptor_from_functor.
T addressof(T... args)
T begin(T... args)
Provides seqan3::buffer_queue.
The <concepts> header from C++20's standard library.
T end(T... args)
requires requires
The rank_type of the semi-alphabet; defined as the return type of seqan3::to_rank....
Definition: alphabet/concept.hpp:164
constexpr auto async_input_buffer
A view adapter that returns a concurrent-queue-like view over the underlying range.
Definition: async_input_buffer.hpp:481
The SeqAn namespace for views.
Definition: char_strictly_to.hpp:22
SeqAn specific customisations in the standard namespace.
T operator!=(T... args)
The <ranges> header from C++20's standard library.