BGZF.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. // ***************************************************************************
  2. // BGZF.cpp (c) 2009 Derek Barnett, Michael Str�mberg
  3. // Marth Lab, Department of Biology, Boston College
  4. // All rights reserved.
  5. // ---------------------------------------------------------------------------
  6. // Last modified: 8 December 2009 (DB)
  7. // ---------------------------------------------------------------------------
  8. // BGZF routines were adapted from the bgzf.c code developed at the Broad
  9. // Institute.
  10. // ---------------------------------------------------------------------------
  11. // Provides the basic functionality for reading & writing BGZF files
  12. // ***************************************************************************
  13. #include "BGZF.h"
  14. #include <nvbio/basic/threads.h>
  15. #include <nvbio/basic/console.h>
  16. #include <algorithm>
  17. using namespace BamTools;
  18. using namespace nvbio;
  19. using std::string;
  20. using std::min;
  21. namespace BamTools {
  22. struct BgzfThread : public Thread<BgzfThread>
  23. {
  24. void set_data(BgzfData* data) { m_data = data; }
  25. void run()
  26. {
  27. const uint32 tid = ++m_data->BackgroundThreads;
  28. log_verbose( stderr, "starting BAM output thread %u\n", tid );
  29. while (m_data->BackgroundThreads != 0)
  30. {
  31. if (m_data->WorkCounter > 0u)
  32. {
  33. const int32 id = --m_data->WorkCounter;
  34. if (id >= 0)
  35. {
  36. m_data->BlockLengths[id] = m_data->DeflateBlock( id, m_data->CurrentBlockSize );
  37. --m_data->ActiveThreads;
  38. }
  39. }
  40. }
  41. }
  42. BgzfData* m_data;
  43. };
  44. BgzfData::BgzfData(const uint32_t threads)
  45. : UncompressedBlockSize(DEFAULT_BLOCK_SIZE)
  46. , CompressedBlockSize(MAX_BLOCK_SIZE)
  47. , BlockLength(0)
  48. , BlockOffset(0)
  49. , BlockAddress(0)
  50. , IsOpen(false)
  51. , IsWriteOnly(false)
  52. , Stream(NULL)
  53. , UncompressedBlock(NULL)
  54. , CompressedBlock(NULL)
  55. , BackgroundThreads( 0 )
  56. , ActiveThreads( 0 )
  57. , WorkCounter( 0 )
  58. , ThreadCount( threads == uint32(-1) ? nvbio::num_physical_cores() : 4 )
  59. {
  60. try {
  61. CompressedBlock = new char[CompressedBlockSize * ThreadCount * 4];
  62. UncompressedBlock = new char[UncompressedBlockSize * ThreadCount];
  63. BlockLengths = new int [ThreadCount];
  64. ThreadPool = new BgzfThread[ThreadCount];
  65. // launch background threads
  66. for (uint32_t i = 0; i < ThreadCount; ++i)
  67. {
  68. ThreadPool[i].set_data( this );
  69. ThreadPool[i].create();
  70. }
  71. // poll until all background threads are ready
  72. while (BackgroundThreads < ThreadCount) {}
  73. }
  74. catch( std::bad_alloc& ) {
  75. printf("ERROR: Unable to allocate memory for our BGZF object.\n");
  76. exit(1);
  77. }
  78. }
  79. // destructor
  80. BgzfData::~BgzfData(void)
  81. {
  82. // stop background threads
  83. BackgroundThreads = 0;
  84. for (uint32_t i = 0; i < ThreadCount; ++i)
  85. ThreadPool[i].join();
  86. if(CompressedBlock) delete [] CompressedBlock;
  87. if(UncompressedBlock) delete [] UncompressedBlock;
  88. if(BlockLengths) delete [] BlockLengths;
  89. if(ThreadPool) delete [] ThreadPool;
  90. }
  91. // closes BGZF file
  92. void BgzfData::Close(void) {
  93. if (!IsOpen) { return; }
  94. IsOpen = false;
  95. // flush the BGZF block
  96. if ( IsWriteOnly ) { FlushBlocks(); }
  97. // flush and close
  98. fflush(Stream);
  99. fclose(Stream);
  100. }
  101. // compresses the current block
  102. int BgzfData::DeflateBlocks(void)
  103. {
  104. // count the number of blocks to deflate
  105. int32_t blockSize = UncompressedBlockSize;
  106. while (1)
  107. {
  108. const int n_blocks = (BlockOffset + blockSize-1) / blockSize;
  109. #if 0
  110. // single threaded implementation
  111. for (int i = 0; i < n_blocks; ++i)
  112. BlockLengths[i] = DeflateBlock( i, blockSize );
  113. #else
  114. CurrentBlockSize = blockSize; // set current block size
  115. WorkCounter = n_blocks; // set number of blocks to process
  116. ActiveThreads = n_blocks; // activate background threads
  117. // poll until the blocks are being processed
  118. while (ActiveThreads > 0) {}
  119. #endif
  120. // check whether compression failed for any of the blocks
  121. bool ok = true;
  122. for (int i = 0; i < n_blocks; ++i)
  123. {
  124. if (BlockLengths[i] < 0)
  125. ok = false;
  126. }
  127. if (ok)
  128. return n_blocks;
  129. // compression failed: reduce block size
  130. blockSize -= 1024;
  131. if (blockSize <= 0)
  132. {
  133. printf("ERROR: input reduction failed.\n");
  134. exit(1);
  135. }
  136. }
  137. return 0;
  138. }
  139. // compresses the current block
  140. int BgzfData::DeflateBlock(int32_t id, const unsigned int uncompressedBlockSize)
  141. {
  142. // check if the block is within the proper range
  143. if (uncompressedBlockSize * id >= BlockOffset)
  144. return 0;
  145. // initialize the gzip header
  146. char* buffer = CompressedBlock + id * CompressedBlockSize;
  147. unsigned int bufferSize = CompressedBlockSize;
  148. memset(buffer, 0, 18);
  149. buffer[0] = GZIP_ID1;
  150. buffer[1] = (char)GZIP_ID2;
  151. buffer[2] = CM_DEFLATE;
  152. buffer[3] = FLG_FEXTRA;
  153. buffer[9] = (char)OS_UNKNOWN;
  154. buffer[10] = BGZF_XLEN;
  155. buffer[12] = BGZF_ID1;
  156. buffer[13] = BGZF_ID2;
  157. buffer[14] = BGZF_LEN;
  158. // loop to retry for blocks that do not compress enough
  159. int inputLength = std::min( uncompressedBlockSize, BlockOffset - uncompressedBlockSize * id );
  160. int compressedLength = 0;
  161. z_stream zs;
  162. zs.zalloc = NULL;
  163. zs.zfree = NULL;
  164. zs.next_in = (Bytef*)UncompressedBlock + id * uncompressedBlockSize;
  165. zs.avail_in = inputLength;
  166. zs.next_out = (Bytef*)&buffer[BLOCK_HEADER_LENGTH];
  167. zs.avail_out = bufferSize - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
  168. // initialize the zlib compression algorithm
  169. if(deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
  170. printf("ERROR: zlib deflate initialization failed.\n");
  171. exit(1);
  172. }
  173. // compress the data
  174. int status = deflate(&zs, Z_FINISH);
  175. if(status != Z_STREAM_END) {
  176. deflateEnd(&zs);
  177. // reduce the input length and try again
  178. if(status == Z_OK)
  179. return -1;
  180. }
  181. // finalize the compression routine
  182. if(deflateEnd(&zs) != Z_OK) {
  183. printf("ERROR: deflate end failed.\n");
  184. exit(1);
  185. }
  186. compressedLength = zs.total_out;
  187. compressedLength += BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
  188. if(compressedLength > MAX_BLOCK_SIZE) {
  189. printf("ERROR: deflate overflow.\n");
  190. exit(1);
  191. }
  192. // store the compressed length
  193. BgzfData::PackUnsignedShort(&buffer[16], (unsigned short)(compressedLength - 1));
  194. // store the CRC32 checksum
  195. unsigned int crc = crc32(0, NULL, 0);
  196. crc = crc32(crc, (Bytef*)UncompressedBlock + id * uncompressedBlockSize, inputLength);
  197. BgzfData::PackUnsignedInt(&buffer[compressedLength - 8], crc);
  198. BgzfData::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
  199. return compressedLength;
  200. }
  201. // flushes the data in the BGZF block
  202. void BgzfData::FlushBlocks(void) {
  203. // compress the data block
  204. int n_blocks = DeflateBlocks();
  205. // flush the data to our output stream
  206. for (int i = 0; i < n_blocks; ++i)
  207. {
  208. int blockLength = BlockLengths[i];
  209. int numBytesWritten = (int)fwrite(CompressedBlock + i * CompressedBlockSize, 1, blockLength, Stream);
  210. if(numBytesWritten != blockLength) {
  211. printf("ERROR: Expected to write %u bytes during flushing, but wrote %u bytes.\n", blockLength, numBytesWritten);
  212. exit(1);
  213. }
  214. BlockAddress += blockLength;
  215. }
  216. BlockOffset = 0;
  217. }
  218. // de-compresses the current block
  219. int BgzfData::InflateBlock(const int& blockLength) {
  220. // Inflate the block in m_BGZF.CompressedBlock into m_BGZF.UncompressedBlock
  221. z_stream zs;
  222. zs.zalloc = NULL;
  223. zs.zfree = NULL;
  224. zs.next_in = (Bytef*)CompressedBlock + 18;
  225. zs.avail_in = blockLength - 16;
  226. zs.next_out = (Bytef*)UncompressedBlock;
  227. zs.avail_out = UncompressedBlockSize;
  228. int status = inflateInit2(&zs, GZIP_WINDOW_BITS);
  229. if (status != Z_OK) {
  230. printf("inflateInit failed\n");
  231. exit(1);
  232. }
  233. status = inflate(&zs, Z_FINISH);
  234. if (status != Z_STREAM_END) {
  235. inflateEnd(&zs);
  236. printf("inflate failed\n");
  237. exit(1);
  238. }
  239. status = inflateEnd(&zs);
  240. if (status != Z_OK) {
  241. printf("inflateEnd failed\n");
  242. exit(1);
  243. }
  244. return zs.total_out;
  245. }
  246. void BgzfData::Open(const string& filename, const char* mode) {
  247. if ( strcmp(mode, "rb") == 0 ) {
  248. IsWriteOnly = false;
  249. } else if ( strcmp(mode, "wb") == 0) {
  250. IsWriteOnly = true;
  251. } else {
  252. printf("ERROR: Unknown file mode: %s\n", mode);
  253. exit(1);
  254. }
  255. Stream = fopen(filename.c_str(), mode);
  256. if(!Stream) {
  257. printf("ERROR: Unable to open the BAM file %s\n", filename.c_str() );
  258. exit(1);
  259. }
  260. IsOpen = true;
  261. }
  262. int BgzfData::Read(char* data, const unsigned int dataLength) {
  263. if (dataLength == 0) { return 0; }
  264. char* output = data;
  265. unsigned int numBytesRead = 0;
  266. while (numBytesRead < dataLength) {
  267. int bytesAvailable = BlockLength - BlockOffset;
  268. if (bytesAvailable <= 0) {
  269. if ( ReadBlock() != 0 ) { return -1; }
  270. bytesAvailable = BlockLength - BlockOffset;
  271. if ( bytesAvailable <= 0 ) { break; }
  272. }
  273. char* buffer = UncompressedBlock;
  274. int copyLength = min( (int)(dataLength-numBytesRead), bytesAvailable );
  275. memcpy(output, buffer + BlockOffset, copyLength);
  276. BlockOffset += copyLength;
  277. output += copyLength;
  278. numBytesRead += copyLength;
  279. }
  280. if ( BlockOffset == BlockLength ) {
  281. BlockAddress = ftell(Stream);
  282. BlockOffset = 0;
  283. BlockLength = 0;
  284. }
  285. return numBytesRead;
  286. }
  287. int BgzfData::ReadBlock(void) {
  288. char header[BLOCK_HEADER_LENGTH];
  289. int64_t blockAddress = ftell(Stream);
  290. int count = (int)fread(header, 1, sizeof(header), Stream);
  291. if (count == 0) {
  292. BlockLength = 0;
  293. return 0;
  294. }
  295. if (count != sizeof(header)) {
  296. printf("read block failed - count != sizeof(header)\n");
  297. return -1;
  298. }
  299. if (!BgzfData::CheckBlockHeader(header)) {
  300. printf("read block failed - CheckBlockHeader() returned false\n");
  301. return -1;
  302. }
  303. int blockLength = BgzfData::UnpackUnsignedShort(&header[16]) + 1;
  304. char* compressedBlock = CompressedBlock;
  305. memcpy(compressedBlock, header, BLOCK_HEADER_LENGTH);
  306. int remaining = blockLength - BLOCK_HEADER_LENGTH;
  307. count = (int)fread(&compressedBlock[BLOCK_HEADER_LENGTH], 1, remaining, Stream);
  308. if (count != remaining) {
  309. printf("read block failed - count != remaining\n");
  310. return -1;
  311. }
  312. count = InflateBlock(blockLength);
  313. if (count < 0) { return -1; }
  314. if ( BlockLength != 0 ) {
  315. BlockOffset = 0;
  316. }
  317. BlockAddress = blockAddress;
  318. BlockLength = count;
  319. return 0;
  320. }
  321. bool BgzfData::Seek(int64_t position) {
  322. int blockOffset = (position & 0xFFFF);
  323. int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
  324. if (fseek(Stream, (long)blockAddress, SEEK_SET) != 0) {
  325. printf("ERROR: Unable to seek in BAM file\n");
  326. exit(1);
  327. }
  328. BlockLength = 0;
  329. BlockAddress = blockAddress;
  330. BlockOffset = blockOffset;
  331. return true;
  332. }
  333. int64_t BgzfData::Tell(void) {
  334. return ( (BlockAddress << 16) | (BlockOffset & 0xFFFF) );
  335. }
  336. // writes the supplied data into the BGZF buffer
  337. unsigned int BgzfData::Write(const char* data, const unsigned int dataLen) {
  338. // initialize
  339. unsigned int numBytesWritten = 0;
  340. const char* input = data;
  341. unsigned int blockLength = UncompressedBlockSize * ThreadCount;
  342. // copy the data to the buffer
  343. while(numBytesWritten < dataLen) {
  344. unsigned int copyLength = min(blockLength - BlockOffset, dataLen - numBytesWritten);
  345. char* buffer = UncompressedBlock;
  346. memcpy(buffer + BlockOffset, input, copyLength);
  347. BlockOffset += copyLength;
  348. input += copyLength;
  349. numBytesWritten += copyLength;
  350. if(BlockOffset == blockLength) {
  351. FlushBlocks();
  352. }
  353. }
  354. return numBytesWritten;
  355. }
  356. } // namespace BamTools