123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- // ***************************************************************************
- // BGZF.cpp (c) 2009 Derek Barnett, Michael Str�mberg
- // Marth Lab, Department of Biology, Boston College
- // All rights reserved.
- // ---------------------------------------------------------------------------
- // Last modified: 8 December 2009 (DB)
- // ---------------------------------------------------------------------------
- // BGZF routines were adapted from the bgzf.c code developed at the Broad
- // Institute.
- // ---------------------------------------------------------------------------
- // Provides the basic functionality for reading & writing BGZF files
- // ***************************************************************************
- #include "BGZF.h"
- #include <nvbio/basic/threads.h>
- #include <nvbio/basic/console.h>
- #include <algorithm>
- using namespace BamTools;
- using namespace nvbio;
- using std::string;
- using std::min;
- namespace BamTools {
- struct BgzfThread : public Thread<BgzfThread>
- {
- void set_data(BgzfData* data) { m_data = data; }
- void run()
- {
- const uint32 tid = ++m_data->BackgroundThreads;
- log_verbose( stderr, "starting BAM output thread %u\n", tid );
- while (m_data->BackgroundThreads != 0)
- {
- if (m_data->WorkCounter > 0u)
- {
- const int32 id = --m_data->WorkCounter;
- if (id >= 0)
- {
- m_data->BlockLengths[id] = m_data->DeflateBlock( id, m_data->CurrentBlockSize );
- --m_data->ActiveThreads;
- }
- }
- }
- }
- BgzfData* m_data;
- };
- BgzfData::BgzfData(const uint32_t threads)
- : UncompressedBlockSize(DEFAULT_BLOCK_SIZE)
- , CompressedBlockSize(MAX_BLOCK_SIZE)
- , BlockLength(0)
- , BlockOffset(0)
- , BlockAddress(0)
- , IsOpen(false)
- , IsWriteOnly(false)
- , Stream(NULL)
- , UncompressedBlock(NULL)
- , CompressedBlock(NULL)
- , BackgroundThreads( 0 )
- , ActiveThreads( 0 )
- , WorkCounter( 0 )
- , ThreadCount( threads == uint32(-1) ? nvbio::num_physical_cores() : 4 )
- {
- try {
- CompressedBlock = new char[CompressedBlockSize * ThreadCount * 4];
- UncompressedBlock = new char[UncompressedBlockSize * ThreadCount];
- BlockLengths = new int [ThreadCount];
- ThreadPool = new BgzfThread[ThreadCount];
- // launch background threads
- for (uint32_t i = 0; i < ThreadCount; ++i)
- {
- ThreadPool[i].set_data( this );
- ThreadPool[i].create();
- }
- // poll until all background threads are ready
- while (BackgroundThreads < ThreadCount) {}
- }
- catch( std::bad_alloc& ) {
- printf("ERROR: Unable to allocate memory for our BGZF object.\n");
- exit(1);
- }
- }
- // destructor
- BgzfData::~BgzfData(void)
- {
- // stop background threads
- BackgroundThreads = 0;
- for (uint32_t i = 0; i < ThreadCount; ++i)
- ThreadPool[i].join();
- if(CompressedBlock) delete [] CompressedBlock;
- if(UncompressedBlock) delete [] UncompressedBlock;
- if(BlockLengths) delete [] BlockLengths;
- if(ThreadPool) delete [] ThreadPool;
- }
- // closes BGZF file
- void BgzfData::Close(void) {
- if (!IsOpen) { return; }
- IsOpen = false;
- // flush the BGZF block
- if ( IsWriteOnly ) { FlushBlocks(); }
- // flush and close
- fflush(Stream);
- fclose(Stream);
- }
- // compresses the current block
- int BgzfData::DeflateBlocks(void)
- {
- // count the number of blocks to deflate
- int32_t blockSize = UncompressedBlockSize;
- while (1)
- {
- const int n_blocks = (BlockOffset + blockSize-1) / blockSize;
- #if 0
- // single threaded implementation
- for (int i = 0; i < n_blocks; ++i)
- BlockLengths[i] = DeflateBlock( i, blockSize );
- #else
- CurrentBlockSize = blockSize; // set current block size
- WorkCounter = n_blocks; // set number of blocks to process
- ActiveThreads = n_blocks; // activate background threads
- // poll until the blocks are being processed
- while (ActiveThreads > 0) {}
- #endif
- // check whether compression failed for any of the blocks
- bool ok = true;
- for (int i = 0; i < n_blocks; ++i)
- {
- if (BlockLengths[i] < 0)
- ok = false;
- }
- if (ok)
- return n_blocks;
- // compression failed: reduce block size
- blockSize -= 1024;
- if (blockSize <= 0)
- {
- printf("ERROR: input reduction failed.\n");
- exit(1);
- }
- }
- return 0;
- }
- // compresses the current block
- int BgzfData::DeflateBlock(int32_t id, const unsigned int uncompressedBlockSize)
- {
- // check if the block is within the proper range
- if (uncompressedBlockSize * id >= BlockOffset)
- return 0;
- // initialize the gzip header
- char* buffer = CompressedBlock + id * CompressedBlockSize;
- unsigned int bufferSize = CompressedBlockSize;
- memset(buffer, 0, 18);
- buffer[0] = GZIP_ID1;
- buffer[1] = (char)GZIP_ID2;
- buffer[2] = CM_DEFLATE;
- buffer[3] = FLG_FEXTRA;
- buffer[9] = (char)OS_UNKNOWN;
- buffer[10] = BGZF_XLEN;
- buffer[12] = BGZF_ID1;
- buffer[13] = BGZF_ID2;
- buffer[14] = BGZF_LEN;
- // loop to retry for blocks that do not compress enough
- int inputLength = std::min( uncompressedBlockSize, BlockOffset - uncompressedBlockSize * id );
- int compressedLength = 0;
- z_stream zs;
- zs.zalloc = NULL;
- zs.zfree = NULL;
- zs.next_in = (Bytef*)UncompressedBlock + id * uncompressedBlockSize;
- zs.avail_in = inputLength;
- zs.next_out = (Bytef*)&buffer[BLOCK_HEADER_LENGTH];
- zs.avail_out = bufferSize - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
- // initialize the zlib compression algorithm
- if(deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, GZIP_WINDOW_BITS, Z_DEFAULT_MEM_LEVEL, Z_DEFAULT_STRATEGY) != Z_OK) {
- printf("ERROR: zlib deflate initialization failed.\n");
- exit(1);
- }
- // compress the data
- int status = deflate(&zs, Z_FINISH);
- if(status != Z_STREAM_END) {
- deflateEnd(&zs);
- // reduce the input length and try again
- if(status == Z_OK)
- return -1;
- }
- // finalize the compression routine
- if(deflateEnd(&zs) != Z_OK) {
- printf("ERROR: deflate end failed.\n");
- exit(1);
- }
- compressedLength = zs.total_out;
- compressedLength += BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
- if(compressedLength > MAX_BLOCK_SIZE) {
- printf("ERROR: deflate overflow.\n");
- exit(1);
- }
- // store the compressed length
- BgzfData::PackUnsignedShort(&buffer[16], (unsigned short)(compressedLength - 1));
- // store the CRC32 checksum
- unsigned int crc = crc32(0, NULL, 0);
- crc = crc32(crc, (Bytef*)UncompressedBlock + id * uncompressedBlockSize, inputLength);
- BgzfData::PackUnsignedInt(&buffer[compressedLength - 8], crc);
- BgzfData::PackUnsignedInt(&buffer[compressedLength - 4], inputLength);
- return compressedLength;
- }
- // flushes the data in the BGZF block
- void BgzfData::FlushBlocks(void) {
- // compress the data block
- int n_blocks = DeflateBlocks();
- // flush the data to our output stream
- for (int i = 0; i < n_blocks; ++i)
- {
- int blockLength = BlockLengths[i];
- int numBytesWritten = (int)fwrite(CompressedBlock + i * CompressedBlockSize, 1, blockLength, Stream);
- if(numBytesWritten != blockLength) {
- printf("ERROR: Expected to write %u bytes during flushing, but wrote %u bytes.\n", blockLength, numBytesWritten);
- exit(1);
- }
- BlockAddress += blockLength;
- }
- BlockOffset = 0;
- }
- // de-compresses the current block
- int BgzfData::InflateBlock(const int& blockLength) {
- // Inflate the block in m_BGZF.CompressedBlock into m_BGZF.UncompressedBlock
- z_stream zs;
- zs.zalloc = NULL;
- zs.zfree = NULL;
- zs.next_in = (Bytef*)CompressedBlock + 18;
- zs.avail_in = blockLength - 16;
- zs.next_out = (Bytef*)UncompressedBlock;
- zs.avail_out = UncompressedBlockSize;
- int status = inflateInit2(&zs, GZIP_WINDOW_BITS);
- if (status != Z_OK) {
- printf("inflateInit failed\n");
- exit(1);
- }
- status = inflate(&zs, Z_FINISH);
- if (status != Z_STREAM_END) {
- inflateEnd(&zs);
- printf("inflate failed\n");
- exit(1);
- }
- status = inflateEnd(&zs);
- if (status != Z_OK) {
- printf("inflateEnd failed\n");
- exit(1);
- }
- return zs.total_out;
- }
- void BgzfData::Open(const string& filename, const char* mode) {
- if ( strcmp(mode, "rb") == 0 ) {
- IsWriteOnly = false;
- } else if ( strcmp(mode, "wb") == 0) {
- IsWriteOnly = true;
- } else {
- printf("ERROR: Unknown file mode: %s\n", mode);
- exit(1);
- }
- Stream = fopen(filename.c_str(), mode);
- if(!Stream) {
- printf("ERROR: Unable to open the BAM file %s\n", filename.c_str() );
- exit(1);
- }
- IsOpen = true;
- }
- int BgzfData::Read(char* data, const unsigned int dataLength) {
- if (dataLength == 0) { return 0; }
- char* output = data;
- unsigned int numBytesRead = 0;
- while (numBytesRead < dataLength) {
- int bytesAvailable = BlockLength - BlockOffset;
- if (bytesAvailable <= 0) {
- if ( ReadBlock() != 0 ) { return -1; }
- bytesAvailable = BlockLength - BlockOffset;
- if ( bytesAvailable <= 0 ) { break; }
- }
- char* buffer = UncompressedBlock;
- int copyLength = min( (int)(dataLength-numBytesRead), bytesAvailable );
- memcpy(output, buffer + BlockOffset, copyLength);
- BlockOffset += copyLength;
- output += copyLength;
- numBytesRead += copyLength;
- }
- if ( BlockOffset == BlockLength ) {
- BlockAddress = ftell(Stream);
- BlockOffset = 0;
- BlockLength = 0;
- }
- return numBytesRead;
- }
- int BgzfData::ReadBlock(void) {
- char header[BLOCK_HEADER_LENGTH];
- int64_t blockAddress = ftell(Stream);
- int count = (int)fread(header, 1, sizeof(header), Stream);
- if (count == 0) {
- BlockLength = 0;
- return 0;
- }
- if (count != sizeof(header)) {
- printf("read block failed - count != sizeof(header)\n");
- return -1;
- }
- if (!BgzfData::CheckBlockHeader(header)) {
- printf("read block failed - CheckBlockHeader() returned false\n");
- return -1;
- }
- int blockLength = BgzfData::UnpackUnsignedShort(&header[16]) + 1;
- char* compressedBlock = CompressedBlock;
- memcpy(compressedBlock, header, BLOCK_HEADER_LENGTH);
- int remaining = blockLength - BLOCK_HEADER_LENGTH;
- count = (int)fread(&compressedBlock[BLOCK_HEADER_LENGTH], 1, remaining, Stream);
- if (count != remaining) {
- printf("read block failed - count != remaining\n");
- return -1;
- }
- count = InflateBlock(blockLength);
- if (count < 0) { return -1; }
- if ( BlockLength != 0 ) {
- BlockOffset = 0;
- }
- BlockAddress = blockAddress;
- BlockLength = count;
- return 0;
- }
- bool BgzfData::Seek(int64_t position) {
- int blockOffset = (position & 0xFFFF);
- int64_t blockAddress = (position >> 16) & 0xFFFFFFFFFFFFLL;
- if (fseek(Stream, (long)blockAddress, SEEK_SET) != 0) {
- printf("ERROR: Unable to seek in BAM file\n");
- exit(1);
- }
- BlockLength = 0;
- BlockAddress = blockAddress;
- BlockOffset = blockOffset;
- return true;
- }
- int64_t BgzfData::Tell(void) {
- return ( (BlockAddress << 16) | (BlockOffset & 0xFFFF) );
- }
- // writes the supplied data into the BGZF buffer
- unsigned int BgzfData::Write(const char* data, const unsigned int dataLen) {
- // initialize
- unsigned int numBytesWritten = 0;
- const char* input = data;
- unsigned int blockLength = UncompressedBlockSize * ThreadCount;
- // copy the data to the buffer
- while(numBytesWritten < dataLen) {
- unsigned int copyLength = min(blockLength - BlockOffset, dataLen - numBytesWritten);
- char* buffer = UncompressedBlock;
- memcpy(buffer + BlockOffset, input, copyLength);
- BlockOffset += copyLength;
- input += copyLength;
- numBytesWritten += copyLength;
- if(BlockOffset == blockLength) {
- FlushBlocks();
- }
- }
- return numBytesWritten;
- }
- } // namespace BamTools
|