SeqAn3  3.2.0-rc.1
The Modern C++ library for sequence analysis.
bgzf_ostream.hpp
1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format)
19 // Author: René Rahn, rene.rahn [at] fu-berlin.de, 2019 (adaptions to SeqAn library version 3)
20 
21 #pragma once
22 
#include <cassert>
#include <functional>
#include <memory>
#include <ostream>
#include <streambuf>
#include <string>
#include <thread>
#include <utility>
#include <vector>

29 
30 #if !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
31 # error "This file cannot be used when building without GZip-support."
32 #endif // !defined(SEQAN3_HAS_ZLIB) && !defined(SEQAN3_HEADER_TEST)
33 
34 #if defined(SEQAN3_HAS_ZLIB)
35 
36 namespace seqan3::contrib
37 {
38 
39 // --------------------------------------------------------------------------
40 // Class basic_bgzf_ostreambuf
41 // --------------------------------------------------------------------------
42 
43 template<
44  typename Elem,
45  typename Tr = std::char_traits<Elem>,
46  typename ElemA = std::allocator<Elem>,
47  typename ByteT = char,
48  typename ByteAT = std::allocator<ByteT>
49 >
50 class basic_bgzf_ostreambuf : public std::basic_streambuf<Elem, Tr>
51 {
52 private:
53 
54  typedef std::basic_ostream<Elem, Tr>& ostream_reference;
55  typedef ElemA char_allocator_type;
56  typedef ByteT byte_type;
57  typedef ByteAT byte_allocator_type;
58  typedef byte_type* byte_buffer_type;
59  typedef ConcurrentQueue<size_t, Suspendable<Limit> > job_queue_type;
60 
61 public:
62 
63  typedef Tr traits_type;
64  typedef typename traits_type::char_type char_type;
65  typedef typename traits_type::int_type int_type;
66  typedef typename traits_type::pos_type pos_type;
67  typedef typename traits_type::off_type off_type;
68 
69  struct ScopedLock
70  {
71  ScopedLock(std::function<void()> complete_fn) : completion(std::move(complete_fn))
72  {}
73 
74  ~ScopedLock()
75  {
76  completion();
77  }
78 
79  std::function<void()> completion;
80  };
81 
82  // One compressed block.
83  struct OutputBuffer
84  {
85  char buffer[DefaultPageSize<detail::bgzf_compression>::MAX_BLOCK_SIZE];
86  size_t size;
87  };
88 
89  // Writes the output to the underlying stream when invoked.
90  struct BufferWriter
91  {
92  ostream_reference ostream;
93 
94  BufferWriter(ostream_reference ostream) :
95  ostream(ostream)
96  {}
97 
98  bool operator() (OutputBuffer const & outputBuffer)
99  {
100  ostream.write(outputBuffer.buffer, outputBuffer.size);
101  return ostream.good();
102  }
103  };
104 
105  struct CompressionJob
106  {
108 
109  TBuffer buffer;
110  size_t size;
111  OutputBuffer *outputBuffer;
112 
113  CompressionJob() :
114  buffer(DefaultPageSize<detail::bgzf_compression>::VALUE / sizeof(char_type), 0),
115  size(0),
116  outputBuffer(NULL)
117  {}
118  };
119 
120  // string of recycable jobs
121  size_t numThreads;
122  size_t numJobs;
124  job_queue_type jobQueue;
125  job_queue_type idleQueue;
126  Serializer<OutputBuffer, BufferWriter> serializer;
127  size_t currentJobId;
128  bool currentJobAvail;
129 
130  struct CompressionThread
131  {
132  basic_bgzf_ostreambuf *streamBuf;
133  CompressionContext<detail::bgzf_compression> compressionCtx;
134 
135  void operator()()
136  {
137  ScopedLock readLock{[this] () mutable { unlockReading(this->streamBuf->jobQueue); }};
138  // ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
139  ScopedLock writeLock{[this] () mutable { unlockWriting(this->streamBuf->idleQueue); }};
140  // ScopedWriteLock{obQueue> writeLock{str}amBuf->idleQueue);
141 
142  // wait for a new job to become available
143  bool success = true;
144  while (success)
145  {
146  size_t jobId = -1;
147  if (!popFront(jobId, streamBuf->jobQueue))
148  return;
149 
150  CompressionJob &job = streamBuf->jobs[jobId];
151 
152  // compress block with zlib
153  job.outputBuffer->size = _compressBlock(
154  job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
155  &job.buffer[0], job.size, compressionCtx);
156 
157  success = releaseValue(streamBuf->serializer, job.outputBuffer);
158  appendValue(streamBuf->idleQueue, jobId);
159  }
160  }
161  };
162 
163  // array of worker threads
164  // using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
166 
167  basic_bgzf_ostreambuf(ostream_reference ostream_,
168  size_t numThreads = bgzf_thread_count,
169  size_t jobsPerThread = 8) :
170  numThreads(numThreads),
171  numJobs(numThreads * jobsPerThread),
172  jobQueue(numJobs),
173  idleQueue(numJobs),
174  serializer(ostream_, numThreads * jobsPerThread)
175  {
176  jobs.resize(numJobs);
177  currentJobId = 0;
178 
179  lockWriting(jobQueue);
180  lockReading(idleQueue);
181  setReaderWriterCount(jobQueue, numThreads, 1);
182  setReaderWriterCount(idleQueue, 1, numThreads);
183 
184  // Prepare idle queue.
185  for (size_t i = 0; i < numJobs; ++i)
186  {
187  [[maybe_unused]] bool success = appendValue(idleQueue, i);
188  assert(success);
189  }
190 
191  // Start off threads.
192  for (size_t i = 0; i < numThreads; ++i)
193  pool.emplace_back(CompressionThread{this, CompressionContext<detail::bgzf_compression>{}});
194 
195  currentJobAvail = popFront(currentJobId, idleQueue);
196  assert(currentJobAvail);
197 
198  CompressionJob &job = jobs[currentJobId];
199  job.outputBuffer = aquireValue(serializer);
200  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
201  }
202 
203  ~basic_bgzf_ostreambuf()
204  {
205  // the buffer is now (after addFooter()) and flush will append the empty EOF marker
206  flush(true);
207 
208  unlockWriting(jobQueue);
209 
210  // Wait for threads to finish there active work.
211  for (auto & t : pool)
212  {
213  if (t.joinable())
214  t.join();
215  }
216 
217  unlockReading(idleQueue);
218  }
219 
220  bool compressBuffer(size_t size)
221  {
222  // submit current job
223  if (currentJobAvail)
224  {
225  jobs[currentJobId].size = size;
226  appendValue(jobQueue, currentJobId);
227  }
228 
229  // recycle existing idle job
230  if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
231  return false;
232 
233  jobs[currentJobId].outputBuffer = aquireValue(serializer);
234 
235  return serializer;
236  }
237 
238  int_type overflow(int_type c)
239  {
240  int w = static_cast<int>(this->pptr() - this->pbase());
241  if (c != static_cast<int_type>(EOF))
242  {
243  *this->pptr() = c;
244  ++w;
245  }
246  if (compressBuffer(w))
247  {
248  CompressionJob &job = jobs[currentJobId];
249  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
250  return c;
251  }
252  else
253  {
254  return EOF;
255  }
256  }
257 
258  std::streamsize flush(bool flushEmptyBuffer = false)
259  {
260  int w = static_cast<int>(this->pptr() - this->pbase());
261  if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
262  {
263  CompressionJob &job = jobs[currentJobId];
264  this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
265  }
266  else
267  {
268  w = 0;
269  }
270 
271  // wait for running compressor threads
272  waitForMinSize(idleQueue, numJobs - 1);
273 
274  serializer.worker.ostream.flush();
275  return w;
276  }
277 
278  int sync()
279  {
280  if (this->pptr() != this->pbase())
281  {
282  int_type c = overflow(EOF);
283  if (c == static_cast<int_type>(EOF))
284  return -1;
285  }
286  return 0;
287  }
288 
289  void addFooter()
290  {
291  // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
292  if (this->pptr() != this->pbase())
293  overflow(EOF);
294  }
295 
296  // returns a reference to the output stream
297  ostream_reference get_ostream() const { return serializer.worker.ostream; };
298 };
299 
300 // --------------------------------------------------------------------------
301 // Class basic_bgzf_ostreambase
302 // --------------------------------------------------------------------------
303 
304 template<
305  typename Elem,
306  typename Tr = std::char_traits<Elem>,
307  typename ElemA = std::allocator<Elem>,
308  typename ByteT = char,
309  typename ByteAT = std::allocator<ByteT>
310 >
311 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
312 {
313 public:
314  typedef std::basic_ostream<Elem, Tr>& ostream_reference;
315  typedef basic_bgzf_ostreambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
316 
317  basic_bgzf_ostreambase(ostream_reference ostream_)
318  : m_buf(ostream_)
319  {
320  this->init(&m_buf );
321  };
322 
323  // returns the underlying zip ostream object
324  bgzf_streambuf_type* rdbuf() { return &m_buf; };
325  // returns the bgzf error state
326  int get_zerr() const { return m_buf.get_err(); };
327  // returns the uncompressed data crc
328  long get_crc() const { return m_buf.get_crc(); };
329  // returns the compressed data size
330  long get_out_size() const { return m_buf.get_out_size(); };
331  // returns the uncompressed data size
332  long get_in_size() const { return m_buf.get_in_size(); };
333 
334 private:
335  bgzf_streambuf_type m_buf;
336 };
337 
338 // --------------------------------------------------------------------------
339 // Class basic_bgzf_ostream
340 // --------------------------------------------------------------------------
341 
342 template<
343  typename Elem,
344  typename Tr = std::char_traits<Elem>,
345  typename ElemA = std::allocator<Elem>,
346  typename ByteT = char,
347  typename ByteAT = std::allocator<ByteT>
348 >
349 class basic_bgzf_ostream :
350  public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
351  public std::basic_ostream<Elem,Tr>
352 {
353 public:
354  typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
355  typedef std::basic_ostream<Elem,Tr> ostream_type;
356  typedef ostream_type& ostream_reference;
357 
358  basic_bgzf_ostream(ostream_reference ostream_) :
359  bgzf_ostreambase_type(ostream_),
360  ostream_type(bgzf_ostreambase_type::rdbuf())
361  {}
362 
363  // flush inner buffer and zipper buffer
364  basic_bgzf_ostream<Elem,Tr>& flush()
365  {
366  ostream_type::flush(); this->rdbuf()->flush(); return *this;
367  };
368 
369  ~basic_bgzf_ostream()
370  {
371  this->rdbuf()->addFooter();
372  }
373 
374 private:
375  static void put_long(ostream_reference out_, unsigned long x_);
376 #ifdef _WIN32
377  void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
378  void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
379 #endif
380 };
381 
382 // ===========================================================================
383 // Typedefs
384 // ===========================================================================
385 
386 // A typedef for basic_bgzf_ostream<char>
387 typedef basic_bgzf_ostream<char> bgzf_ostream;
388 // A typedef for basic_bgzf_ostream<wchar_t>
389 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
390 
391 } // namespace seqan3::contrib
392 
393 #endif // defined(SEQAN3_HAS_ZLIB)
Provides stream compression utilities.
T emplace_back(T... args)
T flush(T... args)
constexpr size_t size
The size of a type pack.
Definition: type_pack/traits.hpp:146
T init(T... args)
SeqAn specific customisations in the standard namespace.
T rdbuf(T... args)
T resize(T... args)
Provides helper structs from SeqAn2 for the bgzf_ostream.
T size(T... args)
Provides seqan suspendable queue.