post-processor/compress: add bgzf format support
* Add a bgzf compressor (it allows seeking inside the compressed file).
* Add an optional `format` config variable to specify the archive format.
* Update pgzip to get SSE4 and AVX optimizations.

Signed-off-by: Vasiliy Tolstov <v.tolstov@selfip.ru>
parent d6764ff37a
commit 6a5f6938ef
@@ -9,17 +9,9 @@ const BuilderId = "packer.post-processor.compress"

type Artifact struct {
	Path     string
	Provider string
	files    []string
}

func NewArtifact(provider, path string) *Artifact {
	return &Artifact{
		Path:     path,
		Provider: provider,
	}
}

func (a *Artifact) BuilderId() string {
	return BuilderId
}

@@ -33,7 +25,7 @@ func (a *Artifact) Files() []string {
}

func (a *Artifact) String() string {
	return fmt.Sprintf("'%s' compressing: %s", a.Provider, a.Path)
	return fmt.Sprintf("compressed artifacts in: %s", a.Path)
}

func (*Artifact) State(name string) interface{} {
@@ -11,6 +11,7 @@ import (
	"regexp"
	"runtime"

	"github.com/biogo/hts/bgzf"
	"github.com/klauspost/pgzip"
	"github.com/mitchellh/packer/common"
	"github.com/mitchellh/packer/helper/config"

@@ -36,6 +37,7 @@ type Config struct {

	// Fields from config file
	OutputPath        string `mapstructure:"output"`
	Format            string `mapstructure:"format"`
	CompressionLevel  int    `mapstructure:"compression_level"`
	KeepInputArtifact bool   `mapstructure:"keep_input_artifact"`

@@ -115,6 +117,10 @@ func (p *PostProcessor) PostProcess(ui packer.Ui, artifact packer.Artifact) (pac
	keep := p.config.KeepInputArtifact
	newArtifact := &Artifact{Path: target}

	if err = os.MkdirAll(filepath.Dir(target), os.FileMode(0755)); err != nil {
		return nil, false, fmt.Errorf(
			"Unable to create dir for archive %s: %s", target, err)
	}
	outputFile, err := os.Create(target)
	if err != nil {
		return nil, false, fmt.Errorf(

@@ -126,6 +132,11 @@ func (p *PostProcessor) PostProcess(ui packer.Ui, artifact packer.Artifact) (pac
	// compression writer. Otherwise it's just a file.
	var output io.WriteCloser
	switch p.config.Algorithm {
	case "bgzf":
		ui.Say(fmt.Sprintf("Using bgzf compression with %d cores for %s",
			runtime.GOMAXPROCS(-1), target))
		output, err = makeBGZFWriter(outputFile, p.config.CompressionLevel)
		defer output.Close()
	case "lz4":
		ui.Say(fmt.Sprintf("Using lz4 compression with %d cores for %s",
			runtime.GOMAXPROCS(-1), target))

@@ -190,15 +201,21 @@ func (p *PostProcessor) PostProcess(ui packer.Ui, artifact packer.Artifact) (pac
}

func (config *Config) detectFromFilename() {
	var result [][]string

	extensions := map[string]string{
		"tar":  "tar",
		"zip":  "zip",
		"gz":   "pgzip",
		"lz4":  "lz4",
		"bgzf": "bgzf",
	}

	result := filenamePattern.FindAllStringSubmatch(config.OutputPath, -1)
	if config.Format == "" {
		result = filenamePattern.FindAllStringSubmatch(config.OutputPath, -1)
	} else {
		result = filenamePattern.FindAllStringSubmatch(fmt.Sprintf("%s.%s", config.OutputPath, config.Format), -1)
	}

	// No dots. Bail out with defaults.
	if len(result) == 0 {

@@ -240,6 +257,14 @@ func (config *Config) detectFromFilename() {
	return
}

func makeBGZFWriter(output io.WriteCloser, compressionLevel int) (io.WriteCloser, error) {
	bgzfWriter, err := bgzf.NewWriterLevel(output, compressionLevel, runtime.GOMAXPROCS(-1))
	if err != nil {
		return nil, ErrInvalidCompressionLevel
	}
	return bgzfWriter, nil
}

func makeLZ4Writer(output io.WriteCloser, compressionLevel int) (io.WriteCloser, error) {
	lzwriter := lz4.NewWriter(output)
	if compressionLevel > gzip.DefaultCompression {
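The point of offering bgzf next to pgzip is the seekability called out in the commit message: BGZF output is a series of independent gzip members, so a reader can jump to a virtual offset without decompressing everything before it. Below is a minimal round-trip sketch, outside of Packer, using only the vendored API added in this commit (bgzf.NewWriterLevel, bgzf.NewReader, Reader.Seek); the file name and the offsets are illustrative, not part of the change.

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"log"
	"os"
	"runtime"

	"github.com/biogo/hts/bgzf"
)

func main() {
	// Write a small BGZF stream, one compressor per core.
	out, err := os.Create("artifact.bgzf") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	w, err := bgzf.NewWriterLevel(out, gzip.DefaultCompression, runtime.GOMAXPROCS(-1))
	if err != nil {
		log.Fatal(err)
	}
	if _, err := w.Write([]byte("hello bgzf")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil { // Close appends the BGZF magic EOF block.
		log.Fatal(err)
	}
	if err := out.Close(); err != nil {
		log.Fatal(err)
	}

	// Reopen and jump straight to a virtual offset inside the stream.
	in, err := os.Open("artifact.bgzf")
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()
	r, err := bgzf.NewReader(in, 0) // 0 => GOMAXPROCS decompressors
	if err != nil {
		log.Fatal(err)
	}
	if err := r.Seek(bgzf.Offset{File: 0, Block: 6}); err != nil { // skip "hello "
		log.Fatal(err)
	}
	buf := make([]byte, 4)
	n, err := r.Read(buf)
	if err != nil && err != io.EOF {
		log.Fatal(err)
	}
	fmt.Println(string(buf[:n])) // prints "bgzf"
}
```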
@@ -0,0 +1,23 @@
Copyright ©2012 The bíogo Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
    * Neither the name of the bíogo project nor the names of its authors and
      contributors may be used to endorse or promote products derived from this
      software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@@ -0,0 +1,104 @@
// Copyright ©2012 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package bgzf implements BGZF format reading and writing according to the
// SAM specification.
//
// The specification is available at https://github.com/samtools/hts-specs.
package bgzf

import (
	"errors"
	"io"
	"os"
	"time"
)

const (
	BlockSize    = 0x0ff00 // The maximum size of an uncompressed input data block.
	MaxBlockSize = 0x10000 // The maximum size of a compressed output block.
)

const (
	bgzfExtra = "BC\x02\x00\x00\x00"
	minFrame  = 20 + len(bgzfExtra) // Minimum bgzf header+footer length.

	// Magic EOF block.
	magicBlock = "\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)

var (
	bgzfExtraPrefix = []byte(bgzfExtra[:4])
	unixEpoch       = time.Unix(0, 0)
)

func compressBound(srcLen int) int {
	return srcLen + srcLen>>12 + srcLen>>14 + srcLen>>25 + 13 + minFrame
}

func init() {
	if compressBound(BlockSize) > MaxBlockSize {
		panic("bam: BlockSize too large")
	}
}

var (
	ErrClosed            = errors.New("bgzf: use of closed writer")
	ErrBlockOverflow     = errors.New("bgzf: block overflow")
	ErrWrongFileType     = errors.New("bgzf: file is a directory")
	ErrNoEnd             = errors.New("bgzf: cannot determine offset from end")
	ErrNotASeeker        = errors.New("bgzf: not a seeker")
	ErrContaminatedCache = errors.New("bgzf: cache owner mismatch")
	ErrNoBlockSize       = errors.New("bgzf: could not determine block size")
	ErrBlockSizeMismatch = errors.New("bgzf: unexpected block size")
)

// HasEOF checks for the presence of a BGZF magic EOF block.
// The magic block is defined in the SAM specification. A magic block
// is written by a Writer on calling Close. The ReaderAt must provide
// some method for determining valid ReadAt offsets.
func HasEOF(r io.ReaderAt) (bool, error) {
	type sizer interface {
		Size() int64
	}
	type stater interface {
		Stat() (os.FileInfo, error)
	}
	type lenSeeker interface {
		io.Seeker
		Len() int
	}
	var size int64
	switch r := r.(type) {
	case sizer:
		size = r.Size()
	case stater:
		fi, err := r.Stat()
		if err != nil {
			return false, err
		}
		size = fi.Size()
	case lenSeeker:
		var err error
		size, err = r.Seek(0, 1)
		if err != nil {
			return false, err
		}
		size += int64(r.Len())
	default:
		return false, ErrNoEnd
	}

	b := make([]byte, len(magicBlock))
	_, err := r.ReadAt(b, size-int64(len(magicBlock)))
	if err != nil {
		return false, err
	}
	for i, c := range b {
		if c != magicBlock[i] {
			return false, nil
		}
	}
	return true, nil
}
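Since *os.File provides both ReadAt and Stat, it satisfies HasEOF's requirement of a way to determine valid ReadAt offsets. A small hedged sketch of checking a file for the magic EOF block (the path is illustrative):

```go
package main

import (
	"log"
	"os"

	"github.com/biogo/hts/bgzf"
)

func main() {
	f, err := os.Open("artifact.bgzf") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// *os.File implements io.ReaderAt and Stat, so HasEOF can locate the end.
	ok, err := bgzf.HasEOF(f)
	if err != nil {
		log.Fatal(err)
	}
	if !ok {
		log.Println("no BGZF magic EOF block: file is truncated or not BGZF")
	}
}
```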
@@ -0,0 +1,196 @@
// Copyright ©2012 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package bgzf

import (
	"bytes"
	"compress/gzip"
	"io"
)

// Cache is a Block caching type. Basic cache implementations are provided
// in the cache package. A Cache must be safe for concurrent use.
//
// If a Cache is a Wrapper, its Wrap method is called on newly created blocks.
type Cache interface {
	// Get returns the Block in the Cache with the specified
	// base or a nil Block if it does not exist. The returned
	// Block must be removed from the Cache.
	Get(base int64) Block

	// Put inserts a Block into the Cache, returning the Block
	// that was evicted or nil if no eviction was necessary and
	// a boolean indicating whether the put Block was retained
	// by the Cache.
	Put(Block) (evicted Block, retained bool)

	// Peek returns whether a Block exists in the cache for the
	// given base. If a Block satisfies the request, then exists
	// is returned as true with the offset for the next Block in
	// the stream, otherwise false and -1.
	Peek(base int64) (exists bool, next int64)
}

// Wrapper defines Cache types that need to modify a Block at its creation.
type Wrapper interface {
	Wrap(Block) Block
}

// Block wraps interaction with decompressed BGZF data blocks.
type Block interface {
	// Base returns the file offset of the start of
	// the gzip member from which the Block data was
	// decompressed.
	Base() int64

	io.Reader

	// Used returns whether one or more bytes have
	// been read from the Block.
	Used() bool

	// header returns the gzip.Header of the gzip member
	// from which the Block data was decompressed.
	header() gzip.Header

	// isMagicBlock returns whether the Block is a BGZF
	// magic EOF marker block.
	isMagicBlock() bool

	// ownedBy returns whether the Block is owned by
	// the given Reader.
	ownedBy(*Reader) bool

	// setOwner changes the owner to the given Reader,
	// resetting other data to its zero state.
	setOwner(*Reader)

	// hasData returns whether the Block has read data.
	hasData() bool

	// The following are unexported equivalents
	// of the io interfaces. seek is limited to
	// the file origin offset case and does not
	// return the new offset.
	seek(offset int64) error
	readFrom(io.ReadCloser) error

	// len returns the number of remaining
	// bytes that can be read from the Block.
	len() int

	// setBase sets the file offset of the start
	// of the gzip member that the Block data
	// was decompressed from.
	setBase(int64)

	// NextBase returns the expected position of the next
	// BGZF block. It returns -1 if the Block is not valid.
	NextBase() int64

	// setHeader sets the file header of the gzip
	// member that the Block data was decompressed from.
	setHeader(gzip.Header)

	// txOffset returns the current virtual offset.
	txOffset() Offset
}

type block struct {
	owner *Reader
	used  bool

	base  int64
	h     gzip.Header
	magic bool

	offset Offset

	buf  *bytes.Reader
	data [MaxBlockSize]byte
}

func (b *block) Base() int64 { return b.base }

func (b *block) Used() bool { return b.used }

func (b *block) Read(p []byte) (int, error) {
	n, err := b.buf.Read(p)
	b.offset.Block += uint16(n)
	if n > 0 {
		b.used = true
	}
	return n, err
}

func (b *block) readFrom(r io.ReadCloser) error {
	o := b.owner
	b.owner = nil
	buf := bytes.NewBuffer(b.data[:0])
	_, err := io.Copy(buf, r)
	if err != nil {
		return err
	}
	b.buf = bytes.NewReader(buf.Bytes())
	b.owner = o
	b.magic = b.magic && b.len() == 0
	return r.Close()
}

func (b *block) seek(offset int64) error {
	_, err := b.buf.Seek(offset, 0)
	if err == nil {
		b.offset.Block = uint16(offset)
	}
	return err
}

func (b *block) len() int {
	if b.buf == nil {
		return 0
	}
	return b.buf.Len()
}

func (b *block) setBase(n int64) {
	b.base = n
	b.offset = Offset{File: n}
}

func (b *block) NextBase() int64 {
	size := int64(expectedMemberSize(b.h))
	if size == -1 {
		return -1
	}
	return b.base + size
}

func (b *block) setHeader(h gzip.Header) {
	b.h = h
	b.magic = h.OS == 0xff &&
		h.ModTime.Equal(unixEpoch) &&
		h.Name == "" &&
		h.Comment == "" &&
		bytes.Equal(h.Extra, []byte("BC\x02\x00\x1b\x00"))
}

func (b *block) header() gzip.Header { return b.h }

func (b *block) isMagicBlock() bool { return b.magic }

func (b *block) setOwner(r *Reader) {
	b.owner = r
	b.used = false
	b.base = -1
	b.h = gzip.Header{}
	b.offset = Offset{}
	b.buf = nil
}

func (b *block) ownedBy(r *Reader) bool { return b.owner == r }

func (b *block) hasData() bool { return b.buf != nil }

func (b *block) txOffset() Offset { return b.offset }
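For orientation, here is a deliberately naive sketch of the Cache contract defined above: an unbounded, mutex-guarded map keyed by block base offset. The real implementations live in the package's cache subpackage; the type and constructor names here are made up for illustration.

```go
package blockcache

import (
	"sync"

	"github.com/biogo/hts/bgzf"
)

// naiveCache is an unbounded, mutex-guarded Block cache.
// It is a sketch of the bgzf.Cache contract, not the package's own cache package.
type naiveCache struct {
	mu     sync.Mutex
	blocks map[int64]bgzf.Block
}

func newNaiveCache() *naiveCache {
	return &naiveCache{blocks: make(map[int64]bgzf.Block)}
}

// Get removes and returns the Block with the given base, as the contract requires.
func (c *naiveCache) Get(base int64) bgzf.Block {
	c.mu.Lock()
	defer c.mu.Unlock()
	blk, ok := c.blocks[base]
	if !ok {
		return nil
	}
	delete(c.blocks, base)
	return blk
}

// Put always retains the Block and never evicts (the cache is unbounded).
func (c *naiveCache) Put(b bgzf.Block) (evicted bgzf.Block, retained bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.blocks[b.Base()] = b
	return nil, true
}

// Peek reports whether a Block with the given base is held and, if so,
// where the following block starts.
func (c *naiveCache) Peek(base int64) (exists bool, next int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	blk, ok := c.blocks[base]
	if !ok {
		return false, -1
	}
	return true, blk.NextBase()
}
```

A Reader would adopt it with r.SetCache(newNaiveCache()); because Put never evicts, this sketch grows without bound and is only meant to show the Get/Put/Peek semantics.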
@@ -0,0 +1,703 @@
// Copyright ©2012 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package bgzf

import (
	"bufio"
	"bytes"
	"compress/flate"
	"compress/gzip"
	"io"
	"runtime"
	"sync"
)

// countReader wraps flate.Reader, adding support for querying current offset.
type countReader struct {
	// Underlying Reader.
	fr flate.Reader

	// Offset within the underlying reader.
	off int64
}

// newCountReader returns a new countReader.
func newCountReader(r io.Reader) *countReader {
	switch r := r.(type) {
	case *countReader:
		panic("bgzf: illegal use of internal type")
	case flate.Reader:
		return &countReader{fr: r}
	default:
		return &countReader{fr: bufio.NewReader(r)}
	}
}

// Read is required to satisfy flate.Reader.
func (r *countReader) Read(p []byte) (int, error) {
	n, err := r.fr.Read(p)
	r.off += int64(n)
	return n, err
}

// ReadByte is required to satisfy flate.Reader.
func (r *countReader) ReadByte() (byte, error) {
	b, err := r.fr.ReadByte()
	if err == nil {
		r.off++
	}
	return b, err
}

// offset returns the current offset in the underlying reader.
func (r *countReader) offset() int64 { return r.off }

// seek moves the countReader to the specified offset using rs as the
// underlying reader.
func (r *countReader) seek(rs io.ReadSeeker, off int64) error {
	_, err := rs.Seek(off, 0)
	if err != nil {
		return err
	}

	type reseter interface {
		Reset(io.Reader)
	}
	switch cr := r.fr.(type) {
	case reseter:
		cr.Reset(rs)
	default:
		r.fr = newCountReader(rs)
	}
	r.off = off

	return nil
}

// buffer is a flate.Reader used by a decompressor to store read-ahead data.
type buffer struct {
	// Buffered compressed data from read ahead.
	off  int // Current position in buffered data.
	size int // Total size of buffered data.
	data [MaxBlockSize]byte
}

// Read provides the flate.Decompressor Read method.
func (r *buffer) Read(b []byte) (int, error) {
	if r.off >= r.size {
		return 0, io.EOF
	}
	if n := r.size - r.off; len(b) > n {
		b = b[:n]
	}
	n := copy(b, r.data[r.off:])
	r.off += n
	return n, nil
}

// ReadByte provides the flate.Decompressor ReadByte method.
func (r *buffer) ReadByte() (byte, error) {
	if r.off == r.size {
		return 0, io.EOF
	}
	b := r.data[r.off]
	r.off++
	return b, nil
}

// reset makes the buffer available to store data.
func (r *buffer) reset() { r.size = 0 }

// hasData returns whether the buffer has any data buffered.
func (r *buffer) hasData() bool { return r.size != 0 }

// readLimited reads n bytes into the buffer from the given source.
func (r *buffer) readLimited(n int, src *countReader) error {
	if r.hasData() {
		panic("bgzf: read into non-empty buffer")
	}
	r.off = 0
	var err error
	r.size, err = io.ReadFull(src, r.data[:n])
	return err
}

// equals returns a boolean indicating the equality between
// the buffered data and the given byte slice.
func (r *buffer) equals(b []byte) bool { return bytes.Equal(r.data[:r.size], b) }

// decompressor is a gzip member decompressor worker.
type decompressor struct {
	owner *Reader

	gz gzip.Reader

	cr *countReader

	// Current block size.
	blockSize int

	// Buffered compressed data from read ahead.
	buf buffer

	// Decompressed data.
	wg  sync.WaitGroup
	blk Block

	err error
}

// Read provides the Read method for the decompressor's gzip.Reader.
func (d *decompressor) Read(b []byte) (int, error) {
	if d.buf.hasData() {
		return d.buf.Read(b)
	}
	return d.cr.Read(b)
}

// ReadByte provides the ReadByte method for the decompressor's gzip.Reader.
func (d *decompressor) ReadByte() (byte, error) {
	if d.buf.hasData() {
		return d.buf.ReadByte()
	}
	return d.cr.ReadByte()
}

// lazyBlock conditionally creates a ready to use Block.
func (d *decompressor) lazyBlock() {
	if d.blk == nil {
		if w, ok := d.owner.cache.(Wrapper); ok {
			d.blk = w.Wrap(&block{owner: d.owner})
		} else {
			d.blk = &block{owner: d.owner}
		}
		return
	}
	if !d.blk.ownedBy(d.owner) {
		d.blk.setOwner(d.owner)
	}
}

// acquireHead gains the read head from the decompressor's owner.
func (d *decompressor) acquireHead() {
	d.wg.Add(1)
	d.cr = <-d.owner.head
}

// releaseHead releases the read head back to the decompressor's owner.
func (d *decompressor) releaseHead() {
	d.owner.head <- d.cr
	d.cr = nil // Defensively zero the reader.
}

// wait waits for the current member to be decompressed or fail, and returns
// the resulting error state.
func (d *decompressor) wait() (Block, error) {
	d.wg.Wait()
	blk := d.blk
	d.blk = nil
	return blk, d.err
}

// using sets the Block for the decompressor to work with.
func (d *decompressor) using(b Block) *decompressor { d.blk = b; return d }

// nextBlockAt makes the decompressor ready for reading decompressed data
// from its Block. It checks if there is a cached Block for the nextBase,
// otherwise it seeks to the correct location if decompressor is not
// correctly positioned, and then reads the compressed data and fills
// the decompressed Block.
// After nextBlockAt returns without error, the decompressor's Block
// holds a valid gzip.Header and base offset.
func (d *decompressor) nextBlockAt(off int64, rs io.ReadSeeker) *decompressor {
	d.err = nil
	for {
		exists, next := d.owner.cacheHasBlockFor(off)
		if !exists {
			break
		}
		off = next
	}

	d.lazyBlock()

	d.acquireHead()
	defer d.releaseHead()

	if d.cr.offset() != off {
		if rs == nil {
			// It should not be possible for the expected next block base
			// to be out of register with the count reader unless Seek
			// has been called, so we know the base reader must be an
			// io.ReadSeeker.
			var ok bool
			rs, ok = d.owner.r.(io.ReadSeeker)
			if !ok {
				panic("bgzf: unexpected offset without seek")
			}
		}
		d.err = d.cr.seek(rs, off)
		if d.err != nil {
			d.wg.Done()
			return d
		}
	}

	d.blk.setBase(d.cr.offset())
	d.err = d.readMember()
	if d.err != nil {
		d.wg.Done()
		return d
	}
	d.blk.setHeader(d.gz.Header)
	d.gz.Header = gzip.Header{} // Prevent retention of header field in next use.

	// Decompress data into the decompressor's Block.
	go func() {
		d.err = d.blk.readFrom(&d.gz)
		d.wg.Done()
	}()

	return d
}

// expectedMemberSize returns the size of the BGZF conformant gzip member.
// It returns -1 if no BGZF block size field is found.
func expectedMemberSize(h gzip.Header) int {
	i := bytes.Index(h.Extra, bgzfExtraPrefix)
	if i < 0 || i+5 >= len(h.Extra) {
		return -1
	}
	return (int(h.Extra[i+4]) | int(h.Extra[i+5])<<8) + 1
}

// readMember buffers the gzip member starting the current decompressor offset.
func (d *decompressor) readMember() error {
	// Set the decompressor to Read from the underlying flate.Reader
	// and mark the starting offset from which the underlying reader
	// was used.
	d.buf.reset()
	mark := d.cr.offset()

	err := d.gz.Reset(d)
	if err != nil {
		d.blockSize = -1
		return err
	}

	d.blockSize = expectedMemberSize(d.gz.Header)
	if d.blockSize < 0 {
		return ErrNoBlockSize
	}
	skipped := int(d.cr.offset() - mark)

	// Read compressed data into the decompressor buffer until the
	// underlying flate.Reader is positioned at the end of the gzip
	// member in which the readMember call was made.
	return d.buf.readLimited(d.blockSize-skipped, d.cr)
}

// Offset is a BGZF virtual offset.
type Offset struct {
	File  int64
	Block uint16
}

// Chunk is a region of a BGZF file.
type Chunk struct {
	Begin Offset
	End   Offset
}

// Reader implements BGZF blocked gzip decompression.
type Reader struct {
	gzip.Header
	r io.Reader

	// head serialises access to the underlying
	// io.Reader.
	head chan *countReader

	// lastChunk is the virtual file offset
	// interval of the last successful read
	// or seek operation.
	lastChunk Chunk

	// Blocked specifies the behaviour of the
	// Reader at the end of a BGZF member.
	// If the Reader is Blocked, a Read that
	// reaches the end of a BGZF block will
	// return io.EOF. This error is not sticky,
	// so a subsequent Read will progress to
	// the next block if it is available.
	Blocked bool

	// Non-concurrent work decompressor.
	dec *decompressor

	// Concurrent work fields.
	waiting chan *decompressor
	working chan *decompressor
	control chan int64

	current Block

	// cache is the Reader block cache. If Cache is not nil,
	// the cache is queried for blocks before an attempt to
	// read from the underlying io.Reader.
	mu    sync.RWMutex
	cache Cache

	err error
}

// NewReader returns a new BGZF reader.
//
// The number of concurrent read decompressors is specified by rd.
// If rd is 0, GOMAXPROCS concurrent decompressors will be created. The returned
// Reader should be closed after use to avoid leaking resources.
func NewReader(r io.Reader, rd int) (*Reader, error) {
	if rd == 0 {
		rd = runtime.GOMAXPROCS(0)
	}
	bg := &Reader{
		r: r,

		head: make(chan *countReader, 1),
	}
	bg.head <- newCountReader(r)

	// Make work loop control structures.
	if rd > 1 {
		bg.waiting = make(chan *decompressor, rd)
		bg.working = make(chan *decompressor, rd)
		bg.control = make(chan int64, 1)
		for ; rd > 1; rd-- {
			bg.waiting <- &decompressor{owner: bg}
		}
	}

	// Read the first block now so we can fail before
	// the first Read call if there is a problem.
	bg.dec = &decompressor{owner: bg}
	blk, err := bg.dec.nextBlockAt(0, nil).wait()
	if err != nil {
		return nil, err
	}
	bg.current = blk
	bg.Header = bg.current.header()

	// Set up work loop if rd was > 1.
	if bg.control != nil {
		bg.waiting <- bg.dec
		bg.dec = nil
		next := blk.NextBase()
		go func() {
			defer func() {
				bg.mu.Lock()
				bg.cache = nil
				bg.mu.Unlock()
			}()
			for dec := range bg.waiting {
				var open bool
				if next < 0 {
					next, open = <-bg.control
					if !open {
						return
					}
				} else {
					select {
					case next, open = <-bg.control:
						if !open {
							return
						}
					default:
					}
				}
				dec.nextBlockAt(next, nil)
				next = dec.blk.NextBase()
				bg.working <- dec
			}
		}()
	}

	return bg, nil
}

// SetCache sets the cache to be used by the Reader.
func (bg *Reader) SetCache(c Cache) {
	bg.mu.Lock()
	bg.cache = c
	bg.mu.Unlock()
}

// Seek performs a seek operation to the given virtual offset.
func (bg *Reader) Seek(off Offset) error {
	rs, ok := bg.r.(io.ReadSeeker)
	if !ok {
		return ErrNotASeeker
	}

	if off.File != bg.current.Base() || !bg.current.hasData() {
		ok := bg.cacheSwap(off.File)
		if !ok {
			var dec *decompressor
			if bg.dec != nil {
				dec = bg.dec
			} else {
				select {
				case dec = <-bg.waiting:
				case dec = <-bg.working:
					blk, err := dec.wait()
					if err == nil {
						bg.keep(blk)
					}
				}
			}
			bg.current, bg.err = dec.
				using(bg.current).
				nextBlockAt(off.File, rs).
				wait()
			if bg.dec == nil {
				select {
				case <-bg.control:
				default:
				}
				bg.control <- bg.current.NextBase()
				bg.waiting <- dec
			}
			bg.Header = bg.current.header()
			if bg.err != nil {
				return bg.err
			}
		}
	}

	bg.err = bg.current.seek(int64(off.Block))
	if bg.err == nil {
		bg.lastChunk = Chunk{Begin: off, End: off}
	}

	return bg.err
}

// LastChunk returns the region of the BGZF file read by the last read
// operation or the resulting virtual offset of the last successful
// seek operation.
func (bg *Reader) LastChunk() Chunk { return bg.lastChunk }

// BlockLen returns the number of bytes remaining to be read from the
// current BGZF block.
func (bg *Reader) BlockLen() int { return bg.current.len() }

// Close closes the reader and releases resources.
func (bg *Reader) Close() error {
	if bg.control != nil {
		close(bg.control)
		close(bg.waiting)
	}
	if bg.err == io.EOF {
		return nil
	}
	return bg.err
}

// Read implements the io.Reader interface.
func (bg *Reader) Read(p []byte) (int, error) {
	if bg.err != nil {
		return 0, bg.err
	}

	// Discard leading empty blocks. This is an indexing
	// optimisation to avoid retaining useless members
	// in a BAI/CSI.
	for bg.current.len() == 0 {
		bg.err = bg.nextBlock()
		if bg.err != nil {
			return 0, bg.err
		}
	}

	bg.lastChunk.Begin = bg.current.txOffset()

	var n int
	for n < len(p) && bg.err == nil {
		var _n int
		_n, bg.err = bg.current.Read(p[n:])
		n += _n
		if bg.err == io.EOF {
			if n == len(p) {
				bg.err = nil
				break
			}

			if bg.Blocked {
				bg.err = nil
				bg.lastChunk.End = bg.current.txOffset()
				return n, io.EOF
			}

			bg.err = bg.nextBlock()
			if bg.err != nil {
				break
			}
		}
	}

	bg.lastChunk.End = bg.current.txOffset()
	return n, bg.err
}

// nextBlock swaps the current decompressed block for the next
// in the stream. If the block is available from the cache
// no additional work is done, otherwise a decompressor is
// used or waited on.
func (bg *Reader) nextBlock() error {
	base := bg.current.NextBase()
	ok := bg.cacheSwap(base)
	if ok {
		bg.Header = bg.current.header()
		return nil
	}

	var err error
	if bg.dec != nil {
		bg.dec.using(bg.current).nextBlockAt(base, nil)
		bg.current, err = bg.dec.wait()
	} else {
		var ok bool
		for i := 0; i < cap(bg.working); i++ {
			dec := <-bg.working
			bg.current, err = dec.wait()
			bg.waiting <- dec
			if bg.current.Base() == base {
				ok = true
				break
			}
			if err == nil {
				bg.keep(bg.current)
				bg.current = nil
			}
		}
		if !ok {
			panic("bgzf: unexpected block")
		}
	}
	if err != nil {
		return err
	}

	// Only set header if there was no error.
	h := bg.current.header()
	if bg.current.isMagicBlock() {
		// TODO(kortschak): Do this more carefully. It may be that
		// someone actually has extra data in this field that we are
		// clobbering.
		bg.Header.Extra = h.Extra
	} else {
		bg.Header = h
	}

	return nil
}

// cacheSwap attempts to swap the current Block for a cached Block
// for the given base offset. It returns true if successful.
func (bg *Reader) cacheSwap(base int64) bool {
	bg.mu.RLock()
	defer bg.mu.RUnlock()
	if bg.cache == nil {
		return false
	}

	blk, err := bg.cachedBlockFor(base)
	if err != nil {
		return false
	}
	if blk != nil {
		// TODO(kortschak): Under some conditions, e.g. FIFO
		// cache we will be discarding a non-nil evicted Block.
		// Consider retaining these in a sync.Pool.
		bg.cachePut(bg.current)
		bg.current = blk
		return true
	}
	var retained bool
	bg.current, retained = bg.cachePut(bg.current)
	if retained {
		bg.current = nil
	}
	return false
}

// cacheHasBlockFor returns whether the Reader's cache has a block
// for the given base offset. If the requested Block exists, the base
// offset of the following Block is returned.
func (bg *Reader) cacheHasBlockFor(base int64) (exists bool, next int64) {
	bg.mu.RLock()
	defer bg.mu.RUnlock()
	if bg.cache == nil {
		return false, -1
	}
	return bg.cache.Peek(base)
}

// cachedBlockFor returns a non-nil Block if the Reader has access to a
// cache and the cache holds the block with the given base and the
// correct owner, otherwise it returns nil. If the Block's owner is not
// correct, or the Block cannot seek to the start of its data, a non-nil
// error is returned.
func (bg *Reader) cachedBlockFor(base int64) (Block, error) {
	blk := bg.cache.Get(base)
	if blk != nil {
		if !blk.ownedBy(bg) {
			return nil, ErrContaminatedCache
		}
		err := blk.seek(0)
		if err != nil {
			return nil, err
		}
	}
	return blk, nil
}

// cachePut puts the given Block into the cache if it exists, it returns
// the Block that was evicted or b if it was not retained, and whether
// the Block was retained by the cache.
func (bg *Reader) cachePut(b Block) (evicted Block, retained bool) {
	if b == nil || !b.hasData() {
		return b, false
	}
	return bg.cache.Put(b)
}

// keep puts the given Block into the cache if it exists.
func (bg *Reader) keep(b Block) {
	if b == nil || !b.hasData() {
		return
	}
	bg.mu.RLock()
	defer bg.mu.RUnlock()
	if bg.cache != nil {
		bg.cache.Put(b)
	}
}

// Begin returns a Tx that starts at the current virtual offset.
func (bg *Reader) Begin() Tx { return Tx{begin: bg.lastChunk.Begin, r: bg} }

// Tx represents a multi-read transaction.
type Tx struct {
	begin Offset
	r     *Reader
}

// End returns the Chunk spanning the transaction. After return the Tx is
// no longer valid.
func (t *Tx) End() Chunk {
	c := Chunk{Begin: t.begin, End: t.r.lastChunk.End}
	t.r = nil
	return c
}
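To make the virtual-offset bookkeeping above concrete, here is a hedged helper that performs one Read, records the Chunk it covered via LastChunk, and seeks back to the chunk's Begin so the same bytes can be read again. It assumes the Reader was constructed over an io.ReadSeeker such as *os.File; otherwise Seek returns ErrNotASeeker. The function name is mine.

```go
package bgzfexample

import "github.com/biogo/hts/bgzf"

// readAndRewind performs a single Read, records the virtual file region it
// covered, and seeks back so the caller can re-read the same bytes.
func readAndRewind(r *bgzf.Reader, buf []byte) (bgzf.Chunk, error) {
	if _, err := r.Read(buf); err != nil {
		return bgzf.Chunk{}, err
	}
	// LastChunk spans exactly the single Read above: Begin is recorded when
	// the Read starts, End when it returns.
	chunk := r.LastChunk()

	// Seek takes a virtual offset: the file offset of a block start plus an
	// offset within that block's decompressed data.
	if err := r.Seek(chunk.Begin); err != nil {
		return bgzf.Chunk{}, err
	}
	return chunk, nil
}
```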
@@ -0,0 +1,282 @@
// Copyright ©2012 The bíogo Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package bgzf

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// Writer implements BGZF blocked gzip compression.
type Writer struct {
	gzip.Header
	w io.Writer

	active *compressor

	queue chan *compressor
	qwg   sync.WaitGroup

	waiting chan *compressor

	wg sync.WaitGroup

	closed bool

	m   sync.Mutex
	err error
}

// NewWriter returns a new Writer. Writes to the returned writer are
// compressed and written to w.
//
// The number of concurrent write compressors is specified by wc.
func NewWriter(w io.Writer, wc int) *Writer {
	bg, _ := NewWriterLevel(w, gzip.DefaultCompression, wc)
	return bg
}

// NewWriterLevel returns a new Writer using the specified compression level
// instead of gzip.DefaultCompression. Allowable level options are integer
// values between gzip.BestSpeed and gzip.BestCompression inclusive.
//
// The number of concurrent write compressors is specified by wc.
func NewWriterLevel(w io.Writer, level, wc int) (*Writer, error) {
	if level < gzip.DefaultCompression || level > gzip.BestCompression {
		return nil, fmt.Errorf("bgzf: invalid compression level: %d", level)
	}
	wc++ // We count one for the active compressor.
	if wc < 2 {
		wc = 2
	}
	bg := &Writer{
		w:       w,
		waiting: make(chan *compressor, wc),
		queue:   make(chan *compressor, wc),
	}

	c := make([]compressor, wc)
	for i := range c {
		c[i].Header = &bg.Header
		c[i].level = level
		c[i].waiting = bg.waiting
		c[i].flush = make(chan *compressor, 1)
		c[i].qwg = &bg.qwg
		bg.waiting <- &c[i]
	}
	bg.active = <-bg.waiting

	bg.wg.Add(1)
	go func() {
		defer bg.wg.Done()
		for qw := range bg.queue {
			if !writeOK(bg, <-qw.flush) {
				break
			}
		}
	}()

	return bg, nil
}

func writeOK(bg *Writer, c *compressor) bool {
	defer func() { bg.waiting <- c }()

	if c.err != nil {
		bg.setErr(c.err)
		return false
	}
	if c.buf.Len() == 0 {
		return true
	}

	_, err := io.Copy(bg.w, &c.buf)
	bg.qwg.Done()
	if err != nil {
		bg.setErr(err)
		return false
	}
	c.next = 0

	return true
}

type compressor struct {
	*gzip.Header
	gz    *gzip.Writer
	level int

	next  int
	block [BlockSize]byte
	buf   bytes.Buffer

	flush chan *compressor
	qwg   *sync.WaitGroup

	waiting chan *compressor

	err error
}

func (c *compressor) writeBlock() {
	defer func() { c.flush <- c }()

	if c.gz == nil {
		c.gz, c.err = gzip.NewWriterLevel(&c.buf, c.level)
		if c.err != nil {
			return
		}
	} else {
		c.gz.Reset(&c.buf)
	}
	c.gz.Header = gzip.Header{
		Comment: c.Comment,
		Extra:   append([]byte(bgzfExtra), c.Extra...),
		ModTime: c.ModTime,
		Name:    c.Name,
		OS:      c.OS,
	}

	_, c.err = c.gz.Write(c.block[:c.next])
	if c.err != nil {
		return
	}
	c.err = c.gz.Close()
	if c.err != nil {
		return
	}
	c.next = 0

	b := c.buf.Bytes()
	i := bytes.Index(b, bgzfExtraPrefix)
	if i < 0 {
		c.err = gzip.ErrHeader
		return
	}
	size := len(b) - 1
	if size >= MaxBlockSize {
		c.err = ErrBlockOverflow
		return
	}
	b[i+4], b[i+5] = byte(size), byte(size>>8)
}

// Next returns the index of the start of the next write within the
// decompressed data block.
func (bg *Writer) Next() (int, error) {
	if bg.closed {
		return 0, ErrClosed
	}
	if err := bg.Error(); err != nil {
		return 0, err
	}

	return bg.active.next, nil
}

// Write writes the compressed form of b to the underlying io.Writer.
// Decompressed data blocks are limited to BlockSize, so individual
// byte slices may span block boundaries, however the Writer attempts
// to keep each write within a single data block.
func (bg *Writer) Write(b []byte) (int, error) {
	if bg.closed {
		return 0, ErrClosed
	}
	err := bg.Error()
	if err != nil {
		return 0, err
	}

	c := bg.active
	var n int
	for ; len(b) > 0 && err == nil; err = bg.Error() {
		var _n int
		if c.next == 0 || c.next+len(b) <= len(c.block) {
			_n = copy(c.block[c.next:], b)
			b = b[_n:]
			c.next += _n
			n += _n
		}

		if c.next == len(c.block) || _n == 0 {
			bg.queue <- c
			bg.qwg.Add(1)
			go c.writeBlock()
			c = <-bg.waiting
		}
	}
	bg.active = c

	return n, bg.Error()
}

// Flush writes unwritten data to the underlying io.Writer. Flush does not block.
func (bg *Writer) Flush() error {
	if bg.closed {
		return ErrClosed
	}
	if err := bg.Error(); err != nil {
		return err
	}

	if bg.active.next == 0 {
		return nil
	}

	var c *compressor
	c, bg.active = bg.active, <-bg.waiting
	bg.queue <- c
	bg.qwg.Add(1)
	go c.writeBlock()

	return bg.Error()
}

// Wait waits for all pending writes to complete and returns the subsequent
// error state of the Writer.
func (bg *Writer) Wait() error {
	if err := bg.Error(); err != nil {
		return err
	}
	bg.qwg.Wait()
	return bg.Error()
}

// Error returns the error state of the Writer.
func (bg *Writer) Error() error {
	bg.m.Lock()
	defer bg.m.Unlock()
	return bg.err
}

func (bg *Writer) setErr(err error) {
	bg.m.Lock()
	defer bg.m.Unlock()
	if bg.err == nil {
		bg.err = err
	}
}

// Close closes the Writer, waiting for any pending writes before returning
// the final error of the Writer.
func (bg *Writer) Close() error {
	if !bg.closed {
		c := bg.active
		bg.queue <- c
		bg.qwg.Add(1)
		<-bg.waiting
		c.writeBlock()
		bg.closed = true
		close(bg.queue)
		bg.wg.Wait()
		if bg.err == nil {
			_, bg.err = bg.w.Write([]byte(magicBlock))
		}
	}
	return bg.err
}
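As a quick illustration of the Writer lifecycle above (writes accumulate in per-block buffers, background compressors flush them in order, Close appends the magic EOF block), here is a hedged helper that BGZF-compresses an arbitrary io.Reader; the helper name and the choice of one compressor per CPU are assumptions, not part of the diff.

```go
package main

import (
	"compress/gzip"
	"io"
	"os"
	"runtime"

	"github.com/biogo/hts/bgzf"
)

// compressTo copies src into a BGZF stream written to dst, using one
// compressor per CPU. Close must succeed for the magic EOF block to be written.
func compressTo(dst *os.File, src io.Reader) error {
	w, err := bgzf.NewWriterLevel(dst, gzip.DefaultCompression, runtime.GOMAXPROCS(-1))
	if err != nil {
		return err
	}
	if _, err := io.Copy(w, src); err != nil {
		w.Close() // best effort; the copy error is the one that matters
		return err
	}
	// Close waits for pending blocks and appends the BGZF magic EOF block.
	return w.Close()
}
```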
@@ -31,6 +31,9 @@ you will need to specify the `output` option.
  you are executing multiple builders in parallel you should make sure
  `output` is unique for each one. For example `packer_{{.BuildName}}.zip`.

- `format` (string) - Disable archive format autodetection and use the
  provided string as the archive format.

- `compression_level` (integer) - Specify the compression level, for
  algorithms that support it, from 1 through 9 inclusive. Typically higher
  compression levels take longer but produce smaller files. Defaults to `6`.