sample_kmers.cu 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636
  1. /*
  2. * nvbio
  3. * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the NVIDIA CORPORATION nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. // sample_kmers.cu
  28. //
  29. #include "sample_kmers.h"
  30. #include "utils.h"
  31. #include <nvbio/basic/pipeline_context.h>
  32. #include <nvbio/basic/numbers.h>
  33. #include <nvbio/basic/bloom_filter.h>
  34. #include <nvbio/basic/primitives.h>
  35. #include <nvbio/basic/console.h>
  36. #include <nvbio/basic/timer.h>
  37. #include <nvbio/basic/threads.h>
  38. #include <nvbio/basic/system.h>
  39. #include <nvbio/basic/exceptions.h>
  40. #include <nvbio/basic/cuda/ldg.h>
  41. #include <nvbio/basic/cuda/arch.h>
  42. #include <nvbio/io/sequence/sequence.h>
  43. #include <nvbio/strings/prefetcher.h>
  44. #include <stdio.h>
  45. #include <stdlib.h>
  46. using namespace nvbio;
  47. ///
  48. /// A functor to sample kmers and insert them in a Bloom filter
  49. ///
  50. template <typename string_set_type, typename filter_type>
  51. struct SampleKmersFunctor
  52. {
  53. /// constructor
  54. ///
  55. ///\param _k kmer length
  56. ///\param _alpha the sampling frequency
  57. ///\param _string_set the input string set to sample
  58. ///\param _filter the kmer Bloom filter
  59. ///
  60. NVBIO_HOST_DEVICE
  61. SampleKmersFunctor(
  62. const uint32 _k,
  63. const float _alpha,
  64. const string_set_type _string_set,
  65. filter_type _filter) :
  66. k(_k), kmask( (uint64(1u) << (k*2))-1u ), alpha( _alpha ), string_set( _string_set ), filter(_filter) {}
  67. /// functor operator
  68. ///
  69. ///\param i input string index
  70. ///
  71. NVBIO_HOST_DEVICE
  72. void operator() (const uint32 i) const
  73. {
  74. typedef typename string_set_type::string_type string_type;
  75. typedef typename string_traits<string_type>::forward_iterator forward_iterator;
  76. // fetch the i-th string
  77. const string_type string = string_set[i];
  78. const uint32 len = length( string );
  79. if (len < k)
  80. return;
  81. // build a forward string iterator
  82. forward_iterator it( string.begin() );
  83. // start with an empty kmer
  84. uint64 kmer = 0u;
  85. uint32 kmer_len = 0u;
  86. // initialie a random number generator
  87. LCG_random random( hash(i) );
  88. for (uint32 j = 0; j < len; ++j)
  89. {
  90. // fetch the next character
  91. const uint8 c = *it; ++it;
  92. if (c < 4) // make sure this is not an N
  93. {
  94. kmer |= c; // insert the new character at the end of the kmer (in a big-endian encoding)
  95. if (kmer_len < k)
  96. kmer_len++;
  97. if (kmer_len >= k) // check whether we have an actual 'k'-mer
  98. {
  99. if (float( random.next() ) / float(LCG_random::MAX) < alpha)
  100. {
  101. // insert the kmer
  102. filter.insert( kmer );
  103. }
  104. }
  105. // shift the kmer to the right, dropping the last symbol
  106. kmer <<= 2;
  107. kmer &= kmask;
  108. }
  109. else
  110. {
  111. // an N, skip all k-mers containing it
  112. it += k-1;
  113. j += k-1;
  114. // and reset the kmer
  115. kmer = 0u;
  116. kmer_len = 0u;
  117. }
  118. }
  119. }
  120. const uint32 k;
  121. const uint64 kmask;
  122. const float alpha;
  123. string_set_type string_set;
  124. mutable filter_type filter;
  125. };
  126. // process the next batch
  127. //
  128. bool SampleKmersStage::process(PipelineContext& context)
  129. {
  130. typedef nvbio::io::SequenceDataAccess<DNA_N>::sequence_string_set_type string_set_type;
  131. // declare the Bloom filter type
  132. typedef nvbio::blocked_bloom_filter<hash_functor1, hash_functor2, uint64_2*> filter_type;
  133. typedef SampleKmersFunctor<string_set_type,filter_type> functor_type;
  134. // fetch the input
  135. nvbio::io::SequenceDataHost* h_read_data = context.input<nvbio::io::SequenceDataHost>( 0 );
  136. float time = 0.0f;
  137. // introduce a timing scope
  138. try
  139. {
  140. const nvbio::ScopedTimer<float> timer( &time );
  141. if (device >= 0)
  142. {
  143. //
  144. // Device (GPU) path
  145. //
  146. // set the device
  147. cudaSetDevice( device );
  148. // copy it to the device
  149. nvbio::io::SequenceDataDevice d_read_data( *h_read_data );
  150. // build a view
  151. const nvbio::io::SequenceDataAccess<DNA_N> d_read_view( d_read_data );
  152. // build the Bloom filter
  153. filter_type filter( SAMPLED_KMERS_FILTER_K, filter_size, (uint64_2*)filter_storage );
  154. //filter_type filter( filter_size, filter_storage );
  155. // build the kmer sampling functor
  156. const functor_type kmer_filter(
  157. k,
  158. alpha,
  159. d_read_view.sequence_string_set(),
  160. filter );
  161. device_for_each( d_read_view.size(), kmer_filter );
  162. cudaDeviceSynchronize();
  163. cuda::check_error("sample-kmers");
  164. }
  165. else
  166. {
  167. //
  168. // Host (CPU) path
  169. //
  170. omp_set_num_threads( -device );
  171. // build a view
  172. const io::SequenceDataAccess<DNA_N> h_read_view( *h_read_data );
  173. // build the Bloom filter
  174. filter_type filter( SAMPLED_KMERS_FILTER_K, filter_size, (uint64_2*)filter_storage );
  175. // build the kmer sampling functor
  176. const functor_type kmer_filter(
  177. k,
  178. alpha,
  179. h_read_view.sequence_string_set(),
  180. filter );
  181. host_for_each(
  182. h_read_view.size(),
  183. kmer_filter );
  184. }
  185. }
  186. catch (nvbio::cuda_error e)
  187. {
  188. log_error(stderr, "[SampleKmersStage] caught a nvbio::cuda_error exception:\n");
  189. log_error(stderr, " %s\n", e.what());
  190. exit(1);
  191. }
  192. catch (nvbio::bad_alloc e)
  193. {
  194. log_error(stderr, "[SampleKmersStage] caught a nvbio::bad_alloc exception:\n");
  195. log_error(stderr, " %s\n", e.what());
  196. exit(1);
  197. }
  198. catch (nvbio::logic_error e)
  199. {
  200. log_error(stderr, "[SampleKmersStage] caught a nvbio::logic_error exception:\n");
  201. log_error(stderr, " %s\n", e.what());
  202. exit(1);
  203. }
  204. catch (nvbio::runtime_error e)
  205. {
  206. log_error(stderr, "[SampleKmersStage] caught a nvbio::runtime_error exception:\n");
  207. log_error(stderr, " %s\n", e.what());
  208. exit(1);
  209. }
  210. catch (thrust::system::system_error e)
  211. {
  212. log_error(stderr, "[SampleKmersStage] caught a thrust::system_error exception:\n");
  213. log_error(stderr, " %s\n", e.what());
  214. exit(1);
  215. }
  216. catch (std::bad_alloc e)
  217. {
  218. log_error(stderr, "[SampleKmersStage] caught a std::bad_alloc exception:\n");
  219. log_error(stderr, " %s\n", e.what());
  220. exit(1);
  221. }
  222. catch (std::logic_error e)
  223. {
  224. log_error(stderr, "[SampleKmersStage] caught a std::logic_error exception:\n");
  225. log_error(stderr, " %s\n", e.what());
  226. exit(1);
  227. }
  228. catch (std::runtime_error e)
  229. {
  230. log_error(stderr, "[SampleKmersStage] caught a std::runtime_error exception:\n");
  231. log_error(stderr, " %s\n", e.what());
  232. exit(1);
  233. }
  234. catch (...)
  235. {
  236. log_error(stderr, "[SampleKmersStage] caught an unknown exception!\n");
  237. exit(1);
  238. }
  239. // update the time stats
  240. stats->m_mutex.lock();
  241. stats->m_time += time;
  242. log_info(stderr, "\r processed reads [%llu, %llu] (%.1fM / %.2fG bps, %.1fK reads/s, %.1fM bps/s - %s<%d>) ",
  243. stats->m_reads,
  244. stats->m_reads + h_read_data->size(),
  245. 1.0e-6f * (h_read_data->bps()),
  246. 1.0e-9f * (stats->m_bps + h_read_data->bps()),
  247. stats->m_time ? (1.0e-3f * (stats->m_reads + h_read_data->size())) / stats->m_time : 0.0f,
  248. stats->m_time ? (1.0e-6f * (stats->m_bps + h_read_data->bps() )) / stats->m_time : 0.0f,
  249. device >= 0 ? "gpu" : "cpu",
  250. device >= 0 ? device : -device );
  251. log_debug_cont(stderr, "\n");
  252. log_debug(stderr," peak memory : %.1f GB\n", float( peak_resident_memory() ) / float(1024*1024*1024));
  253. stats->m_reads += h_read_data->size();
  254. stats->m_bps += h_read_data->bps();
  255. stats->m_mutex.unlock();
  256. return true;
  257. }
  258. ///
  259. /// A functor to sample kmers and insert them in a Bloom filter
  260. ///
  261. template <typename string_set_type, typename sampled_filter_type, typename trusted_filter_type, typename threshold_type>
  262. struct TrustedKmersFunctor
  263. {
  264. /// constructor
  265. ///
  266. ///\param _k kmer length
  267. ///\param _alpha the sampling frequency
  268. ///\param _string_set the input string set to sample
  269. ///\param _filter the kmer Bloom filter
  270. ///
  271. NVBIO_HOST_DEVICE
  272. TrustedKmersFunctor(
  273. const uint32 _k,
  274. const string_set_type _string_set,
  275. const sampled_filter_type _sampled_filter,
  276. trusted_filter_type _trusted_filter,
  277. const threshold_type _threshold) :
  278. k(_k), kmask( (uint64(1u) << (k*2))-1u ),
  279. string_set( _string_set ),
  280. sampled_filter(_sampled_filter),
  281. trusted_filter(_trusted_filter),
  282. threshold(_threshold) {}
  283. /// functor operator
  284. ///
  285. ///\param i input string index
  286. ///
  287. NVBIO_HOST_DEVICE
  288. void operator() (const uint32 i) const
  289. {
  290. typedef typename string_set_type::string_type string_type;
  291. typedef nvbio::StringPrefetcher< string_type, nvbio::lmem_cache_tag<MAX_READ_LENGTH> > string_prefetcher_type;
  292. typedef typename string_prefetcher_type::string_type local_string_type;
  293. typedef typename nvbio::string_traits<local_string_type>::forward_iterator forward_iterator;
  294. //bool occur[MAX_READ_LENGTH];
  295. uint32 occur_storage[MAX_READ_LENGTH/32];
  296. nvbio::PackedStream<uint32*,uint8,1u,false> occur( occur_storage );
  297. // instantiate a prefetcher
  298. string_prefetcher_type string_prefetcher;
  299. // fetch the i-th string
  300. //const string_type string = string_set[i];
  301. const local_string_type string = string_prefetcher.load( string_set[i] );
  302. const uint32 len = length( string );
  303. if (len < k)
  304. return;
  305. // build a forward string iterator
  306. forward_iterator it( string.begin() );
  307. // start with an empty kmer
  308. uint64 kmer = 0u;
  309. uint32 kmer_len = 0u;
  310. const int32 occur_cnt = len - k + 1;
  311. // initialize all to false
  312. for (uint32 j = 0; j < (occur_cnt+31)/32; ++j)
  313. occur_storage[j] = 0u;
  314. // mark occurring kmers
  315. for (uint32 j = 0; j < len; ++j)
  316. {
  317. // fetch the next character
  318. const uint8 c = *it; ++it;
  319. if (c < 4) // make sure this is not an N
  320. {
  321. kmer |= c; // insert the new character at the end of the kmer (in a big-endian encoding)
  322. if (kmer_len < k)
  323. kmer_len++;
  324. if (kmer_len >= k) // check whether we have an actual 'k'-mer
  325. {
  326. if (sampled_filter[ kmer ])
  327. occur[j - k + 1] = true;
  328. }
  329. // shift the kmer to the right, dropping the last symbol
  330. kmer <<= 2;
  331. kmer &= kmask;
  332. }
  333. else
  334. {
  335. // an N, skip all kmers containing it
  336. it += k-1;
  337. j += k-1;
  338. // and reset the kmer
  339. kmer = 0u;
  340. kmer_len = 0u;
  341. }
  342. }
  343. // mark trusted kmers
  344. int32 zero_cnt = 0;
  345. int32 one_cnt = 0;
  346. // reset the forward iterator
  347. it = forward_iterator( string.begin() );
  348. // start with an empty kmer
  349. kmer = 0u;
  350. kmer_len = 0u;
  351. // keep a k-bits mask of trusted positions
  352. const uint64 trusted_mask = (uint64(1u) << k) - 1u;
  353. uint64 trusted = 0u;
  354. for (uint32 j = 0; j < len; ++j)
  355. {
  356. if (j >= k)
  357. {
  358. if (occur[j - k]) --one_cnt;
  359. else --zero_cnt;
  360. }
  361. if (j < occur_cnt)
  362. {
  363. if (occur[j]) ++one_cnt;
  364. else ++zero_cnt;
  365. }
  366. const int32 sum = one_cnt + zero_cnt;
  367. //if (qual[j] <= bad_quality)
  368. //{
  369. // trusted[j] = false;
  370. // continue ;
  371. //}
  372. trusted |= (one_cnt > threshold[sum]) ? 1u : 0u;
  373. // fetch the next character
  374. const uint8 c = *it; ++it;
  375. if (c < 4) // if an N, skip it (the kmers containing it will be marked as untrusted and skipped as well)
  376. {
  377. kmer |= c; // insert the new character at the end of the kmer (in a big-endian encoding)
  378. if (popc( trusted ) == k) // check whether we have an actual 'k'-mer - i.e. k trusted positions in a row
  379. trusted_filter.insert( kmer );
  380. }
  381. // shift the kmer to the right, dropping the last symbol
  382. kmer <<= 2;
  383. kmer &= kmask;
  384. // shift the trusted bits by one to the right, dropping the last symbol
  385. trusted <<= 1;
  386. trusted &= trusted_mask;
  387. }
  388. }
  389. const uint32 k;
  390. const uint64 kmask;
  391. string_set_type string_set;
  392. const sampled_filter_type sampled_filter;
  393. mutable trusted_filter_type trusted_filter;
  394. const threshold_type threshold;
  395. };
  396. // process the next batch
  397. //
  398. bool TrustedKmersStage::process(PipelineContext& context)
  399. {
  400. typedef nvbio::io::SequenceDataAccess<DNA_N>::sequence_string_set_type string_set_type;
  401. // fetch the input
  402. nvbio::io::SequenceDataHost* h_read_data = context.input<nvbio::io::SequenceDataHost>( 0 );
  403. float time = 0.0f;
  404. // introduce a timing scope
  405. try
  406. {
  407. const nvbio::ScopedTimer<float> timer( &time );
  408. if (device >= 0)
  409. {
  410. //
  411. // Device (GPU) path
  412. //
  413. // declare the Bloom filter types
  414. typedef nvbio::blocked_bloom_filter<hash_functor1, hash_functor2, nvbio::cuda::ldg_pointer<uint4> > sampled_filter_type;
  415. typedef nvbio::blocked_bloom_filter<hash_functor1, hash_functor2, uint64_2*> trusted_filter_type;
  416. typedef TrustedKmersFunctor<string_set_type,sampled_filter_type,trusted_filter_type, cuda::ldg_pointer<uint32> > functor_type;
  417. // set the device
  418. cudaSetDevice( device );
  419. // copy it to the device
  420. io::SequenceDataDevice d_read_data( *h_read_data );
  421. // build a view
  422. const io::SequenceDataAccess<DNA_N> d_read_view( d_read_data );
  423. // build the Bloom filter
  424. sampled_filter_type sampled_filter( SAMPLED_KMERS_FILTER_K, sampled_filter_size, (const uint4*)sampled_filter_storage );
  425. trusted_filter_type trusted_filter( TRUSTED_KMERS_FILTER_K, trusted_filter_size, (uint64_2*)trusted_filter_storage );
  426. // build the kmer sampling functor
  427. const functor_type kmer_filter(
  428. k,
  429. d_read_view.sequence_string_set(),
  430. sampled_filter,
  431. trusted_filter,
  432. cuda::make_ldg_pointer(threshold) );
  433. // and apply the functor to all reads in the batch
  434. device_for_each(
  435. d_read_view.size(),
  436. kmer_filter );
  437. cudaDeviceSynchronize();
  438. cuda::check_error("mark-trusted-kmers");
  439. }
  440. else
  441. {
  442. //
  443. // Host (CPU) path
  444. //
  445. omp_set_num_threads( -device );
  446. // declare the Bloom filter types
  447. typedef nvbio::blocked_bloom_filter<hash_functor1, hash_functor2, const uint64_2*> sampled_filter_type;
  448. typedef nvbio::blocked_bloom_filter<hash_functor1, hash_functor2, uint64_2*> trusted_filter_type;
  449. typedef TrustedKmersFunctor<string_set_type,sampled_filter_type,trusted_filter_type,const uint32*> functor_type;
  450. // build a view
  451. const nvbio::io::SequenceDataAccess<DNA_N> h_read_view( *h_read_data );
  452. // build the Bloom filter
  453. sampled_filter_type sampled_filter( SAMPLED_KMERS_FILTER_K, sampled_filter_size, (const uint64_2*)sampled_filter_storage );
  454. trusted_filter_type trusted_filter( TRUSTED_KMERS_FILTER_K, trusted_filter_size, (uint64_2*)trusted_filter_storage );
  455. // build the kmer sampling functor
  456. const TrustedKmersFunctor<string_set_type,sampled_filter_type,trusted_filter_type,const uint32*> kmer_filter(
  457. k,
  458. h_read_view.sequence_string_set(),
  459. sampled_filter,
  460. trusted_filter,
  461. threshold );
  462. // and apply the functor to all reads in the batch
  463. host_for_each(
  464. h_read_view.size(),
  465. kmer_filter );
  466. }
  467. }
  468. catch (nvbio::cuda_error e)
  469. {
  470. log_error(stderr, "[TrustedKmersStage] caught a nvbio::cuda_error exception:\n");
  471. log_error(stderr, " %s\n", e.what());
  472. exit(1);
  473. }
  474. catch (nvbio::bad_alloc e)
  475. {
  476. log_error(stderr, "[TrustedKmersStage] caught a nvbio::bad_alloc exception:\n");
  477. log_error(stderr, " %s\n", e.what());
  478. exit(1);
  479. }
  480. catch (nvbio::logic_error e)
  481. {
  482. log_error(stderr, "[TrustedKmersStage] caught a nvbio::logic_error exception:\n");
  483. log_error(stderr, " %s\n", e.what());
  484. exit(1);
  485. }
  486. catch (nvbio::runtime_error e)
  487. {
  488. log_error(stderr, "[TrustedKmersStage] caught a nvbio::runtime_error exception:\n");
  489. log_error(stderr, " %s\n", e.what());
  490. exit(1);
  491. }
  492. catch (thrust::system::system_error e)
  493. {
  494. log_error(stderr, "[TrustedKmersStage] caught a thrust::system_error exception:\n");
  495. log_error(stderr, " %s\n", e.what());
  496. exit(1);
  497. }
  498. catch (std::bad_alloc e)
  499. {
  500. log_error(stderr, "[TrustedKmersStage] caught a std::bad_alloc exception:\n");
  501. log_error(stderr, " %s\n", e.what());
  502. exit(1);
  503. }
  504. catch (std::logic_error e)
  505. {
  506. log_error(stderr, "[TrustedKmersStage] caught a std::logic_error exception:\n");
  507. log_error(stderr, " %s\n", e.what());
  508. exit(1);
  509. }
  510. catch (std::runtime_error e)
  511. {
  512. log_error(stderr, "[TrustedKmersStage] caught a std::runtime_error exception:\n");
  513. log_error(stderr, " %s\n", e.what());
  514. exit(1);
  515. }
  516. catch (...)
  517. {
  518. log_error(stderr, "[TrustedKmersStage] caught an unknown exception!\n");
  519. exit(1);
  520. }
  521. // update the time stats
  522. stats->m_mutex.lock();
  523. stats->m_time += time;
  524. log_info(stderr, "\r processed reads [%llu, %llu] (%.1fM / %.2fG bps, %.1fK reads/s, %.1fM bps/s - %s<%d>) ",
  525. stats->m_reads,
  526. stats->m_reads + h_read_data->size(),
  527. 1.0e-6f * (h_read_data->bps()),
  528. 1.0e-9f * (stats->m_bps + h_read_data->bps()),
  529. stats->m_time ? (1.0e-3f * (stats->m_reads + h_read_data->size())) / stats->m_time : 0.0f,
  530. stats->m_time ? (1.0e-6f * (stats->m_bps + h_read_data->bps() )) / stats->m_time : 0.0f,
  531. device >= 0 ? "gpu" : "cpu",
  532. device >= 0 ? device : -device );
  533. log_debug_cont(stderr, "\n");
  534. log_debug(stderr," peak memory : %.1f GB\n", float( peak_resident_memory() ) / float(1024*1024*1024));
  535. stats->m_reads += h_read_data->size();
  536. stats->m_bps += h_read_data->bps();
  537. stats->m_mutex.unlock();
  538. return true;
  539. }