// Copyright ©2012 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package bgzf

import (
	"bufio"
	"bytes"
	"compress/flate"
	"compress/gzip"
	"io"
	"runtime"
	"sync"
)

// countReader wraps flate.Reader, adding support for querying current offset.
type countReader struct {
	// Underlying Reader.
	fr flate.Reader

	// Offset within the underlying reader.
	off int64
}

// newCountReader returns a new countReader.
func newCountReader(r io.Reader) *countReader {
	switch r := r.(type) {
	case *countReader:
		panic("bgzf: illegal use of internal type")
	case flate.Reader:
		return &countReader{fr: r}
	default:
		return &countReader{fr: bufio.NewReader(r)}
	}
}

// Read is required to satisfy flate.Reader.
func (r *countReader) Read(p []byte) (int, error) {
	n, err := r.fr.Read(p)
	r.off += int64(n)
	return n, err
}

// ReadByte is required to satisfy flate.Reader.
func (r *countReader) ReadByte() (byte, error) {
	b, err := r.fr.ReadByte()
	if err == nil {
		r.off++
	}
	return b, err
}

// offset returns the current offset in the underlying reader.
func (r *countReader) offset() int64 { return r.off }

// seek moves the countReader to the specified offset using rs as the
// underlying reader.
func (r *countReader) seek(rs io.ReadSeeker, off int64) error {
	_, err := rs.Seek(off, 0)
	if err != nil {
		return err
	}

	type reseter interface {
		Reset(io.Reader)
	}
	switch cr := r.fr.(type) {
	case reseter:
		cr.Reset(rs)
	default:
		r.fr = newCountReader(rs)
	}

	r.off = off

	return nil
}

// buffer is a flate.Reader used by a decompressor to store read-ahead data.
type buffer struct {
	// Buffered compressed data from read ahead.
	off  int // Current position in buffered data.
	size int // Total size of buffered data.
	data [MaxBlockSize]byte
}

// Read provides the flate.Decompressor Read method.
func (r *buffer) Read(b []byte) (int, error) {
	if r.off >= r.size {
		return 0, io.EOF
	}
	if n := r.size - r.off; len(b) > n {
		b = b[:n]
	}
	n := copy(b, r.data[r.off:])
	r.off += n
	return n, nil
}

// ReadByte provides the flate.Decompressor ReadByte method.
func (r *buffer) ReadByte() (byte, error) {
	if r.off == r.size {
		return 0, io.EOF
	}
	b := r.data[r.off]
	r.off++
	return b, nil
}

// reset makes the buffer available to store data.
func (r *buffer) reset() { r.size = 0 }

// hasData returns whether the buffer has any data buffered.
func (r *buffer) hasData() bool { return r.size != 0 }

// readLimited reads n bytes into the buffer from the given source.
func (r *buffer) readLimited(n int, src *countReader) error {
	if r.hasData() {
		panic("bgzf: read into non-empty buffer")
	}
	r.off = 0
	var err error
	r.size, err = io.ReadFull(src, r.data[:n])
	return err
}

// equals returns a boolean indicating the equality between
// the buffered data and the given byte slice.
func (r *buffer) equals(b []byte) bool { return bytes.Equal(r.data[:r.size], b) }

// decompressor is a gzip member decompressor worker.
type decompressor struct {
	owner *Reader

	gz gzip.Reader

	cr *countReader

	// Current block size.
	blockSize int

	// Buffered compressed data from read ahead.
	buf buffer

	// Decompressed data.
	wg  sync.WaitGroup
	blk Block

	err error
}

// Read provides the Read method for the decompressor's gzip.Reader.
func (d *decompressor) Read(b []byte) (int, error) {
	if d.buf.hasData() {
		return d.buf.Read(b)
	}
	return d.cr.Read(b)
}
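
// Note (added commentary): the Read method above and the ReadByte method
// below serve the decompressor's gzip.Reader. Both drain any member data
// buffered by readMember before falling back to the shared countReader, so a
// member can be inflated in a goroutine after the read head has been
// returned to the owning Reader.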

// ReadByte provides the ReadByte method for the decompressor's gzip.Reader.
func (d *decompressor) ReadByte() (byte, error) {
	if d.buf.hasData() {
		return d.buf.ReadByte()
	}
	return d.cr.ReadByte()
}

// lazyBlock conditionally creates a ready to use Block.
func (d *decompressor) lazyBlock() {
	if d.blk == nil {
		if w, ok := d.owner.cache.(Wrapper); ok {
			d.blk = w.Wrap(&block{owner: d.owner})
		} else {
			d.blk = &block{owner: d.owner}
		}
		return
	}
	if !d.blk.ownedBy(d.owner) {
		d.blk.setOwner(d.owner)
	}
}

// acquireHead gains the read head from the decompressor's owner.
func (d *decompressor) acquireHead() {
	d.wg.Add(1)
	d.cr = <-d.owner.head
}

// releaseHead releases the read head back to the decompressor's owner.
func (d *decompressor) releaseHead() {
	d.owner.head <- d.cr
	d.cr = nil // Defensively zero the reader.
}

// wait waits for the current member to be decompressed or fail, and returns
// the resulting error state.
func (d *decompressor) wait() (Block, error) {
	d.wg.Wait()
	blk := d.blk
	d.blk = nil
	return blk, d.err
}

// using sets the Block for the decompressor to work with.
func (d *decompressor) using(b Block) *decompressor { d.blk = b; return d }

// nextBlockAt makes the decompressor ready for reading decompressed data
// from its Block. It checks whether there is a cached Block for the requested
// base offset; otherwise it seeks to the correct location if the decompressor
// is not correctly positioned, and then reads the compressed data and fills
// the decompressed Block.
// After nextBlockAt returns without error, the decompressor's Block
// holds a valid gzip.Header and base offset.
func (d *decompressor) nextBlockAt(off int64, rs io.ReadSeeker) *decompressor {
	d.err = nil
	for {
		exists, next := d.owner.cacheHasBlockFor(off)
		if !exists {
			break
		}
		off = next
	}

	d.lazyBlock()

	d.acquireHead()
	defer d.releaseHead()

	if d.cr.offset() != off {
		if rs == nil {
			// It should not be possible for the expected next block base
			// to be out of register with the count reader unless Seek
			// has been called, so we know the base reader must be an
			// io.ReadSeeker.
			var ok bool
			rs, ok = d.owner.r.(io.ReadSeeker)
			if !ok {
				panic("bgzf: unexpected offset without seek")
			}
		}
		d.err = d.cr.seek(rs, off)
		if d.err != nil {
			d.wg.Done()
			return d
		}
	}

	d.blk.setBase(d.cr.offset())
	d.err = d.readMember()
	if d.err != nil {
		d.wg.Done()
		return d
	}
	d.blk.setHeader(d.gz.Header)
	d.gz.Header = gzip.Header{} // Prevent retention of header field in next use.

	// Decompress data into the decompressor's Block.
	go func() {
		d.err = d.blk.readFrom(&d.gz)
		d.wg.Done()
	}()

	return d
}

// expectedMemberSize returns the size of the BGZF conformant gzip member.
// It returns -1 if no BGZF block size field is found.
func expectedMemberSize(h gzip.Header) int {
	i := bytes.Index(h.Extra, bgzfExtraPrefix)
	if i < 0 || i+5 >= len(h.Extra) {
		return -1
	}
	return (int(h.Extra[i+4]) | int(h.Extra[i+5])<<8) + 1
}

// readMember buffers the gzip member starting at the current decompressor offset.
func (d *decompressor) readMember() error {
	// Set the decompressor to Read from the underlying flate.Reader
	// and mark the starting offset from which the underlying reader
	// was used.
	d.buf.reset()
	mark := d.cr.offset()

	err := d.gz.Reset(d)
	if err != nil {
		d.blockSize = -1
		return err
	}

	d.blockSize = expectedMemberSize(d.gz.Header)
	if d.blockSize < 0 {
		return ErrNoBlockSize
	}
	skipped := int(d.cr.offset() - mark)

	// Read compressed data into the decompressor buffer until the
	// underlying flate.Reader is positioned at the end of the gzip
	// member in which the readMember call was made.
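
	// Note (added commentary): d.blockSize is the full member size (the BGZF
	// BSIZE field plus one) recovered from the gzip extra field, and skipped
	// is the number of bytes the gzip header parse above has already
	// consumed, so the remaining d.blockSize-skipped bytes cover the member's
	// compressed data and trailer.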
	return d.buf.readLimited(d.blockSize-skipped, d.cr)
}

// Offset is a BGZF virtual offset.
type Offset struct {
	File  int64
	Block uint16
}

// Chunk is a region of a BGZF file.
type Chunk struct {
	Begin Offset
	End   Offset
}

// Reader implements BGZF blocked gzip decompression.
type Reader struct {
	gzip.Header
	r io.Reader

	// head serialises access to the underlying
	// io.Reader.
	head chan *countReader

	// lastChunk is the virtual file offset
	// interval of the last successful read
	// or seek operation.
	lastChunk Chunk

	// Blocked specifies the behaviour of the
	// Reader at the end of a BGZF member.
	// If the Reader is Blocked, a Read that
	// reaches the end of a BGZF block will
	// return io.EOF. This error is not sticky,
	// so a subsequent Read will progress to
	// the next block if it is available.
	Blocked bool

	// Non-concurrent work decompressor.
	dec *decompressor

	// Concurrent work fields.
	waiting chan *decompressor
	working chan *decompressor
	control chan int64

	current Block

	// cache is the Reader block cache. If cache is not nil,
	// the cache is queried for blocks before an attempt to
	// read from the underlying io.Reader.
	mu    sync.RWMutex
	cache Cache

	err error
}

// NewReader returns a new BGZF reader.
//
// The number of concurrent read decompressors is specified by rd.
// If rd is 0, GOMAXPROCS concurrent decompressors will be created.
// The returned Reader should be closed after use to avoid leaking resources.
func NewReader(r io.Reader, rd int) (*Reader, error) {
	if rd == 0 {
		rd = runtime.GOMAXPROCS(0)
	}
	bg := &Reader{
		r:    r,
		head: make(chan *countReader, 1),
	}
	bg.head <- newCountReader(r)

	// Make work loop control structures.
	if rd > 1 {
		bg.waiting = make(chan *decompressor, rd)
		bg.working = make(chan *decompressor, rd)
		bg.control = make(chan int64, 1)
		for ; rd > 1; rd-- {
			bg.waiting <- &decompressor{owner: bg}
		}
	}

	// Read the first block now so we can fail before
	// the first Read call if there is a problem.
	bg.dec = &decompressor{owner: bg}
	blk, err := bg.dec.nextBlockAt(0, nil).wait()
	if err != nil {
		return nil, err
	}
	bg.current = blk
	bg.Header = bg.current.header()

	// Set up work loop if rd was > 1.
	if bg.control != nil {
		bg.waiting <- bg.dec
		bg.dec = nil
		next := blk.NextBase()
		go func() {
			defer func() {
				bg.mu.Lock()
				bg.cache = nil
				bg.mu.Unlock()
			}()
			for dec := range bg.waiting {
				var open bool
				if next < 0 {
					next, open = <-bg.control
					if !open {
						return
					}
				} else {
					select {
					case next, open = <-bg.control:
						if !open {
							return
						}
					default:
					}
				}
				dec.nextBlockAt(next, nil)
				next = dec.blk.NextBase()
				bg.working <- dec
			}
		}()
	}

	return bg, nil
}

// SetCache sets the cache to be used by the Reader.
func (bg *Reader) SetCache(c Cache) {
	bg.mu.Lock()
	bg.cache = c
	bg.mu.Unlock()
}
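
// The helper below is an illustrative sketch added for documentation and is
// not part of the original package API: makeOffset shows how a BAM-style
// 64-bit virtual file offset decomposes into the Offset consumed by Seek. In
// the BGZF convention the upper 48 bits address the start of a block in the
// compressed file and the lower 16 bits address a byte within the
// uncompressed block, so a caller could pass makeOffset(voffset) to Seek.
func makeOffset(voffset uint64) Offset {
	return Offset{
		File:  int64(voffset >> 16), // Compressed file offset of the block start.
		Block: uint16(voffset),      // Offset within the uncompressed block.
	}
}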

// Seek performs a seek operation to the given virtual offset.
func (bg *Reader) Seek(off Offset) error {
	rs, ok := bg.r.(io.ReadSeeker)
	if !ok {
		return ErrNotASeeker
	}

	if off.File != bg.current.Base() || !bg.current.hasData() {
		ok := bg.cacheSwap(off.File)
		if !ok {
			var dec *decompressor
			if bg.dec != nil {
				dec = bg.dec
			} else {
				select {
				case dec = <-bg.waiting:
				case dec = <-bg.working:
					blk, err := dec.wait()
					if err == nil {
						bg.keep(blk)
					}
				}
			}
			bg.current, bg.err = dec.
				using(bg.current).
				nextBlockAt(off.File, rs).
				wait()
			if bg.dec == nil {
				select {
				case <-bg.control:
				default:
				}
				bg.control <- bg.current.NextBase()
				bg.waiting <- dec
			}
			bg.Header = bg.current.header()
			if bg.err != nil {
				return bg.err
			}
		}
	}

	bg.err = bg.current.seek(int64(off.Block))
	if bg.err == nil {
		bg.lastChunk = Chunk{Begin: off, End: off}
	}

	return bg.err
}

// LastChunk returns the region of the BGZF file read by the last read
// operation or the resulting virtual offset of the last successful
// seek operation.
func (bg *Reader) LastChunk() Chunk { return bg.lastChunk }

// BlockLen returns the number of bytes remaining to be read from the
// current BGZF block.
func (bg *Reader) BlockLen() int { return bg.current.len() }

// Close closes the reader and releases resources.
func (bg *Reader) Close() error {
	if bg.control != nil {
		close(bg.control)
		close(bg.waiting)
	}
	if bg.err == io.EOF {
		return nil
	}
	return bg.err
}

// Read implements the io.Reader interface.
func (bg *Reader) Read(p []byte) (int, error) {
	if bg.err != nil {
		return 0, bg.err
	}

	// Discard leading empty blocks. This is an indexing
	// optimisation to avoid retaining useless members
	// in a BAI/CSI.
	for bg.current.len() == 0 {
		bg.err = bg.nextBlock()
		if bg.err != nil {
			return 0, bg.err
		}
	}

	bg.lastChunk.Begin = bg.current.txOffset()

	var n int
	for n < len(p) && bg.err == nil {
		var _n int
		_n, bg.err = bg.current.Read(p[n:])
		n += _n
		if bg.err == io.EOF {
			if n == len(p) {
				bg.err = nil
				break
			}

			if bg.Blocked {
				bg.err = nil
				bg.lastChunk.End = bg.current.txOffset()
				return n, io.EOF
			}

			bg.err = bg.nextBlock()
			if bg.err != nil {
				break
			}
		}
	}

	bg.lastChunk.End = bg.current.txOffset()
	return n, bg.err
}

// nextBlock swaps the current decompressed block for the next
// in the stream. If the block is available from the cache
// no additional work is done, otherwise a decompressor is
// used or waited on.
func (bg *Reader) nextBlock() error {
	base := bg.current.NextBase()
	ok := bg.cacheSwap(base)
	if ok {
		bg.Header = bg.current.header()
		return nil
	}

	var err error
	if bg.dec != nil {
		bg.dec.using(bg.current).nextBlockAt(base, nil)
		bg.current, err = bg.dec.wait()
	} else {
		var ok bool
		for i := 0; i < cap(bg.working); i++ {
			dec := <-bg.working
			bg.current, err = dec.wait()
			bg.waiting <- dec
			if bg.current.Base() == base {
				ok = true
				break
			}
			if err == nil {
				bg.keep(bg.current)
				bg.current = nil
			}
		}
		if !ok {
			panic("bgzf: unexpected block")
		}
	}

	if err != nil {
		return err
	}

	// Only set header if there was no error.
	h := bg.current.header()
	if bg.current.isMagicBlock() {
		// TODO(kortschak): Do this more carefully. It may be that
		// someone actually has extra data in this field that we are
		// clobbering.
		bg.Header.Extra = h.Extra
	} else {
		bg.Header = h
	}

	return nil
}

// cacheSwap attempts to swap the current Block for a cached Block
// for the given base offset. It returns true if successful.
func (bg *Reader) cacheSwap(base int64) bool {
	bg.mu.RLock()
	defer bg.mu.RUnlock()
	if bg.cache == nil {
		return false
	}
	blk, err := bg.cachedBlockFor(base)
	if err != nil {
		return false
	}
	if blk != nil {
		// TODO(kortschak): Under some conditions, e.g. FIFO
		// cache we will be discarding a non-nil evicted Block.
		// Consider retaining these in a sync.Pool.
		bg.cachePut(bg.current)
		bg.current = blk
		return true
	}
	var retained bool
	bg.current, retained = bg.cachePut(bg.current)
	if retained {
		bg.current = nil
	}
	return false
}
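
// The type below is an illustrative sketch and not one of the package's
// shipped caches: it assumes the Cache interface consists of exactly the
// Peek, Get and Put methods used by the Reader above, and it retains only
// the single most recently Put Block. It is included to show the contract
// that the cache helpers below rely on.
type singleBlockCache struct {
	blk Block
}

// Peek reports whether the retained Block has the given base, returning the
// base offset of the following block when it does.
func (c *singleBlockCache) Peek(base int64) (exists bool, next int64) {
	if c.blk != nil && c.blk.Base() == base {
		return true, c.blk.NextBase()
	}
	return false, -1
}

// Get returns and relinquishes the retained Block if its base matches.
func (c *singleBlockCache) Get(base int64) Block {
	if c.blk == nil || c.blk.Base() != base {
		return nil
	}
	blk := c.blk
	c.blk = nil
	return blk
}

// Put retains b, returning any previously retained Block as evicted.
func (c *singleBlockCache) Put(b Block) (evicted Block, retained bool) {
	evicted, c.blk = c.blk, b
	return evicted, true
}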

// cacheHasBlockFor returns whether the Reader's cache has a block
// for the given base offset. If the requested Block exists, the base
// offset of the following Block is returned.
func (bg *Reader) cacheHasBlockFor(base int64) (exists bool, next int64) {
	bg.mu.RLock()
	defer bg.mu.RUnlock()
	if bg.cache == nil {
		return false, -1
	}
	return bg.cache.Peek(base)
}

// cachedBlockFor returns a non-nil Block if the Reader has access to a
// cache and the cache holds the block with the given base and the
// correct owner, otherwise it returns nil. If the Block's owner is not
// correct, or the Block cannot seek to the start of its data, a non-nil
// error is returned.
func (bg *Reader) cachedBlockFor(base int64) (Block, error) {
	blk := bg.cache.Get(base)
	if blk != nil {
		if !blk.ownedBy(bg) {
			return nil, ErrContaminatedCache
		}
		err := blk.seek(0)
		if err != nil {
			return nil, err
		}
	}
	return blk, nil
}

// cachePut puts the given Block into the cache if it exists. It returns
// the Block that was evicted, or b if it was not retained, and whether
// the Block was retained by the cache.
func (bg *Reader) cachePut(b Block) (evicted Block, retained bool) {
	if b == nil || !b.hasData() {
		return b, false
	}
	return bg.cache.Put(b)
}

// keep puts the given Block into the cache if it exists.
func (bg *Reader) keep(b Block) {
	if b == nil || !b.hasData() {
		return
	}
	bg.mu.RLock()
	defer bg.mu.RUnlock()
	if bg.cache != nil {
		bg.cache.Put(b)
	}
}

// Begin returns a Tx that starts at the current virtual offset.
func (bg *Reader) Begin() Tx { return Tx{begin: bg.lastChunk.Begin, r: bg} }

// Tx represents a multi-read transaction.
type Tx struct {
	begin Offset
	r     *Reader
}

// End returns the Chunk spanning the transaction. After return the Tx is
// no longer valid.
func (t *Tx) End() Chunk {
	c := Chunk{Begin: t.begin, End: t.r.lastChunk.End}
	t.r = nil
	return c
}
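
// The function below is an illustrative usage sketch added for documentation
// and is not part of the original file. It shows the expected lifecycle of a
// Reader: construct with NewReader, drain the decompressed stream, and Close
// to release the worker goroutine and channels. The src argument is assumed
// to hold a complete BGZF stream.
func exampleReadAll(src io.Reader) ([]byte, error) {
	// A decompressor count of 0 requests GOMAXPROCS concurrent decompressors.
	r, err := NewReader(src, 0)
	if err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	// io.Copy stops cleanly at the io.EOF returned after the final member.
	_, err = io.Copy(&buf, r)
	if cerr := r.Close(); err == nil {
		err = cerr
	}
	return buf.Bytes(), err
}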