[jira] [HBASE-5442] Use builder pattern in StoreFile and HFile

Summary: Cleaning up the factory method explosion in HFile writer and StoreFile.
Now, adding a new parameter to HFile/StoreFile writer initialization will not
require modifying factory method invocations all over the codebase.

Test Plan:
Run unit tests
Deploy to dev cluster and run a load test

Reviewers: JIRA, stack, tedyu, Kannan, Karthik, Liyin

Reviewed By: stack

Differential Revision: https://reviews.facebook.net/D1893

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1293095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbautin 2012-02-24 06:06:53 +00:00
parent bbf0a0074e
commit a01166ec92
27 changed files with 418 additions and 356 deletions

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyComparator; import org.apache.hadoop.hbase.KeyValue.KeyComparator;
@ -51,6 +52,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import com.google.common.base.Preconditions;
/** /**
* File format for hbase. * File format for hbase.
* A file of sorted key/value pairs. Both keys and values are byte arrays. * A file of sorted key/value pairs. Both keys and values are byte arrays.
@ -232,33 +235,82 @@ public class HFile {
* we want to be able to swap writer implementations. * we want to be able to swap writer implementations.
*/ */
public static abstract class WriterFactory { public static abstract class WriterFactory {
protected Configuration conf; protected final Configuration conf;
protected CacheConfig cacheConf; protected final CacheConfig cacheConf;
protected FileSystem fs;
protected Path path;
protected FSDataOutputStream ostream;
protected int blockSize = HColumnDescriptor.DEFAULT_BLOCKSIZE;
protected Compression.Algorithm compression =
HFile.DEFAULT_COMPRESSION_ALGORITHM;
protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
protected KeyComparator comparator;
WriterFactory(Configuration conf, CacheConfig cacheConf) { WriterFactory(Configuration conf, CacheConfig cacheConf) {
this.conf = conf; this.conf = conf;
this.cacheConf = cacheConf; this.cacheConf = cacheConf;
} }
public abstract Writer createWriter(FileSystem fs, Path path) public WriterFactory withPath(FileSystem fs, Path path) {
throws IOException; Preconditions.checkNotNull(fs);
Preconditions.checkNotNull(path);
this.fs = fs;
this.path = path;
return this;
}
public abstract Writer createWriter(FileSystem fs, Path path, public WriterFactory withOutputStream(FSDataOutputStream ostream) {
int blockSize, Compression.Algorithm compress, Preconditions.checkNotNull(ostream);
this.ostream = ostream;
return this;
}
public WriterFactory withBlockSize(int blockSize) {
this.blockSize = blockSize;
return this;
}
public WriterFactory withCompression(Compression.Algorithm compression) {
Preconditions.checkNotNull(compression);
this.compression = compression;
return this;
}
public WriterFactory withCompression(String compressAlgo) {
Preconditions.checkNotNull(compression);
this.compression = AbstractHFileWriter.compressionByName(compressAlgo);
return this;
}
public WriterFactory withDataBlockEncoder(HFileDataBlockEncoder encoder) {
Preconditions.checkNotNull(encoder);
this.encoder = encoder;
return this;
}
public WriterFactory withComparator(KeyComparator comparator) {
Preconditions.checkNotNull(comparator);
this.comparator = comparator;
return this;
}
public Writer create() throws IOException {
if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
throw new AssertionError("Please specify exactly one of " +
"filesystem/path or path");
}
if (path != null) {
ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
}
return createWriter(fs, path, ostream, blockSize,
compression, encoder, comparator);
}
protected abstract Writer createWriter(FileSystem fs, Path path,
FSDataOutputStream ostream, int blockSize,
Compression.Algorithm compress,
HFileDataBlockEncoder dataBlockEncoder, HFileDataBlockEncoder dataBlockEncoder,
final KeyComparator comparator) throws IOException; KeyComparator comparator) throws IOException;
public abstract Writer createWriter(FileSystem fs, Path path,
int blockSize, String compress,
final KeyComparator comparator) throws IOException;
public abstract Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final String compress,
final KeyComparator comparator) throws IOException;
public abstract Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final Compression.Algorithm compress,
final KeyComparator c) throws IOException;
} }
/** The configuration key for HFile version to use for new files */ /** The configuration key for HFile version to use for new files */

View File

@ -82,102 +82,32 @@ public class HFileWriterV1 extends AbstractHFileWriter {
private int blockNumber = 0; private int blockNumber = 0;
static class WriterFactoryV1 extends HFile.WriterFactory { static class WriterFactoryV1 extends HFile.WriterFactory {
WriterFactoryV1(Configuration conf, CacheConfig cacheConf) { WriterFactoryV1(Configuration conf, CacheConfig cacheConf) {
super(conf, cacheConf); super(conf, cacheConf);
} }
@Override @Override
public Writer createWriter(FileSystem fs, Path path) throws IOException { public Writer createWriter(FileSystem fs, Path path,
return new HFileWriterV1(conf, cacheConf, fs, path); FSDataOutputStream ostream, int blockSize,
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder, Algorithm compressAlgo, HFileDataBlockEncoder dataBlockEncoder,
KeyComparator comparator) KeyComparator comparator)
throws IOException { throws IOException {
return new HFileWriterV1(conf, cacheConf, fs, path, blockSize, return new HFileWriterV1(conf, cacheConf, fs, path, ostream, blockSize,
compressAlgo, dataBlockEncoder, comparator); compressAlgo, dataBlockEncoder, comparator);
} }
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
String compressAlgoName, KeyComparator comparator)
throws IOException {
return new HFileWriterV1(conf, cacheConf, fs, path, blockSize,
compressAlgoName, comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final String compress,
final KeyComparator comparator) throws IOException {
return new HFileWriterV1(cacheConf, ostream, blockSize, compress,
comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final Compression.Algorithm compress,
final KeyComparator c) throws IOException {
return new HFileWriterV1(cacheConf, ostream, blockSize, compress,
NoOpDataBlockEncoder.INSTANCE, c);
}
}
/** Constructor that uses all defaults for compression and block size. */
public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
FileSystem fs, Path path)
throws IOException {
this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
HFile.DEFAULT_COMPRESSION_ALGORITHM,
NoOpDataBlockEncoder.INSTANCE, null);
}
/**
* Constructor that takes a path, creates and closes the output stream. Takes
* compression algorithm name as string.
*/
public HFileWriterV1(Configuration conf, CacheConfig cacheConf, FileSystem fs,
Path path, int blockSize, String compressAlgoName,
final KeyComparator comparator) throws IOException {
this(conf, cacheConf, fs, path, blockSize,
compressionByName(compressAlgoName), NoOpDataBlockEncoder.INSTANCE,
comparator);
} }
/** Constructor that takes a path, creates and closes the output stream. */ /** Constructor that takes a path, creates and closes the output stream. */
public HFileWriterV1(Configuration conf, CacheConfig cacheConf, public HFileWriterV1(Configuration conf, CacheConfig cacheConf,
FileSystem fs, Path path, FileSystem fs, Path path, FSDataOutputStream ostream,
int blockSize, Compression.Algorithm compress, int blockSize, Compression.Algorithm compress,
HFileDataBlockEncoder blockEncoder, HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException { final KeyComparator comparator) throws IOException {
super(cacheConf, createOutputStream(conf, fs, path), path, super(cacheConf, ostream == null ? createOutputStream(conf, fs, path) : ostream, path,
blockSize, compress, blockEncoder, comparator); blockSize, compress, blockEncoder, comparator);
SchemaMetrics.configureGlobally(conf); SchemaMetrics.configureGlobally(conf);
} }
/** Constructor that takes a stream. */
public HFileWriterV1(CacheConfig cacheConf,
final FSDataOutputStream outputStream, final int blockSize,
final String compressAlgoName, final KeyComparator comparator)
throws IOException {
this(cacheConf, outputStream, blockSize,
Compression.getCompressionAlgorithmByName(compressAlgoName),
NoOpDataBlockEncoder.INSTANCE, comparator);
}
/** Constructor that takes a stream. */
public HFileWriterV1(CacheConfig cacheConf,
final FSDataOutputStream outputStream, final int blockSize,
final Compression.Algorithm compress,
HFileDataBlockEncoder blockEncoder, final KeyComparator comparator)
throws IOException {
super(cacheConf, outputStream, null, blockSize, compress,
blockEncoder, comparator);
}
/** /**
* If at block boundary, opens new block. * If at block boundary, opens new block.
* *

View File

@ -83,100 +83,32 @@ public class HFileWriterV2 extends AbstractHFileWriter {
private long maxMemstoreTS = 0; private long maxMemstoreTS = 0;
static class WriterFactoryV2 extends HFile.WriterFactory { static class WriterFactoryV2 extends HFile.WriterFactory {
WriterFactoryV2(Configuration conf, CacheConfig cacheConf) { WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
super(conf, cacheConf); super(conf, cacheConf);
} }
@Override @Override
public Writer createWriter(FileSystem fs, Path path) public Writer createWriter(FileSystem fs, Path path,
throws IOException { FSDataOutputStream ostream, int blockSize,
return new HFileWriterV2(conf, cacheConf, fs, path);
}
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder, Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException { final KeyComparator comparator) throws IOException {
return new HFileWriterV2(conf, cacheConf, fs, path, blockSize, return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
compress, blockEncoder, comparator); compress, blockEncoder, comparator);
} }
@Override
public Writer createWriter(FileSystem fs, Path path, int blockSize,
String compress, final KeyComparator comparator)
throws IOException {
return new HFileWriterV2(conf, cacheConf, fs, path, blockSize,
compress, comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final String compress,
final KeyComparator comparator) throws IOException {
return new HFileWriterV2(conf, cacheConf, ostream, blockSize, compress,
comparator);
}
@Override
public Writer createWriter(final FSDataOutputStream ostream,
final int blockSize, final Compression.Algorithm compress,
final KeyComparator c) throws IOException {
return new HFileWriterV2(conf, cacheConf, ostream, blockSize, compress,
c);
}
}
/** Constructor that uses all defaults for compression and block size. */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
FileSystem fs, Path path)
throws IOException {
this(conf, cacheConf, fs, path, HFile.DEFAULT_BLOCKSIZE,
HFile.DEFAULT_COMPRESSION_ALGORITHM, null, null);
}
/**
* Constructor that takes a path, creates and closes the output stream. Takes
* compression algorithm name as string.
*/
public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs,
Path path, int blockSize, String compressAlgoName,
final KeyComparator comparator) throws IOException {
this(conf, cacheConf, fs, path, blockSize,
compressionByName(compressAlgoName), null, comparator);
} }
/** Constructor that takes a path, creates and closes the output stream. */ /** Constructor that takes a path, creates and closes the output stream. */
public HFileWriterV2(Configuration conf, CacheConfig cacheConf, FileSystem fs, public HFileWriterV2(Configuration conf, CacheConfig cacheConf,
Path path, int blockSize, Compression.Algorithm compressAlgo, FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
HFileDataBlockEncoder blockEncoder, Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
final KeyComparator comparator) throws IOException { final KeyComparator comparator) throws IOException {
super(cacheConf, createOutputStream(conf, fs, path), path, super(cacheConf,
blockSize, compressAlgo, blockEncoder, comparator); ostream == null ? createOutputStream(conf, fs, path) : ostream,
path, blockSize, compressAlgo, blockEncoder, comparator);
SchemaMetrics.configureGlobally(conf); SchemaMetrics.configureGlobally(conf);
finishInit(conf); finishInit(conf);
} }
/** Constructor that takes a stream. */
public HFileWriterV2(final Configuration conf, final CacheConfig cacheConf,
final FSDataOutputStream outputStream, final int blockSize,
final String compressAlgoName, final KeyComparator comparator)
throws IOException {
this(conf, cacheConf, outputStream, blockSize,
Compression.getCompressionAlgorithmByName(compressAlgoName),
comparator);
}
/** Constructor that takes a stream. */
public HFileWriterV2(final Configuration conf, final CacheConfig cacheConf,
final FSDataOutputStream outputStream, final int blockSize,
final Compression.Algorithm compress, final KeyComparator comparator)
throws IOException {
super(cacheConf, outputStream, null, blockSize, compress, null,
comparator);
finishInit(conf);
}
/** Additional initialization steps */ /** Additional initialization steps */
private void finishInit(final Configuration conf) { private void finishInit(final Configuration conf) {
if (fsBlockWriter != null) if (fsBlockWriter != null)

View File

@ -173,10 +173,12 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
Path familydir = new Path(outputdir, Bytes.toString(family)); Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family); String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression; compression = compression == null ? defaultCompression : compression;
wl.writer = wl.writer = HFile.getWriterFactoryNoCache(conf)
HFile.getWriterFactoryNoCache(conf).createWriter(fs, .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
StoreFile.getUniqueFile(fs, familydir), blocksize, .withBlockSize(blocksize)
compression, KeyValue.KEY_COMPARATOR); .withCompression(compression)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
this.writers.put(family, wl); this.writers.put(family, wl);
return wl; return wl;
} }

View File

@ -543,10 +543,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
Algorithm compression = familyDescriptor.getCompression(); Algorithm compression = familyDescriptor.getCompression();
BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
halfWriter = new StoreFile.Writer( halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
fs, outFile, blocksize, compression, dataBlockEncoder, fs, blocksize)
conf, cacheConf, .withFilePath(outFile)
KeyValue.COMPARATOR, bloomFilterType, 0); .withCompression(compression)
.withDataBlockEncoder(dataBlockEncoder)
.withBloomType(bloomFilterType)
.build();
HFileScanner scanner = halfReader.getScanner(false, false, false); HFileScanner scanner = halfReader.getScanner(false, false, false);
scanner.seekTo(); scanner.seekTo();
do { do {

View File

@ -790,9 +790,14 @@ public class Store extends SchemaConfigured implements HeapSize {
} else { } else {
writerCacheConf = cacheConf; writerCacheConf = cacheConf;
} }
StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(), StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
blocksize, compression, dataBlockEncoder, comparator, conf, fs, blocksize)
writerCacheConf, family.getBloomFilterType(), maxKeyCount); .withOutputDir(region.getTmpDir())
.withDataBlockEncoder(dataBlockEncoder)
.withComparator(comparator)
.withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount)
.build();
// The store file writer's path does not include the CF name, so we need // The store file writer's path does not include the CF name, so we need
// to configure the HFile writer directly. // to configure the HFile writer directly.
SchemaConfigured sc = (SchemaConfigured) w.writer; SchemaConfigured sc = (SchemaConfigured) w.writer;

View File

@ -69,13 +69,14 @@ import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
/** /**
* A Store data file. Stores usually have one or more of these files. They * A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To * are produced by flushing the memstore to disk. To
* create, call {@link #createWriter(FileSystem, Path, int, Configuration, CacheConfig)} * create, instantiate a writer using {@link StoreFile#WriterBuilder}
* and append data. Be sure to add any metadata before calling close on the * and append data. Be sure to add any metadata before calling close on the
* Writer (Use the appendMetadata convenience methods). On close, a StoreFile * Writer (Use the appendMetadata convenience methods). On close, a StoreFile
* is sitting in the Filesystem. To refer to it, create a StoreFile instance * is sitting in the Filesystem. To refer to it, create a StoreFile instance
@ -681,64 +682,122 @@ public class StoreFile extends SchemaConfigured {
return tgt; return tgt;
} }
/** public static class WriterBuilder {
* Get a store file writer. Client is responsible for closing file when done. private final Configuration conf;
* private final CacheConfig cacheConf;
* @param fs private final FileSystem fs;
* @param dir Path to family directory. Makes the directory if doesn't exist. private final int blockSize;
* Creates a file with a unique name in this directory.
* @param blocksize size per filesystem block
* @return StoreFile.Writer
* @throws IOException
*/
public static Writer createWriter(final FileSystem fs, final Path dir,
final int blocksize, Configuration conf, CacheConfig cacheConf)
throws IOException {
return createWriter(fs, dir, blocksize, null, NoOpDataBlockEncoder.INSTANCE,
null, conf, cacheConf, BloomType.NONE, 0);
}
/** private Compression.Algorithm compressAlgo =
* Create a store file writer. Client is responsible for closing file when done. HFile.DEFAULT_COMPRESSION_ALGORITHM;
* If metadata, add BEFORE closing using appendMetadata() private HFileDataBlockEncoder dataBlockEncoder =
* @param fs NoOpDataBlockEncoder.INSTANCE;
* @param dir Path to family directory. Makes the directory if doesn't exist. private KeyValue.KVComparator comparator = KeyValue.COMPARATOR;
* Creates a file with a unique name in this directory. private BloomType bloomType = BloomType.NONE;
* @param blocksize private long maxKeyCount = 0;
* @param compressAlgo Compression algorithm. Pass null to get default. private Path dir;
* @param dataBlockEncoder Pass null to disable data block encoding. private Path filePath;
* @param comparator Key-value comparator. Pass null to get default.
* @param conf HBase system configuration. used with bloom filters
* @param cacheConf Cache configuration and reference.
* @param bloomType column family setting for bloom filters
* @param maxKeyCount estimated maximum number of keys we expect to add
* @return HFile.Writer
* @throws IOException
*/
public static StoreFile.Writer createWriter(final FileSystem fs,
final Path dir, final int blocksize,
Compression.Algorithm compressAlgo,
final HFileDataBlockEncoder dataBlockEncoder,
KeyValue.KVComparator comparator, final Configuration conf,
final CacheConfig cacheConf, BloomType bloomType, long maxKeyCount)
throws IOException {
if (!fs.exists(dir)) { public WriterBuilder(Configuration conf, CacheConfig cacheConf,
fs.mkdirs(dir); FileSystem fs, int blockSize) {
} this.conf = conf;
Path path = getUniqueFile(fs, dir); this.cacheConf = cacheConf;
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) { this.fs = fs;
bloomType = BloomType.NONE; this.blockSize = blockSize;
} }
if (compressAlgo == null) { /**
compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM; * Use either this method or {@link #withFilePath}, but not both.
* @param dir Path to column family directory. The directory is created if
* does not exist. The file is given a unique name within this
* directory.
* @return this (for chained invocation)
*/
public WriterBuilder withOutputDir(Path dir) {
Preconditions.checkNotNull(dir);
this.dir = dir;
return this;
} }
if (comparator == null) {
comparator = KeyValue.COMPARATOR; /**
* Use either this method or {@link #withOutputDir}, but not both.
* @param filePath the StoreFile path to write
* @return this (for chained invocation)
*/
public WriterBuilder withFilePath(Path filePath) {
Preconditions.checkNotNull(filePath);
this.filePath = filePath;
return this;
}
public WriterBuilder withCompression(Compression.Algorithm compressAlgo) {
Preconditions.checkNotNull(compressAlgo);
this.compressAlgo = compressAlgo;
return this;
}
public WriterBuilder withDataBlockEncoder(HFileDataBlockEncoder encoder) {
Preconditions.checkNotNull(encoder);
this.dataBlockEncoder = encoder;
return this;
}
public WriterBuilder withComparator(KeyValue.KVComparator comparator) {
Preconditions.checkNotNull(comparator);
this.comparator = comparator;
return this;
}
public WriterBuilder withBloomType(BloomType bloomType) {
Preconditions.checkNotNull(bloomType);
this.bloomType = bloomType;
return this;
}
/**
* @param maxKeyCount estimated maximum number of keys we expect to add
* @return this (for chained invocation)
*/
public WriterBuilder withMaxKeyCount(long maxKeyCount) {
this.maxKeyCount = maxKeyCount;
return this;
}
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
* {@link Writer#appendMetadata}.
*/
public Writer build() throws IOException {
if ((dir == null ? 0 : 1) + (filePath == null ? 0 : 1) != 1) {
throw new IllegalArgumentException("Either specify parent directory " +
"or file path");
}
if (dir == null) {
dir = filePath.getParent();
}
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
if (filePath == null) {
filePath = getUniqueFile(fs, dir);
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
bloomType = BloomType.NONE;
}
}
if (compressAlgo == null) {
compressAlgo = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
if (comparator == null) {
comparator = KeyValue.COMPARATOR;
}
return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
conf, cacheConf, comparator, bloomType, maxKeyCount);
} }
return new Writer(fs, path, blocksize, compressAlgo, dataBlockEncoder,
conf, cacheConf, comparator, bloomType, maxKeyCount);
} }
/** /**
@ -845,6 +904,7 @@ public class StoreFile extends SchemaConfigured {
boolean isTimeRangeTrackerSet = false; boolean isTimeRangeTrackerSet = false;
protected HFile.Writer writer; protected HFile.Writer writer;
/** /**
* Creates an HFile.Writer that also write helpful meta data. * Creates an HFile.Writer that also write helpful meta data.
* @param fs file system to write to * @param fs file system to write to
@ -858,7 +918,7 @@ public class StoreFile extends SchemaConfigured {
* for Bloom filter size in {@link HFile} format version 1. * for Bloom filter size in {@link HFile} format version 1.
* @throws IOException problem writing to FS * @throws IOException problem writing to FS
*/ */
public Writer(FileSystem fs, Path path, int blocksize, private Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress, Compression.Algorithm compress,
HFileDataBlockEncoder dataBlockEncoder, final Configuration conf, HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
CacheConfig cacheConf, CacheConfig cacheConf,
@ -866,9 +926,13 @@ public class StoreFile extends SchemaConfigured {
throws IOException { throws IOException {
this.dataBlockEncoder = dataBlockEncoder != null ? this.dataBlockEncoder = dataBlockEncoder != null ?
dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
writer = HFile.getWriterFactory(conf, cacheConf).createWriter( writer = HFile.getWriterFactory(conf, cacheConf)
fs, path, blocksize, .withPath(fs, path)
compress, this.dataBlockEncoder, comparator.getRawComparator()); .withBlockSize(blocksize)
.withCompression(compress)
.withDataBlockEncoder(dataBlockEncoder)
.withComparator(comparator.getRawComparator())
.create();
this.kvComparator = comparator; this.kvComparator = comparator;

View File

@ -105,9 +105,10 @@ public class CompressionTest {
public static void doSmokeTest(FileSystem fs, Path path, String codec) public static void doSmokeTest(FileSystem fs, Path path, String codec)
throws Exception { throws Exception {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
HFile.Writer writer = HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
HFile.getWriterFactoryNoCache(conf).createWriter( .withPath(fs, path)
fs, path, HFile.DEFAULT_BLOCKSIZE, codec, null); .withCompression(codec)
.create();
writer.append(Bytes.toBytes("testkey"), Bytes.toBytes("testval")); writer.append(Bytes.toBytes("testkey"), Bytes.toBytes("testval"));
writer.appendFileInfo(Bytes.toBytes("infokey"), Bytes.toBytes("infoval")); writer.appendFileInfo(Bytes.toBytes("infokey"), Bytes.toBytes("infoval"));
writer.close(); writer.close();

View File

@ -190,9 +190,10 @@ public class HFilePerformanceEvaluation {
@Override @Override
void setUp() throws Exception { void setUp() throws Exception {
writer = writer =
HFile.getWriterFactoryNoCache(conf).createWriter(this.fs, HFile.getWriterFactoryNoCache(conf)
this.mf, RFILE_BLOCKSIZE, (Compression.Algorithm) null, null, .withPath(fs, mf)
null); .withBlockSize(RFILE_BLOCKSIZE)
.create();
} }
@Override @Override

View File

@ -67,9 +67,11 @@ public class TestHalfStoreFileReader {
FileSystem fs = FileSystem.get(conf); FileSystem fs = FileSystem.get(conf);
CacheConfig cacheConf = new CacheConfig(conf); CacheConfig cacheConf = new CacheConfig(conf);
HFile.Writer w = HFile.Writer w = HFile.getWriterFactory(conf, cacheConf)
HFile.getWriterFactory(conf, cacheConf).createWriter(fs, p, 1024, .withPath(fs, p)
"none", KeyValue.KEY_COMPARATOR); .withBlockSize(1024)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
// write some things. // write some things.
List<KeyValue> items = genSomeKeys(); List<KeyValue> items = genSomeKeys();

View File

@ -284,9 +284,15 @@ public class TestCacheOnWrite {
public void writeStoreFile() throws IOException { public void writeStoreFile() throws IOException {
Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(),
"test_cache_on_write"); "test_cache_on_write");
StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir, StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs,
DATA_BLOCK_SIZE, compress, encoder, KeyValue.COMPARATOR, conf, DATA_BLOCK_SIZE)
cacheConf, BLOOM_TYPE, NUM_KV); .withOutputDir(storeFileParentDir)
.withCompression(compress)
.withDataBlockEncoder(encoder)
.withComparator(KeyValue.COMPARATOR)
.withBloomType(BLOOM_TYPE)
.withMaxKeyCount(NUM_KV)
.build();
final int rowLen = 32; final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) { for (int i = 0; i < NUM_KV; ++i) {

View File

@ -80,7 +80,8 @@ public class TestHFile extends HBaseTestCase {
public void testEmptyHFile() throws IOException { public void testEmptyHFile() throws IOException {
if (cacheConf == null) cacheConf = new CacheConfig(conf); if (cacheConf == null) cacheConf = new CacheConfig(conf);
Path f = new Path(ROOT_DIR, getName()); Path f = new Path(ROOT_DIR, getName());
Writer w = HFile.getWriterFactory(conf, cacheConf).createWriter(this.fs, f); Writer w =
HFile.getWriterFactory(conf, cacheConf).withPath(fs, f).create();
w.close(); w.close();
Reader r = HFile.createReader(fs, f, cacheConf); Reader r = HFile.createReader(fs, f, cacheConf);
r.loadFileInfo(); r.loadFileInfo();
@ -152,8 +153,11 @@ public class TestHFile extends HBaseTestCase {
if (cacheConf == null) cacheConf = new CacheConfig(conf); if (cacheConf == null) cacheConf = new CacheConfig(conf);
Path ncTFile = new Path(ROOT_DIR, "basic.hfile." + codec.toString()); Path ncTFile = new Path(ROOT_DIR, "basic.hfile." + codec.toString());
FSDataOutputStream fout = createFSOutput(ncTFile); FSDataOutputStream fout = createFSOutput(ncTFile);
Writer writer = HFile.getWriterFactory(conf, cacheConf).createWriter(fout, Writer writer = HFile.getWriterFactory(conf, cacheConf)
minBlockSize, Compression.getCompressionAlgorithmByName(codec), null); .withOutputStream(fout)
.withBlockSize(minBlockSize)
.withCompression(codec)
.create();
LOG.info(writer); LOG.info(writer);
writeRecords(writer); writeRecords(writer);
fout.close(); fout.close();
@ -229,9 +233,11 @@ public class TestHFile extends HBaseTestCase {
if (cacheConf == null) cacheConf = new CacheConfig(conf); if (cacheConf == null) cacheConf = new CacheConfig(conf);
Path mFile = new Path(ROOT_DIR, "meta.hfile"); Path mFile = new Path(ROOT_DIR, "meta.hfile");
FSDataOutputStream fout = createFSOutput(mFile); FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = HFile.getWriterFactory(conf, cacheConf).createWriter(fout, Writer writer = HFile.getWriterFactory(conf, cacheConf)
minBlockSize, Compression.getCompressionAlgorithmByName(compress), .withOutputStream(fout)
null); .withBlockSize(minBlockSize)
.withCompression(compress)
.create();
someTestingWithMetaBlock(writer); someTestingWithMetaBlock(writer);
writer.close(); writer.close();
fout.close(); fout.close();
@ -259,8 +265,11 @@ public class TestHFile extends HBaseTestCase {
HBaseTestingUtility.COMPRESSION_ALGORITHMS) { HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile"); Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile");
FSDataOutputStream fout = createFSOutput(mFile); FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = HFile.getWriterFactory(conf, cacheConf).createWriter(fout, Writer writer = HFile.getWriterFactory(conf, cacheConf)
minBlockSize, compressAlgo, null); .withOutputStream(fout)
.withBlockSize(minBlockSize)
.withCompression(compressAlgo)
.create();
writer.append("foo".getBytes(), "value".getBytes()); writer.append("foo".getBytes(), "value".getBytes());
writer.close(); writer.close();
fout.close(); fout.close();
@ -283,19 +292,22 @@ public class TestHFile extends HBaseTestCase {
if (cacheConf == null) cacheConf = new CacheConfig(conf); if (cacheConf == null) cacheConf = new CacheConfig(conf);
Path mFile = new Path(ROOT_DIR, "meta.tfile"); Path mFile = new Path(ROOT_DIR, "meta.tfile");
FSDataOutputStream fout = createFSOutput(mFile); FSDataOutputStream fout = createFSOutput(mFile);
Writer writer = HFile.getWriterFactory(conf, cacheConf).createWriter(fout, KeyComparator comparator = new KeyComparator() {
minBlockSize, (Compression.Algorithm) null, new KeyComparator() { @Override
@Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int l2) { return -Bytes.compareTo(b1, s1, l1, b2, s2, l2);
return -Bytes.compareTo(b1, s1, l1, b2, s2, l2); }
@Override
} public int compare(byte[] o1, byte[] o2) {
@Override return compare(o1, 0, o1.length, o2, 0, o2.length);
public int compare(byte[] o1, byte[] o2) { }
return compare(o1, 0, o1.length, o2, 0, o2.length); };
} Writer writer = HFile.getWriterFactory(conf, cacheConf)
}); .withOutputStream(fout)
.withBlockSize(minBlockSize)
.withComparator(comparator)
.create();
writer.append("3".getBytes(), "0".getBytes()); writer.append("3".getBytes(), "0".getBytes());
writer.append("2".getBytes(), "0".getBytes()); writer.append("2".getBytes(), "0".getBytes());
writer.append("1".getBytes(), "0".getBytes()); writer.append("1".getBytes(), "0".getBytes());

View File

@ -483,8 +483,12 @@ public class TestHFileBlockIndex {
// Write the HFile // Write the HFile
{ {
HFile.Writer writer = HFile.Writer writer =
HFile.getWriterFactory(conf, cacheConf).createWriter(fs, HFile.getWriterFactory(conf, cacheConf)
hfilePath, SMALL_BLOCK_SIZE, compr, null, KeyValue.KEY_COMPARATOR); .withPath(fs, hfilePath)
.withBlockSize(SMALL_BLOCK_SIZE)
.withCompression(compr)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
Random rand = new Random(19231737); Random rand = new Random(19231737);
for (int i = 0; i < NUM_KV; ++i) { for (int i = 0; i < NUM_KV; ++i) {

View File

@ -161,9 +161,11 @@ public class TestHFilePerformance extends TestCase {
if ("HFile".equals(fileType)){ if ("HFile".equals(fileType)){
System.out.println("HFile write method: "); System.out.println("HFile write method: ");
HFile.Writer writer = HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
HFile.getWriterFactoryNoCache(conf).createWriter(fout, .withOutputStream(fout)
minBlockSize, codecName, null); .withBlockSize(minBlockSize)
.withCompression(codecName)
.create();
// Writing value in one shot. // Writing value in one shot.
for (long l=0; l<rows; l++ ) { for (long l=0; l<rows; l++ ) {

View File

@ -121,9 +121,11 @@ public class TestHFileSeek extends TestCase {
long totalBytes = 0; long totalBytes = 0;
FSDataOutputStream fout = createFSOutput(path, fs); FSDataOutputStream fout = createFSOutput(path, fs);
try { try {
Writer writer = Writer writer = HFile.getWriterFactoryNoCache(conf)
HFile.getWriterFactoryNoCache(conf).createWriter(fout, .withOutputStream(fout)
options.minBlockSize, options.compress, null); .withBlockSize(options.minBlockSize)
.withCompression(options.compress)
.create();
try { try {
BytesWritable key = new BytesWritable(); BytesWritable key = new BytesWritable();
BytesWritable val = new BytesWritable(); BytesWritable val = new BytesWritable();

View File

@ -75,8 +75,13 @@ public class TestHFileWriterV2 {
"testHFileFormatV2"); "testHFileFormatV2");
final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ; final Compression.Algorithm COMPRESS_ALGO = Compression.Algorithm.GZ;
HFileWriterV2 writer = new HFileWriterV2(conf, new CacheConfig(conf), fs, HFileWriterV2 writer = (HFileWriterV2)
hfilePath, 4096, COMPRESS_ALGO, null, KeyValue.KEY_COMPARATOR); new HFileWriterV2.WriterFactoryV2(conf, new CacheConfig(conf))
.withPath(fs, hfilePath)
.withBlockSize(4096)
.withCompression(COMPRESS_ALGO)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
long totalKeyLength = 0; long totalKeyLength = 0;
long totalValueLength = 0; long totalValueLength = 0;

View File

@ -47,8 +47,10 @@ public class TestReseekTo {
FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile); FSDataOutputStream fout = TEST_UTIL.getTestFileSystem().create(ncTFile);
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration()); CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
HFile.Writer writer = HFile.getWriterFactory( HFile.Writer writer = HFile.getWriterFactory(
TEST_UTIL.getConfiguration(), cacheConf).createWriter( TEST_UTIL.getConfiguration(), cacheConf)
fout, 4000, "none", null); .withOutputStream(fout)
.withBlockSize(4000)
.create();
int numberOfKeys = 1000; int numberOfKeys = 1000;
String valueString = "Value"; String valueString = "Value";

View File

@ -46,9 +46,10 @@ public class TestSeekTo extends HBaseTestCase {
Path ncTFile = new Path(this.testDir, "basic.hfile"); Path ncTFile = new Path(this.testDir, "basic.hfile");
FSDataOutputStream fout = this.fs.create(ncTFile); FSDataOutputStream fout = this.fs.create(ncTFile);
int blocksize = toKV("a").getLength() * 3; int blocksize = toKV("a").getLength() * 3;
HFile.Writer writer = HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
HFile.getWriterFactoryNoCache(conf).createWriter(fout, .withOutputStream(fout)
blocksize, "none", null); .withBlockSize(blocksize)
.create();
// 4 bytes * 3 * 2 for each key/value + // 4 bytes * 3 * 2 for each key/value +
// 3 for keys, 15 for values = 42 (woot) // 3 for keys, 15 for values = 42 (woot)
writer.append(toKV("c")); writer.append(toKV("c"));

View File

@ -207,10 +207,12 @@ public class TestLoadIncrementalHFiles {
byte[] family, byte[] qualifier, byte[] family, byte[] qualifier,
byte[] startKey, byte[] endKey, int numRows) throws IOException byte[] startKey, byte[] endKey, int numRows) throws IOException
{ {
HFile.Writer writer = HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
HFile.getWriterFactory(conf, new CacheConfig(conf)).createWriter(fs, path, .withPath(fs, path)
BLOCKSIZE, COMPRESSION, .withBlockSize(BLOCKSIZE)
KeyValue.KEY_COMPARATOR); .withCompression(COMPRESSION)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
try { try {
// subtract 2 since iterateOnSplits doesn't include boundary keys // subtract 2 since iterateOnSplits doesn't include boundary keys

View File

@ -183,9 +183,13 @@ public class CreateRandomStoreFile {
Integer.valueOf(cmdLine.getOptionValue(INDEX_BLOCK_SIZE_OPTION))); Integer.valueOf(cmdLine.getOptionValue(INDEX_BLOCK_SIZE_OPTION)));
} }
StoreFile.Writer sfw = StoreFile.createWriter(fs, outputDir, blockSize, StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf,
compr, null, KeyValue.COMPARATOR, conf, new CacheConfig(conf), new CacheConfig(conf), fs, blockSize)
bloomType, numKV); .withOutputDir(outputDir)
.withCompression(compr)
.withBloomType(bloomType)
.withMaxKeyCount(numKV)
.build();
rand = new Random(); rand = new Random();
LOG.info("Writing " + numKV + " key/value pairs"); LOG.info("Writing " + numKV + " key/value pairs");

View File

@ -349,11 +349,14 @@ public class HFileReadWriteTest {
null); null);
Store store = new Store(outputDir, region, columnDescriptor, fs, conf); Store store = new Store(outputDir, region, columnDescriptor, fs, conf);
StoreFile.Writer writer = StoreFile.Writer writer = new StoreFile.WriterBuilder(conf,
StoreFile.createWriter(fs, outputDir, blockSize, compression, new CacheConfig(conf), fs, blockSize)
dataBlockEncoder, KeyValue.COMPARATOR, this.conf, .withOutputDir(outputDir)
new CacheConfig(conf), bloomType, .withCompression(compression)
maxKeyCount); .withDataBlockEncoder(dataBlockEncoder)
.withBloomType(bloomType)
.withMaxKeyCount(maxKeyCount)
.build();
StatisticsPrinter statsPrinter = new StatisticsPrinter(); StatisticsPrinter statsPrinter = new StatisticsPrinter();
statsPrinter.startThread(); statsPrinter.startThread();

View File

@ -294,9 +294,11 @@ public class TestCompoundBloomFilter {
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true); conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);
cacheConf = new CacheConfig(conf); cacheConf = new CacheConfig(conf);
StoreFile.Writer w = StoreFile.createWriter(fs, StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConf, fs,
TEST_UTIL.getDataTestDir(), BLOCK_SIZES[t], null, null, null, conf, BLOCK_SIZES[t])
cacheConf, bt, 0); .withOutputDir(TEST_UTIL.getDataTestDir())
.withBloomType(bt)
.build();
assertTrue(w.hasGeneralBloom()); assertTrue(w.hasGeneralBloom());
assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter); assertTrue(w.getGeneralBloomWriter() instanceof CompoundBloomFilterWriter);

View File

@ -71,8 +71,10 @@ public class TestFSErrorsExposed {
"regionname"), "familyname"); "regionname"), "familyname");
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem()); FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
CacheConfig cacheConf = new CacheConfig(util.getConfiguration()); CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2*1024, StoreFile.Writer writer = new StoreFile.WriterBuilder(
util.getConfiguration(), cacheConf); util.getConfiguration(), cacheConf, fs, 2*1024)
.withOutputDir(hfilePath)
.build();
TestStoreFile.writeStoreFile( TestStoreFile.writeStoreFile(
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual")); writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
@ -116,8 +118,10 @@ public class TestFSErrorsExposed {
"regionname"), "familyname"); "regionname"), "familyname");
FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem()); FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
CacheConfig cacheConf = new CacheConfig(util.getConfiguration()); CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024, StoreFile.Writer writer = new StoreFile.WriterBuilder(
util.getConfiguration(), cacheConf); util.getConfiguration(), cacheConf, fs, 2 * 1024)
.withOutputDir(hfilePath)
.build();
TestStoreFile.writeStoreFile( TestStoreFile.writeStoreFile(
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual")); writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

View File

@ -82,7 +82,11 @@ public class TestHRegionServerBulkLoad {
byte[] qualifier, byte[] value, int numRows) throws IOException { byte[] qualifier, byte[] value, int numRows) throws IOException {
HFile.Writer writer = HFile HFile.Writer writer = HFile
.getWriterFactory(conf, new CacheConfig(conf)) .getWriterFactory(conf, new CacheConfig(conf))
.createWriter(fs, path, BLOCKSIZE, COMPRESSION, KeyValue.KEY_COMPARATOR); .withPath(fs, path)
.withBlockSize(BLOCKSIZE)
.withCompression(COMPRESSION)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
try { try {
// subtract 2 since iterateOnSplits doesn't include boundary keys // subtract 2 since iterateOnSplits doesn't include boundary keys

View File

@ -265,8 +265,10 @@ public class TestStore extends TestCase {
long seqid = f.getMaxSequenceId(); long seqid = f.getMaxSequenceId();
Configuration c = HBaseConfiguration.create(); Configuration c = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(c); FileSystem fs = FileSystem.get(c);
StoreFile.Writer w = StoreFile.createWriter(fs, storedir, StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
StoreFile.DEFAULT_BLOCKSIZE_SMALL, c, new CacheConfig(c)); fs, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
.withOutputDir(storedir)
.build();
w.appendMetadata(seqid + 1, false); w.appendMetadata(seqid + 1, false);
w.close(); w.close();
this.store.close(); this.store.close();

View File

@ -89,9 +89,12 @@ public class TestStoreFile extends HBaseTestCase {
*/ */
public void testBasicHalfMapFile() throws Exception { public void testBasicHalfMapFile() throws Exception {
// Make up a directory hierarchy that has a regiondir and familyname. // Make up a directory hierarchy that has a regiondir and familyname.
StoreFile.Writer writer = StoreFile.createWriter(this.fs, Path outputDir = new Path(new Path(this.testDir, "regionname"),
new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024, "familyname");
conf, cacheConf); StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
this.fs, 2 * 1024)
.withOutputDir(outputDir)
.build();
writeStoreFile(writer); writeStoreFile(writer);
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), conf, cacheConf, checkHalfHFile(new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE)); StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE));
@ -131,8 +134,10 @@ public class TestStoreFile extends HBaseTestCase {
Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname"); Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname");
Path dir = new Path(storedir, "1234567890"); Path dir = new Path(storedir, "1234567890");
// Make a store file and write data to it. // Make a store file and write data to it.
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
conf, cacheConf); this.fs, 8 * 1024)
.withOutputDir(dir)
.build();
writeStoreFile(writer); writeStoreFile(writer);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf, StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE); StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
@ -391,10 +396,12 @@ public class TestStoreFile extends HBaseTestCase {
// write the file // write the file
Path f = new Path(ROOT_DIR, getName()); Path f = new Path(ROOT_DIR, getName());
StoreFile.Writer writer = StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
new StoreFile.Writer(fs, f, StoreFile.DEFAULT_BLOCKSIZE_SMALL, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
HFile.DEFAULT_COMPRESSION_ALGORITHM, null, conf, cacheConf, .withFilePath(f)
KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000); .withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(2000)
.build();
bloomWriteRead(writer, fs); bloomWriteRead(writer, fs);
} }
@ -409,10 +416,11 @@ public class TestStoreFile extends HBaseTestCase {
// write the file // write the file
Path f = new Path(ROOT_DIR, getName()); Path f = new Path(ROOT_DIR, getName());
StoreFile.Writer writer = new StoreFile.Writer(fs, f, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, fs, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
null, conf, cacheConf, KeyValue.COMPARATOR, StoreFile.BloomType.NONE, .withFilePath(f)
2000); .withMaxKeyCount(2000)
.build();
// add delete family // add delete family
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -477,10 +485,12 @@ public class TestStoreFile extends HBaseTestCase {
for (int x : new int[]{0,1}) { for (int x : new int[]{0,1}) {
// write the file // write the file
Path f = new Path(ROOT_DIR, getName() + x); Path f = new Path(ROOT_DIR, getName() + x);
StoreFile.Writer writer = new StoreFile.Writer(fs, f, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, fs, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
HFile.DEFAULT_COMPRESSION_ALGORITHM, .withFilePath(f)
null, conf, cacheConf, KeyValue.COMPARATOR, bt[x], expKeys[x]); .withBloomType(bt[x])
.withMaxKeyCount(expKeys[x])
.build();
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
for (int i = 0; i < rowCount*2; i += 2) { // rows for (int i = 0; i < rowCount*2; i += 2) { // rows
@ -550,10 +560,12 @@ public class TestStoreFile extends HBaseTestCase {
conf.setInt(HFile.FORMAT_VERSION_KEY, 1); conf.setInt(HFile.FORMAT_VERSION_KEY, 1);
// this should not create a bloom because the max keys is too small // this should not create a bloom because the max keys is too small
StoreFile.Writer writer = new StoreFile.Writer(fs, f, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
null, conf, cacheConf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, .withFilePath(f)
2000); .withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(2000)
.build();
assertFalse(writer.hasGeneralBloom()); assertFalse(writer.hasGeneralBloom());
writer.close(); writer.close();
fs.delete(f, true); fs.delete(f, true);
@ -562,22 +574,25 @@ public class TestStoreFile extends HBaseTestCase {
Integer.MAX_VALUE); Integer.MAX_VALUE);
// TODO: commented out because we run out of java heap space on trunk // TODO: commented out because we run out of java heap space on trunk
/*
// the below config caused IllegalArgumentException in our production cluster // the below config caused IllegalArgumentException in our production cluster
// however, the resulting byteSize is < MAX_INT, so this should work properly // however, the resulting byteSize is < MAX_INT, so this should work properly
writer = new StoreFile.Writer(fs, f, writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 272446963); .withFilePath(f)
assertTrue(writer.hasBloom()); .withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(27244696)
.build();
assertTrue(writer.hasGeneralBloom());
bloomWriteRead(writer, fs); bloomWriteRead(writer, fs);
*/
// this, however, is too large and should not create a bloom // this, however, is too large and should not create a bloom
// because Java can't create a contiguous array > MAX_INT // because Java can't create a contiguous array > MAX_INT
writer = new StoreFile.Writer(fs, f, writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM, StoreFile.DEFAULT_BLOCKSIZE_SMALL)
null, conf, cacheConf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, .withFilePath(f)
Integer.MAX_VALUE); .withBloomType(StoreFile.BloomType.ROW)
.withMaxKeyCount(Integer.MAX_VALUE)
.build();
assertFalse(writer.hasGeneralBloom()); assertFalse(writer.hasGeneralBloom());
writer.close(); writer.close();
fs.delete(f, true); fs.delete(f, true);
@ -668,8 +683,10 @@ public class TestStoreFile extends HBaseTestCase {
Path storedir = new Path(new Path(this.testDir, "regionname"), Path storedir = new Path(new Path(this.testDir, "regionname"),
"familyname"); "familyname");
Path dir = new Path(storedir, "1234567890"); Path dir = new Path(storedir, "1234567890");
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
conf, cacheConf); this.fs, 8 * 1024)
.withOutputDir(dir)
.build();
List<KeyValue> kvList = getKeyValueSet(timestamps,numRows, List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
family, qualifier); family, qualifier);
@ -838,10 +855,11 @@ public class TestStoreFile extends HBaseTestCase {
totalSize += kv.getLength() + 1; totalSize += kv.getLength() + 1;
} }
int blockSize = totalSize / numBlocks; int blockSize = totalSize / numBlocks;
StoreFile.Writer writer = new StoreFile.Writer(fs, path, blockSize, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
HFile.DEFAULT_COMPRESSION_ALGORITHM, blockSize)
null, conf, cacheConf, KeyValue.COMPARATOR, StoreFile.BloomType.NONE, .withFilePath(path)
2000); .withMaxKeyCount(2000)
.build();
// We'll write N-1 KVs to ensure we don't write an extra block // We'll write N-1 KVs to ensure we don't write an extra block
kvs.remove(kvs.size()-1); kvs.remove(kvs.size()-1);
for (KeyValue kv : kvs) { for (KeyValue kv : kvs) {
@ -867,15 +885,12 @@ public class TestStoreFile extends HBaseTestCase {
dataBlockEncoderAlgo, dataBlockEncoderAlgo,
dataBlockEncoderAlgo); dataBlockEncoderAlgo);
cacheConf = new CacheConfig(conf); cacheConf = new CacheConfig(conf);
StoreFile.Writer writer = new StoreFile.Writer(fs, StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
path, HFile.DEFAULT_BLOCKSIZE, HFile.DEFAULT_BLOCKSIZE)
HFile.DEFAULT_COMPRESSION_ALGORITHM, .withFilePath(path)
dataBlockEncoder, .withDataBlockEncoder(dataBlockEncoder)
conf, .withMaxKeyCount(2000)
cacheConf, .build();
KeyValue.COMPARATOR,
StoreFile.BloomType.NONE,
2000);
writer.close(); writer.close();
StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf, StoreFile storeFile = new StoreFile(fs, writer.getPath(), conf,

View File

@ -197,7 +197,7 @@ public class TestWALReplay {
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf); HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
Path f = new Path(basedir, "hfile"); Path f = new Path(basedir, "hfile");
HFile.Writer writer = HFile.Writer writer =
HFile.getWriterFactoryNoCache(conf).createWriter(this.fs, f); HFile.getWriterFactoryNoCache(conf).withPath(fs, f).create();
byte [] family = htd.getFamilies().iterator().next().getName(); byte [] family = htd.getFamilies().iterator().next().getName();
byte [] row = Bytes.toBytes(tableNameStr); byte [] row = Bytes.toBytes(tableNameStr);
writer.append(new KeyValue(row, family, family, row)); writer.append(new KeyValue(row, family, family, row));