283 lines
5.3 KiB
Go
283 lines
5.3 KiB
Go
|
// Copyright ©2012 The bíogo Authors. All rights reserved.
|
||
|
// Use of this source code is governed by a BSD-style
|
||
|
// license that can be found in the LICENSE file.
|
||
|
|
||
|
package bgzf
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"compress/gzip"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// Writer implements BGZF blocked gzip compression.
|
||
|
type Writer struct {
|
||
|
gzip.Header
|
||
|
w io.Writer
|
||
|
|
||
|
active *compressor
|
||
|
|
||
|
queue chan *compressor
|
||
|
qwg sync.WaitGroup
|
||
|
|
||
|
waiting chan *compressor
|
||
|
|
||
|
wg sync.WaitGroup
|
||
|
|
||
|
closed bool
|
||
|
|
||
|
m sync.Mutex
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
// NewWriter returns a new Writer. Writes to the returned writer are
|
||
|
// compressed and written to w.
|
||
|
//
|
||
|
// The number of concurrent write compressors is specified by wc.
|
||
|
func NewWriter(w io.Writer, wc int) *Writer {
|
||
|
bg, _ := NewWriterLevel(w, gzip.DefaultCompression, wc)
|
||
|
return bg
|
||
|
}
|
||
|
|
||
|
// NewWriterLevel returns a new Writer using the specified compression level
|
||
|
// instead of gzip.DefaultCompression. Allowable level options are integer
|
||
|
// values between between gzip.BestSpeed and gzip.BestCompression inclusive.
|
||
|
//
|
||
|
// The number of concurrent write compressors is specified by wc.
|
||
|
func NewWriterLevel(w io.Writer, level, wc int) (*Writer, error) {
|
||
|
if level < gzip.DefaultCompression || level > gzip.BestCompression {
|
||
|
return nil, fmt.Errorf("bgzf: invalid compression level: %d", level)
|
||
|
}
|
||
|
wc++ // We count one for the active compressor.
|
||
|
if wc < 2 {
|
||
|
wc = 2
|
||
|
}
|
||
|
bg := &Writer{
|
||
|
w: w,
|
||
|
waiting: make(chan *compressor, wc),
|
||
|
queue: make(chan *compressor, wc),
|
||
|
}
|
||
|
|
||
|
c := make([]compressor, wc)
|
||
|
for i := range c {
|
||
|
c[i].Header = &bg.Header
|
||
|
c[i].level = level
|
||
|
c[i].waiting = bg.waiting
|
||
|
c[i].flush = make(chan *compressor, 1)
|
||
|
c[i].qwg = &bg.qwg
|
||
|
bg.waiting <- &c[i]
|
||
|
}
|
||
|
bg.active = <-bg.waiting
|
||
|
|
||
|
bg.wg.Add(1)
|
||
|
go func() {
|
||
|
defer bg.wg.Done()
|
||
|
for qw := range bg.queue {
|
||
|
if !writeOK(bg, <-qw.flush) {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return bg, nil
|
||
|
}
|
||
|
|
||
|
func writeOK(bg *Writer, c *compressor) bool {
|
||
|
defer func() { bg.waiting <- c }()
|
||
|
|
||
|
if c.err != nil {
|
||
|
bg.setErr(c.err)
|
||
|
return false
|
||
|
}
|
||
|
if c.buf.Len() == 0 {
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
_, err := io.Copy(bg.w, &c.buf)
|
||
|
bg.qwg.Done()
|
||
|
if err != nil {
|
||
|
bg.setErr(err)
|
||
|
return false
|
||
|
}
|
||
|
c.next = 0
|
||
|
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
type compressor struct {
|
||
|
*gzip.Header
|
||
|
gz *gzip.Writer
|
||
|
level int
|
||
|
|
||
|
next int
|
||
|
block [BlockSize]byte
|
||
|
buf bytes.Buffer
|
||
|
|
||
|
flush chan *compressor
|
||
|
qwg *sync.WaitGroup
|
||
|
|
||
|
waiting chan *compressor
|
||
|
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
func (c *compressor) writeBlock() {
|
||
|
defer func() { c.flush <- c }()
|
||
|
|
||
|
if c.gz == nil {
|
||
|
c.gz, c.err = gzip.NewWriterLevel(&c.buf, c.level)
|
||
|
if c.err != nil {
|
||
|
return
|
||
|
}
|
||
|
} else {
|
||
|
c.gz.Reset(&c.buf)
|
||
|
}
|
||
|
c.gz.Header = gzip.Header{
|
||
|
Comment: c.Comment,
|
||
|
Extra: append([]byte(bgzfExtra), c.Extra...),
|
||
|
ModTime: c.ModTime,
|
||
|
Name: c.Name,
|
||
|
OS: c.OS,
|
||
|
}
|
||
|
|
||
|
_, c.err = c.gz.Write(c.block[:c.next])
|
||
|
if c.err != nil {
|
||
|
return
|
||
|
}
|
||
|
c.err = c.gz.Close()
|
||
|
if c.err != nil {
|
||
|
return
|
||
|
}
|
||
|
c.next = 0
|
||
|
|
||
|
b := c.buf.Bytes()
|
||
|
i := bytes.Index(b, bgzfExtraPrefix)
|
||
|
if i < 0 {
|
||
|
c.err = gzip.ErrHeader
|
||
|
return
|
||
|
}
|
||
|
size := len(b) - 1
|
||
|
if size >= MaxBlockSize {
|
||
|
c.err = ErrBlockOverflow
|
||
|
return
|
||
|
}
|
||
|
b[i+4], b[i+5] = byte(size), byte(size>>8)
|
||
|
}
|
||
|
|
||
|
// Next returns the index of the start of the next write within the
|
||
|
// decompressed data block.
|
||
|
func (bg *Writer) Next() (int, error) {
|
||
|
if bg.closed {
|
||
|
return 0, ErrClosed
|
||
|
}
|
||
|
if err := bg.Error(); err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
return bg.active.next, nil
|
||
|
}
|
||
|
|
||
|
// Write writes the compressed form of b to the underlying io.Writer.
|
||
|
// Decompressed data blocks are limited to BlockSize, so individual
|
||
|
// byte slices may span block boundaries, however the Writer attempts
|
||
|
// to keep each write within a single data block.
|
||
|
func (bg *Writer) Write(b []byte) (int, error) {
|
||
|
if bg.closed {
|
||
|
return 0, ErrClosed
|
||
|
}
|
||
|
err := bg.Error()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
c := bg.active
|
||
|
var n int
|
||
|
for ; len(b) > 0 && err == nil; err = bg.Error() {
|
||
|
var _n int
|
||
|
if c.next == 0 || c.next+len(b) <= len(c.block) {
|
||
|
_n = copy(c.block[c.next:], b)
|
||
|
b = b[_n:]
|
||
|
c.next += _n
|
||
|
n += _n
|
||
|
}
|
||
|
|
||
|
if c.next == len(c.block) || _n == 0 {
|
||
|
bg.queue <- c
|
||
|
bg.qwg.Add(1)
|
||
|
go c.writeBlock()
|
||
|
c = <-bg.waiting
|
||
|
}
|
||
|
}
|
||
|
bg.active = c
|
||
|
|
||
|
return n, bg.Error()
|
||
|
}
|
||
|
|
||
|
// Flush writes unwritten data to the underlying io.Writer. Flush does not block.
|
||
|
func (bg *Writer) Flush() error {
|
||
|
if bg.closed {
|
||
|
return ErrClosed
|
||
|
}
|
||
|
if err := bg.Error(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if bg.active.next == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
var c *compressor
|
||
|
c, bg.active = bg.active, <-bg.waiting
|
||
|
bg.queue <- c
|
||
|
bg.qwg.Add(1)
|
||
|
go c.writeBlock()
|
||
|
|
||
|
return bg.Error()
|
||
|
}
|
||
|
|
||
|
// Wait waits for all pending writes to complete and returns the subsequent
|
||
|
// error state of the Writer.
|
||
|
func (bg *Writer) Wait() error {
|
||
|
if err := bg.Error(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
bg.qwg.Wait()
|
||
|
return bg.Error()
|
||
|
}
|
||
|
|
||
|
// Error returns the error state of the Writer.
|
||
|
func (bg *Writer) Error() error {
|
||
|
bg.m.Lock()
|
||
|
defer bg.m.Unlock()
|
||
|
return bg.err
|
||
|
}
|
||
|
|
||
|
func (bg *Writer) setErr(err error) {
|
||
|
bg.m.Lock()
|
||
|
defer bg.m.Unlock()
|
||
|
if bg.err == nil {
|
||
|
bg.err = err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close closes the Writer, waiting for any pending writes before returning
|
||
|
// the final error of the Writer.
|
||
|
func (bg *Writer) Close() error {
|
||
|
if !bg.closed {
|
||
|
c := bg.active
|
||
|
bg.queue <- c
|
||
|
bg.qwg.Add(1)
|
||
|
<-bg.waiting
|
||
|
c.writeBlock()
|
||
|
bg.closed = true
|
||
|
close(bg.queue)
|
||
|
bg.wg.Wait()
|
||
|
if bg.err == nil {
|
||
|
_, bg.err = bg.w.Write([]byte(magicBlock))
|
||
|
}
|
||
|
}
|
||
|
return bg.err
|
||
|
}
|