nvLighter.cu 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. /*
  2. * nvbio
  3. * Copyright (c) 2011-2014, NVIDIA CORPORATION. All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. * * Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * * Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. * * Neither the name of the NVIDIA CORPORATION nor the
  13. * names of its contributors may be used to endorse or promote products
  14. * derived from this software without specific prior written permission.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. * DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
  20. * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. // nvLighter.cu
  28. //
  29. //#define NVBIO_CUDA_DEBUG
  30. #include <cub/cub.cuh>
  31. #include <zlib/zlib.h>
  32. #include <nvbio/basic/omp.h>
  33. #include "bloom_filters.h"
  34. #include "input_thread.h"
  35. #include "output_thread.h"
  36. #include "sample_kmers.h"
  37. #include "error_correct.h"
  38. #include <nvbio/basic/pipeline.h>
  39. #include <nvbio/basic/numbers.h>
  40. #include <nvbio/basic/bloom_filter.h>
  41. #include <nvbio/basic/timer.h>
  42. #include <nvbio/basic/shared_pointer.h>
  43. #include <nvbio/basic/exceptions.h>
  44. #include <nvbio/basic/primitives.h>
  45. #include <nvbio/basic/vector.h>
  46. #include <nvbio/basic/system.h>
  47. #include <nvbio/strings/string_set.h>
  48. #include <nvbio/io/sequence/sequence.h>
  49. #include <stdio.h>
  50. #include <stdlib.h>
  51. #include <vector>
  52. #include <algorithm>
  53. using namespace nvbio;
  54. void cumulative_binomial_distribution(double F[], uint32 l, double p)
  55. {
  56. // p is the probability of getting 1
  57. double coef = 1 ;
  58. double exp = pow( 1 - p, l );
  59. F[0] = pow( 1 - p, l );
  60. for (uint32 i = 1 ; i <= l ; ++i)
  61. {
  62. coef = coef / i * (l - i + 1);
  63. exp = exp / (1 - p) * p;
  64. F[i] = F[i-1] + coef * exp;
  65. }
  66. }
  67. char get_bad_quality(nvbio::io::SequenceDataStream* reads_file)
  68. {
  69. int i;
  70. int histogram1[300], histogram2[300];
  71. // fetch some reads
  72. io::SequenceDataHost reads;
  73. memset( histogram1, 0, sizeof( histogram1 )) ;
  74. memset( histogram2, 0, sizeof( histogram2 ) ) ;
  75. int n_reads = 0;
  76. for (int batch = 0 ; batch < 100; ++batch)
  77. {
  78. const int ret = nvbio::io::next( ASCII, &reads, reads_file, 10000, 10000000 );
  79. if (ret == 0)
  80. break;
  81. const nvbio::io::SequenceDataAccess<ASCII> reads_view( reads );
  82. typedef nvbio::io::SequenceDataAccess<ASCII>::qual_string qual_string;
  83. for (uint32 i = 0; i < reads.size(); ++i)
  84. {
  85. const qual_string quals = reads_view.get_quals(i);
  86. ++histogram2[ (int)quals[ quals.length() - 1 ] ];
  87. ++histogram1[ (int)quals[0] ];
  88. }
  89. n_reads += reads.size();
  90. }
  91. // rewind the file
  92. if (reads_file->rewind() == false)
  93. {
  94. log_error(stderr, " failed rewinding reads file\n");
  95. return 1;
  96. }
  97. int cnt = 0;
  98. for (i = 0 ; i < 300; ++i)
  99. {
  100. cnt += histogram1[i];
  101. if (cnt > n_reads * 0.05f)
  102. break;
  103. }
  104. const int t1 = i - 1;
  105. cnt = 0;
  106. for (i = 0; i < 300 ; ++i)
  107. {
  108. cnt += histogram2[i];
  109. if (cnt > n_reads * 0.05f)
  110. break;
  111. }
  112. const int t2 = i;
  113. return (char)nvbio::min( t1, t2 );
  114. }
  115. float infer_alpha(nvbio::io::SequenceDataStream* reads_file, const uint64 genome_size)
  116. {
  117. log_info(stderr, " inferring alpha... started\n" );
  118. nvbio::io::SequenceDataHost reads;
  119. uint64 n_reads = 0;
  120. uint64 n_bps = 0;
  121. float time = 0.0f;
  122. while (1)
  123. {
  124. Timer timer;
  125. timer.start();
  126. const int ret = nvbio::io::next( ASCII, &reads, reads_file, 512*1024, 128*1024*1024 );
  127. if (ret == 0)
  128. break;
  129. timer.stop();
  130. time += timer.seconds();
  131. log_verbose(stderr, "\r input: %llu reads, %.2f%c bps (%.1f Mbps/s) ",
  132. n_reads, n_bps >= 1.0e9f ? 'B' : 'M',
  133. float(n_bps)*(n_bps >= 1.0e-9f ? 1.0e-9f : 1.0e-6f),
  134. 1.0e-6f * float(n_bps)/time );
  135. n_reads += reads.size();
  136. n_bps += reads.bps();
  137. }
  138. log_verbose_cont(stderr, "\n");
  139. const float coverage = float( double( n_bps ) / double( genome_size ) );
  140. const float alpha = 7.0f / coverage;
  141. log_info(stderr, " inferring alpha... done\n" );
  142. log_stats(stderr, " input: %llu reads, %.2f%c bps, %.3fx coverage\n",
  143. n_reads, n_bps >= 1.0e9f ? 'B' : 'M',
  144. float(n_bps)*(n_bps >= 1.0e-9f ? 1.0e-9f : 1.0e-6f),
  145. coverage );
  146. log_visible(stderr, " inferred alpha: %f\n", alpha );
  147. // rewind the file
  148. if (reads_file->rewind() == false)
  149. {
  150. log_error(stderr, " failed rewinding reads file\n");
  151. return 1;
  152. }
  153. return alpha;
  154. }
  155. int main(int argc, char* argv[])
  156. {
  157. if ((argc < 3) || (strcmp( argv[1], "--help" ) == 0))
  158. {
  159. log_visible(stderr, "nvLighter - Copyright 2015, NVIDIA Corporation\n");
  160. log_info(stderr, "usage:\n");
  161. log_info(stderr, " nvLighter [options] input_file output_file\n");
  162. log_info(stderr, " options:\n");
  163. log_info(stderr, " -v int (0-6) [5] # verbosity level\n");
  164. log_info(stderr, " -zlib string [1R] # e.g. \"1\", ..., \"9\", \"1R\"\n");
  165. log_info(stderr, " -t int [auto] # number of CPU threads\n");
  166. log_info(stderr, " -d int [0] # add the specified GPU device\n");
  167. log_info(stderr, " -k k-mer genome-size alpha # error correction parameters\n");
  168. log_info(stderr, " -K k-mer genome-size # error correction parameters\n");
  169. log_info(stderr, " -maxcor int [4] # maximum correction factor\n");
  170. log_info(stderr, " -newQual int [disabled] # new quality score value\n");
  171. log_info(stderr, " -no-cpu # disable CPU usage\n");
  172. log_info(stderr, " -no-gpu # disable GPU usage\n");
  173. return 0;
  174. }
  175. const char* reads_name = argv[argc-2];
  176. const char* output_name = argv[argc-1];
  177. const char* comp_level = "1R";
  178. io::QualityEncoding qencoding = io::Phred;
  179. int threads = 0;
  180. uint32 k = 11u;
  181. uint64 genome_size = 0;
  182. float alpha = 0.0;
  183. float max_correction = 4.0f;
  184. char new_quality = 0;
  185. float bf_factor = 1.0f; // original: 1.5
  186. bool cpu = true;
  187. bool gpu = true;
  188. std::vector<int> devices(0);
  189. for (int i = 0; i < argc - 2; ++i)
  190. {
  191. if ((strcmp( argv[i], "-v" ) == 0) ||
  192. (strcmp( argv[i], "-verbosity" ) == 0) ||
  193. (strcmp( argv[i], "--verbosity" ) == 0))
  194. {
  195. set_verbosity( Verbosity( atoi( argv[++i] ) ) );
  196. }
  197. else if ((strcmp( argv[i], "-c" ) == 0) ||
  198. (strcmp( argv[i], "--compression" ) == 0) ||
  199. (strcmp( argv[i], "-zlib" ) == 0)) // setup compression level
  200. {
  201. comp_level = argv[++i];
  202. }
  203. else if ((strcmp( argv[i], "-t" ) == 0) ||
  204. (strcmp( argv[i], "-threads" ) == 0) ||
  205. (strcmp( argv[i], "--threads" ) == 0)) // setup number of threads
  206. {
  207. threads = atoi( argv[++i] );
  208. }
  209. else if ((strcmp( argv[i], "-d" ) == 0) ||
  210. (strcmp( argv[i], "-device" ) == 0) ||
  211. (strcmp( argv[i], "--device" ) == 0)) // add a device
  212. {
  213. devices.push_back( atoi( argv[++i] ) );
  214. }
  215. else if ((strcmp( argv[i], "-no-cpu" ) == 0) || // remove CPU
  216. (strcmp( argv[i], "--no-cpu" ) == 0))
  217. cpu = false;
  218. else if ((strcmp( argv[i], "-no-gpu" ) == 0) || // remove GPU
  219. (strcmp( argv[i], "--no-gpu" ) == 0))
  220. gpu = false;
  221. else if (strcmp( argv[i], "-k" ) == 0) // setup kmer length, genome size and sampling frequency
  222. {
  223. k = atoi( argv[++i] );
  224. genome_size = atol( argv[++i] );
  225. alpha = atof( argv[++i] );
  226. }
  227. else if (strcmp( argv[i], "-K" ) == 0) // setup kmer length, genome size and sampling frequency
  228. {
  229. k = atoi( argv[++i] );
  230. genome_size = atol( argv[++i] );
  231. alpha = 0.0f;
  232. }
  233. else if (strcmp( argv[i], "-maxcor" ) == 0) // setup max correction factor
  234. max_correction = (float)atoi( argv[++i] );
  235. else if (strcmp( argv[i], "-newQual" ) == 0) // setup new quality
  236. new_quality = argv[++i][0];
  237. else if (strcmp( argv[i], "-bf" ) == 0) // Bloom filter expansion factor
  238. bf_factor = atof( argv[++i] );
  239. }
  240. // if no devices were specified, and the gpu is enabled, pick GPU 0
  241. if (gpu && (devices.size() == 0))
  242. devices.push_back(0);
  243. uint32 device_count = uint32( devices.size() );
  244. // check whether the genome size has been specified
  245. if (genome_size == 0u)
  246. {
  247. log_error(stderr, "must specify the k-mer and genome size with the option: -k k genome-size alpha\n" );
  248. return 1;
  249. }
  250. try
  251. {
  252. log_visible(stderr,"nvLighter... started\n");
  253. // compute the optimal Bloom filter size scaling factors
  254. const float sampled_kmers_bf_factor = optimal_bloom_filter_bits_per_key( 0.01f );
  255. const float trusted_kmers_bf_factor = optimal_bloom_filter_bits_per_key( 0.0005f );
  256. log_verbose(stderr, " optimal m(0.01f) = %.2f\n", sampled_kmers_bf_factor );
  257. log_verbose(stderr, " optimal k(0.01f) = %u\n", optimal_bloom_filter_hashes( sampled_kmers_bf_factor ) );
  258. log_verbose(stderr, " optimal m(0.0005f) = %.2f\n", trusted_kmers_bf_factor );
  259. log_verbose(stderr, " optimal k(0.0005f) = %u\n", optimal_bloom_filter_hashes( trusted_kmers_bf_factor ) );
  260. // compute the Bloom filter sizes, in words
  261. const uint64 sampled_kmers_bf_words = align<8u>( uint64( float(genome_size) * bf_factor * (sampled_kmers_bf_factor / 32.0f) ) );
  262. const uint64 trusted_kmers_bf_words = align<8u>( uint64( float(genome_size) * bf_factor * (trusted_kmers_bf_factor / 32.0f) ) );
  263. const uint32 bits_per_word = 32u;
  264. // now set the number of CPU threads
  265. threads = threads > 0 ? threads : omp_get_num_procs();
  266. //omp_set_num_threads( threads );
  267. omp_set_num_threads( omp_get_num_procs() ); // use all threads for the merging steps...
  268. omp_set_nested(1);
  269. // setup the device Bloom filters
  270. BloomFilters<host_tag> h_bloom_filters;
  271. BloomFilters<host_tag>* h_bloom_filters_ptr = &h_bloom_filters;
  272. BloomFilters<device_tag> d_bloom_filters[16];
  273. for (int i = device_count-1; i >= 0; --i)
  274. {
  275. if (d_bloom_filters[i].setup( devices[i], sampled_kmers_bf_words, trusted_kmers_bf_words ) == false)
  276. devices.erase( devices.begin() + i );
  277. }
  278. device_count = uint32( devices.size() );
  279. if (gpu && (device_count == 0))
  280. {
  281. log_warning(stderr, " no available GPU devices\n");
  282. // revert to using the CPU even if -no-cpu was specified
  283. if (cpu == false)
  284. {
  285. cpu = true;
  286. log_warning(stderr, " switching the CPU on\n");
  287. }
  288. }
  289. if (cpu && h_bloom_filters.setup( -1, sampled_kmers_bf_words, trusted_kmers_bf_words ) == false)
  290. cpu = false;
  291. if (cpu == false)
  292. {
  293. h_bloom_filters_ptr = NULL;
  294. if (device_count == 0)
  295. {
  296. log_error(stderr, " no available CPU or GPU devices\n");
  297. return 1;
  298. }
  299. }
  300. //
  301. // open the output file
  302. //
  303. log_info(stderr, " opening output file \"%s\"... started\n", output_name);
  304. SharedPointer<nvbio::io::SequenceDataOutputStream> output_file(
  305. nvbio::io::open_output_sequence_file(
  306. output_name,
  307. comp_level ) );
  308. if (output_file == NULL || output_file->is_ok() == false)
  309. {
  310. log_error(stderr, " failed opening output \"%s\"\n", output_name);
  311. return 1;
  312. }
  313. log_info(stderr, " opening output file \"%s\"... done\n", output_name);
  314. //
  315. // open the reads file
  316. //
  317. uint32 max_block_strings = 512*1024;
  318. uint32 max_block_bps = 64*1024*1024;
  319. log_info(stderr, " opening read file \"%s\"... started\n", reads_name);
  320. SharedPointer<nvbio::io::SequenceDataInputStream> read_data_file(
  321. nvbio::io::open_sequence_file(
  322. reads_name,
  323. qencoding,
  324. uint32(-1),
  325. uint32(-1),
  326. io::FORWARD )
  327. );
  328. if (read_data_file == NULL || read_data_file->is_ok() == false)
  329. {
  330. log_error(stderr, " failed opening file \"%s\"\n", reads_name);
  331. return 1;
  332. }
  333. log_info(stderr, " opening read file \"%s\"... done\n", reads_name);
  334. // infer alpha if necessary
  335. if (alpha <= 0.0f)
  336. {
  337. alpha = infer_alpha( read_data_file.get(), genome_size );
  338. if (alpha >= 1.0f)
  339. {
  340. log_error(stderr, " alpha cannot be greater than 1, coverage likely too low\n");
  341. exit(1);
  342. }
  343. }
  344. const char bad_quality = get_bad_quality( read_data_file.get() );
  345. log_info(stderr, " bad quality threshold: '%c'\n", bad_quality);
  346. log_info(stderr," sample kmers... started\n");
  347. {
  348. //
  349. // The following code implements a parallel nvbio::Pipeline to sample kmers from the input
  350. // reads. The pipeline is composed several subpipelines, one per active device, each made
  351. // of two stages (which will be run in separate threads): an InputStage, and a
  352. // SampleKmersStage doing the actual sampling work.
  353. //
  354. log_debug(stderr, " assemble pipeline\n");
  355. // build the input stage
  356. InputStageData input_stage_data( read_data_file.get(), max_block_strings, max_block_bps );
  357. InputStage input_stage[16];
  358. // build the sink
  359. SampleKmersStage sample_stage[16];
  360. SequenceStats sample_stats;
  361. // setup the pipeline stages
  362. if (cpu)
  363. {
  364. // host stages
  365. input_stage[device_count] = InputStage( &input_stage_data );
  366. sample_stage[device_count] = SampleKmersStage(
  367. -threads,
  368. k,
  369. alpha,
  370. sampled_kmers_bf_words * bits_per_word,
  371. raw_pointer( h_bloom_filters.sampled_kmers_storage ),
  372. &sample_stats );
  373. }
  374. for (uint32 i = 0; i < device_count; ++i)
  375. {
  376. // device stages
  377. input_stage[i] = InputStage( &input_stage_data );
  378. sample_stage[i] = SampleKmersStage(
  379. devices[i],
  380. k,
  381. alpha,
  382. sampled_kmers_bf_words * bits_per_word,
  383. raw_pointer( d_bloom_filters[i].sampled_kmers_storage ),
  384. &sample_stats );
  385. }
  386. // build the pipeline
  387. nvbio::Pipeline pipeline;
  388. for (uint32 i = 0; i < device_count + (cpu ? 1 : 0); ++i)
  389. {
  390. const uint32 in0 = pipeline.append_stage( &input_stage[i], 4u );
  391. const uint32 out = pipeline.append_sink( &sample_stage[i] );
  392. pipeline.add_dependency( in0, out );
  393. }
  394. log_debug(stderr, " start pipeline\n");
  395. Timer timer;
  396. timer.start();
  397. // and run it!
  398. pipeline.run();
  399. log_info_cont(stderr, "\n");
  400. merge( h_bloom_filters_ptr, device_count, d_bloom_filters, SAMPLED_KMERS );
  401. timer.stop();
  402. const float time = timer.seconds();
  403. log_verbose(stderr," total time : %.1fs\n", time);
  404. log_verbose(stderr," peak memory : %.1f GB\n", float( peak_resident_memory() ) / float(1024*1024*1024));
  405. }
  406. log_info(stderr," sample kmers... done\n");
  407. log_info(stderr," mark trusted kmers... started\n");
  408. {
  409. //
  410. // The following code implements a parallel nvbio::Pipeline to mark trusted kmers in the input
  411. // reads. The pipeline is composed several subpipelines, one per active device, each made
  412. // of two stages (which will be run in separate threads): an InputStage, and a
  413. // TrustedKmersStage doing the actual marking work.
  414. //
  415. // gather Bloom filter statistics
  416. nvbio::vector<host_tag,uint32> threshold( 100, 0u );
  417. {
  418. double untrustF[100][100];
  419. // compute the number of bits set
  420. float occupancy;
  421. float approx_size;
  422. float FP;
  423. if (device_count)
  424. {
  425. compute_bloom_filter_stats(
  426. d_bloom_filters[0],
  427. SAMPLED_KMERS,
  428. SAMPLED_KMERS_FILTER_K,
  429. occupancy,
  430. approx_size,
  431. FP );
  432. }
  433. else
  434. {
  435. compute_bloom_filter_stats(
  436. h_bloom_filters,
  437. SAMPLED_KMERS,
  438. SAMPLED_KMERS_FILTER_K,
  439. occupancy,
  440. approx_size,
  441. FP );
  442. }
  443. log_stats(stderr, " sampled kmers:\n" );
  444. log_stats(stderr, " occupancy : %f\n", occupancy );
  445. log_stats(stderr, " #kmers (approx) : %.1f\n", approx_size );
  446. log_stats(stderr, " FP rate : %f\n", FP );
  447. // compute the i-th untrustF table
  448. for (uint32 i = 1; i <= k; ++i)
  449. {
  450. int d = (int)(0.1 / alpha * 2);
  451. if (d < 2)
  452. d = 2;
  453. const double p = 1.0 - pow( (1.0 - alpha), d );
  454. cumulative_binomial_distribution( untrustF[i], i, p + FP - p * FP );
  455. }
  456. // compute the threshold table
  457. for (uint32 i = 1; i <= k; ++i)
  458. {
  459. for (uint32 j = 0; j <= i; ++j)
  460. {
  461. if (untrustF[i][j] >= 1 - 0.5 * 1e-2)
  462. {
  463. threshold[i] = j;
  464. break;
  465. }
  466. }
  467. }
  468. log_verbose(stderr, " thresholds = {");
  469. for (uint32 i = 1; i <= k; ++i)
  470. log_verbose_cont(stderr, " %u,", threshold[i]);
  471. log_verbose_cont(stderr, " }\n");
  472. }
  473. h_bloom_filters.set_threshold( threshold );
  474. for (uint32 i = 0; i < device_count; ++i)
  475. d_bloom_filters[i].set_threshold( threshold );
  476. log_debug(stderr, " rewind reads\n");
  477. if (read_data_file->rewind() == false)
  478. {
  479. log_error(stderr, " failed rewinding file \"%s\"\n", reads_name);
  480. return 1;
  481. }
  482. log_debug(stderr, " assemble pipeline\n");
  483. // build the input stages
  484. InputStageData input_stage_data( read_data_file.get(), max_block_strings, max_block_bps );
  485. InputStage input_stage[16];
  486. // build the trusted-kmer marking stages
  487. SequenceStats marking_stats;
  488. TrustedKmersStage marking_stage[16];
  489. // setup the pipeline stages
  490. if (cpu)
  491. {
  492. // host stages
  493. input_stage[device_count] = InputStage( &input_stage_data );
  494. marking_stage[device_count] = TrustedKmersStage(
  495. -threads,
  496. k,
  497. sampled_kmers_bf_words * bits_per_word, raw_pointer( h_bloom_filters.sampled_kmers_storage ),
  498. trusted_kmers_bf_words * bits_per_word, raw_pointer( h_bloom_filters.trusted_kmers_storage ),
  499. raw_pointer( h_bloom_filters.threshold ),
  500. &marking_stats );
  501. }
  502. for (uint32 i = 0; i < device_count; ++i)
  503. {
  504. // device stages
  505. input_stage[i] = InputStage( &input_stage_data );
  506. marking_stage[i] = TrustedKmersStage(
  507. devices[i],
  508. k,
  509. sampled_kmers_bf_words * bits_per_word, raw_pointer( d_bloom_filters[i].sampled_kmers_storage ),
  510. trusted_kmers_bf_words * bits_per_word, raw_pointer( d_bloom_filters[i].trusted_kmers_storage ),
  511. raw_pointer( d_bloom_filters[i].threshold ),
  512. &marking_stats );
  513. }
  514. // build the pipeline
  515. nvbio::Pipeline pipeline;
  516. for (uint32 i = 0; i < device_count + (cpu ? 1 : 0); ++i)
  517. {
  518. const uint32 in0 = pipeline.append_stage( &input_stage[i], 4u );
  519. const uint32 out = pipeline.append_sink( &marking_stage[i] );
  520. pipeline.add_dependency( in0, out );
  521. }
  522. log_debug(stderr, " start pipeline\n");
  523. Timer timer;
  524. timer.start();
  525. // and run it!
  526. pipeline.run();
  527. log_info_cont(stderr, "\n");
  528. merge( h_bloom_filters_ptr, device_count, d_bloom_filters, TRUSTED_KMERS );
  529. timer.stop();
  530. const float time = timer.seconds();
  531. log_verbose(stderr," total time : %.1fs\n", time);
  532. log_verbose(stderr," peak memory : %.1f GB\n", float( peak_resident_memory() ) / float(1024*1024*1024));
  533. }
  534. log_info(stderr," mark trusted kmers... done\n");
  535. log_info(stderr," error correction... started\n");
  536. {
  537. //
  538. // The following code implements a parallel nvbio::Pipeline to error-correct the input
  539. // reads. The pipeline is composed several subpipelines, one per active device, each made
  540. // of three stages (which will be run in separate threads): an InputStage, an
  541. // ErrorCorrectStage doing the actual error correction work, and a final OutputStage.
  542. //
  543. // gather Bloom filter statistics
  544. {
  545. // compute the number of bits set
  546. float occupancy;
  547. float approx_size;
  548. float FP;
  549. if (device_count)
  550. {
  551. compute_bloom_filter_stats(
  552. d_bloom_filters[0],
  553. TRUSTED_KMERS,
  554. TRUSTED_KMERS_FILTER_K,
  555. occupancy,
  556. approx_size,
  557. FP );
  558. }
  559. else
  560. {
  561. compute_bloom_filter_stats(
  562. h_bloom_filters,
  563. TRUSTED_KMERS,
  564. TRUSTED_KMERS_FILTER_K,
  565. occupancy,
  566. approx_size,
  567. FP );
  568. }
  569. log_stats(stderr, " trusted kmers:\n" );
  570. log_stats(stderr, " occupancy : %f\n", occupancy );
  571. log_stats(stderr, " #kmers (approx) : %.1f\n", approx_size );
  572. log_stats(stderr, " FP rate : %f\n", FP );
  573. }
  574. log_debug(stderr, " rewind reads\n");
  575. if (read_data_file->rewind() == false)
  576. {
  577. log_error(stderr, " failed rewinding file \"%s\"\n", reads_name);
  578. return 1;
  579. }
  580. log_debug(stderr, " assemble pipeline\n");
  581. // build the input stages
  582. InputStageData input_stage_data( read_data_file.get(), max_block_strings, max_block_bps );
  583. InputStage input_stage[16];
  584. // build the error correction stages
  585. SequenceStats ec_stats;
  586. ErrorCorrectStage ec_stage[16];
  587. // build the output stages
  588. OutputStageData output_stage_data( output_file.get() );
  589. OutputStage output_stage[16];
  590. // setup the pipeline stages
  591. if (cpu)
  592. {
  593. // build the input stage
  594. input_stage[device_count] = InputStage( &input_stage_data );
  595. // build the sink
  596. ec_stage[device_count] = ErrorCorrectStage(
  597. -threads,
  598. k,
  599. trusted_kmers_bf_words * bits_per_word, raw_pointer( h_bloom_filters.trusted_kmers_storage ),
  600. raw_pointer( h_bloom_filters.stats ),
  601. max_correction,
  602. bad_quality,
  603. new_quality,
  604. &ec_stats );
  605. // build the input stage
  606. output_stage[device_count] = OutputStage( &output_stage_data );
  607. }
  608. for (uint32 i = 0; i < device_count; ++i)
  609. {
  610. // build the input stage
  611. input_stage[i] = InputStage( &input_stage_data );
  612. // build the sink
  613. ec_stage[i] = ErrorCorrectStage(
  614. devices[i],
  615. k,
  616. trusted_kmers_bf_words * bits_per_word, raw_pointer( d_bloom_filters[i].trusted_kmers_storage ),
  617. raw_pointer( d_bloom_filters[i].stats ),
  618. max_correction,
  619. bad_quality,
  620. new_quality,
  621. &ec_stats );
  622. // build the input stage
  623. output_stage[i] = OutputStage( &output_stage_data );
  624. }
  625. log_debug(stderr, " start pipeline\n");
  626. // build the pipeline
  627. nvbio::Pipeline pipeline;
  628. for (uint32 i = 0; i < device_count + (cpu ? 1 : 0); ++i)
  629. {
  630. const uint32 in = pipeline.append_stage( &input_stage[i], 4u );
  631. const uint32 ec = pipeline.append_stage( &ec_stage[i] );
  632. const uint32 out = pipeline.append_sink( &output_stage[i] );
  633. pipeline.add_dependency( in, ec );
  634. pipeline.add_dependency( ec, out );
  635. }
  636. Timer timer;
  637. timer.start();
  638. // and run it!
  639. pipeline.run();
  640. timer.stop();
  641. const float time = timer.seconds();
  642. log_info_cont(stderr, "\n");
  643. nvbio::vector<host_tag,uint64> stats;
  644. merged_stats( h_bloom_filters_ptr, device_count, d_bloom_filters, stats );
  645. log_info(stderr," stats :\n");
  646. log_info(stderr," error free reads : %llu\n", uint64( stats[ERROR_FREE] ));
  647. log_info(stderr," unfixable reads : %llu\n", uint64( stats[UNFIXABLE] ));
  648. log_info(stderr," corrected bases : %llu\n", uint64( stats[CORRECTIONS] ));
  649. log_verbose(stderr," total time : %.1fs\n", time);
  650. log_verbose(stderr," peak memory : %.1f GB\n", float( peak_resident_memory() ) / float(1024*1024*1024));
  651. }
  652. log_info(stderr," error correction... done\n");
  653. log_visible(stderr,"nvLighter... done\n");
  654. }
  655. catch (nvbio::cuda_error e)
  656. {
  657. log_error(stderr, "caught a nvbio::cuda_error exception:\n");
  658. log_error(stderr, " %s\n", e.what());
  659. return 1;
  660. }
  661. catch (nvbio::bad_alloc e)
  662. {
  663. log_error(stderr, "caught a nvbio::bad_alloc exception:\n");
  664. log_error(stderr, " %s\n", e.what());
  665. return 1;
  666. }
  667. catch (nvbio::logic_error e)
  668. {
  669. log_error(stderr, "caught a nvbio::logic_error exception:\n");
  670. log_error(stderr, " %s\n", e.what());
  671. return 1;
  672. }
  673. catch (nvbio::runtime_error e)
  674. {
  675. log_error(stderr, "caught a nvbio::runtime_error exception:\n");
  676. log_error(stderr, " %s\n", e.what());
  677. return 1;
  678. }
  679. catch (thrust::system::system_error e)
  680. {
  681. log_error(stderr, "caught a thrust::system_error exception:\n");
  682. log_error(stderr, " %s\n", e.what());
  683. return 1;
  684. }
  685. catch (std::bad_alloc e)
  686. {
  687. log_error(stderr, "caught a std::bad_alloc exception:\n");
  688. log_error(stderr, " %s\n", e.what());
  689. return 1;
  690. }
  691. catch (std::logic_error e)
  692. {
  693. log_error(stderr, "caught a std::logic_error exception:\n");
  694. log_error(stderr, " %s\n", e.what());
  695. return 1;
  696. }
  697. catch (std::runtime_error e)
  698. {
  699. log_error(stderr, "caught a std::runtime_error exception:\n");
  700. log_error(stderr, " %s\n", e.what());
  701. return 1;
  702. }
  703. catch (...)
  704. {
  705. log_error(stderr, "caught an unknown exception!\n");
  706. return 1;
  707. }
  708. return 0;
  709. }