HBASE-8166 Avoid writing the memstoreTS into HFiles when possible
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1459960 13f79535-47bb-0310-9956-ffa450edef68
parent 4c9177de9e
commit 9066302dcd
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.regionserver.StoreFile.WriterBuilder;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;

@@ -338,6 +339,7 @@ public class HFile {
    protected KeyComparator comparator;
    protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
    protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM;
+   protected boolean includeMVCCReadpoint = true;

    WriterFactory(Configuration conf, CacheConfig cacheConf) {
      this.conf = conf;

@@ -398,6 +400,15 @@ public class HFile {
      return this;
    }

+   /**
+    * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+    * @return this (for chained invocation)
+    */
+   public WriterFactory includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+     this.includeMVCCReadpoint = includeMVCCReadpoint;
+     return this;
+   }
+
    public Writer create() throws IOException {
      if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
        throw new AssertionError("Please specify exactly one of " +

@@ -407,7 +418,7 @@ public class HFile {
        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path);
      }
      return createWriter(fs, path, ostream, blockSize,
-         compression, encoder, comparator, checksumType, bytesPerChecksum);
+         compression, encoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
    }

    protected abstract Writer createWriter(FileSystem fs, Path path,

@@ -415,7 +426,7 @@ public class HFile {
        Compression.Algorithm compress,
        HFileDataBlockEncoder dataBlockEncoder,
        KeyComparator comparator, ChecksumType checksumType,
-       int bytesPerChecksum) throws IOException;
+       int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException;
  }

  /** The configuration key for HFile version to use for new files */
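Caller-side usage of the new factory option, as a hedged sketch (not part of this commit; withPath(...) is assumed to be the factory's pre-existing path setter, and conf, cacheConf, fs and path stand in for whatever the caller already has):

    // Illustrative only: create an HFile writer that omits per-KV MVCC readpoints.
    HFile.Writer hfileWriter = HFile.getWriterFactory(conf, cacheConf)
        .withPath(fs, path)
        .includeMVCCReadpoint(false)   // new chained option introduced above
        .create();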
@@ -85,7 +85,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
  private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
  private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;

- private final boolean includeMemstoreTS = true;
+ private final boolean includeMemstoreTS;
  private long maxMemstoreTS = 0;

  static class WriterFactoryV2 extends HFile.WriterFactory {

@@ -98,9 +98,9 @@ public class HFileWriterV2 extends AbstractHFileWriter {
        FSDataOutputStream ostream, int blockSize,
        Compression.Algorithm compress, HFileDataBlockEncoder blockEncoder,
        final KeyComparator comparator, final ChecksumType checksumType,
-       final int bytesPerChecksum) throws IOException {
-     return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize,
-         compress, blockEncoder, comparator, checksumType, bytesPerChecksum);
+       final int bytesPerChecksum, boolean includeMVCCReadpoint) throws IOException {
+     return new HFileWriterV2(conf, cacheConf, fs, path, ostream, blockSize, compress,
+         blockEncoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint);
    }
  }

@@ -109,12 +109,13 @@ public class HFileWriterV2 extends AbstractHFileWriter {
      FileSystem fs, Path path, FSDataOutputStream ostream, int blockSize,
      Compression.Algorithm compressAlgo, HFileDataBlockEncoder blockEncoder,
      final KeyComparator comparator, final ChecksumType checksumType,
-     final int bytesPerChecksum) throws IOException {
+     final int bytesPerChecksum, final boolean includeMVCCReadpoint) throws IOException {
    super(cacheConf,
        ostream == null ? createOutputStream(conf, fs, path) : ostream,
        path, blockSize, compressAlgo, blockEncoder, comparator);
    this.checksumType = checksumType;
    this.bytesPerChecksum = bytesPerChecksum;
+   this.includeMemstoreTS = includeMVCCReadpoint;
    finishInit(conf);
  }
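A conceptual sketch of what the includeMemstoreTS flag (now passed in through the constructor) governs; the method below is illustrative and is not the writer's actual append path: when the flag is false, the per-KeyValue memstore timestamp is simply never written.

    // Hedged sketch of the idea; WritableUtils.writeVLong is Hadoop's vlong encoder.
    static void appendCell(java.io.DataOutputStream out, KeyValue kv,
        boolean includeMemstoreTS) throws java.io.IOException {
      out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());  // the KV bytes themselves
      if (includeMemstoreTS) {
        // the MVCC readpoint trails each KV only when requested
        org.apache.hadoop.io.WritableUtils.writeVLong(out, kv.getMemstoreTS());
      }
    }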
@@ -827,7 +827,7 @@ public class HStore implements Store {
   */
  private StoreFile.Writer createWriterInTmp(long maxKeyCount)
      throws IOException {
-   return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
+   return createWriterInTmp(maxKeyCount, this.family.getCompression(), false, true);
  }

  /*

@@ -837,7 +837,7 @@ public class HStore implements Store {
   * @return Writer for a new StoreFile in the tmp dir.
   */
  public StoreFile.Writer createWriterInTmp(long maxKeyCount,
-     Compression.Algorithm compression, boolean isCompaction)
+     Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint)
      throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {

@@ -857,6 +857,7 @@ public class HStore implements Store {
        .withChecksumType(checksumType)
        .withBytesPerChecksum(bytesPerChecksum)
        .withCompression(compression)
+       .includeMVCCReadpoint(includeMVCCReadpoint)
        .build();
    return w;
  }
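A hedged caller-side sketch of the widened HStore API (not in the diff; cellCount and family are illustrative names): flush paths keep the readpoints, so they pass true, exactly as the single-argument overload above now does internally.

    // Illustrative only.
    StoreFile.Writer flushWriter = store.createWriterInTmp(
        cellCount,                      // estimated number of KVs to write
        family.getCompression(),        // flush-time compression
        false /* isCompaction */,
        true  /* includeMVCCReadpoint: flushes always keep the readpoint */);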
@@ -143,10 +143,11 @@ public interface Store extends HeapSize, StoreConfigInformation {
   * @param maxKeyCount
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
+  * @param includeMVCCReadpoint whether we should write out the MVCC readpoint
   * @return Writer for a new StoreFile in the tmp dir.
   */
- public StoreFile.Writer createWriterInTmp(long maxKeyCount,
-     Compression.Algorithm compression, boolean isCompaction) throws IOException;
+ public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+     boolean isCompaction, boolean includeMVCCReadpoint) throws IOException;

  // Compaction oriented methods
@@ -530,6 +530,7 @@ public class StoreFile {
    private Path filePath;
    private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE;
    private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM;
+   private boolean includeMVCCReadpoint = true;

    public WriterBuilder(Configuration conf, CacheConfig cacheConf,
        FileSystem fs, int blockSize) {

@@ -614,6 +615,15 @@ public class StoreFile {
      return this;
    }

+   /**
+    * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
+    * @return this (for chained invocation)
+    */
+   public WriterBuilder includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+     this.includeMVCCReadpoint = includeMVCCReadpoint;
+     return this;
+   }
+
    /**
     * Create a store file writer. Client is responsible for closing file when
     * done. If metadata, add BEFORE closing using

@@ -648,7 +658,7 @@ public class StoreFile {
      }
      return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder,
          conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType,
-         bytesPerChecksum);
+         bytesPerChecksum, includeMVCCReadpoint);
    }
  }

@@ -752,6 +762,7 @@ public class StoreFile {
   * for Bloom filter size in {@link HFile} format version 1.
   * @param checksumType the checksum type
   * @param bytesPerChecksum the number of bytes per checksum value
+  * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV
   * @throws IOException problem writing to FS
   */
  private Writer(FileSystem fs, Path path, int blocksize,

@@ -759,8 +770,8 @@ public class StoreFile {
      HFileDataBlockEncoder dataBlockEncoder, final Configuration conf,
      CacheConfig cacheConf,
      final KVComparator comparator, BloomType bloomType, long maxKeys,
-     final ChecksumType checksumType, final int bytesPerChecksum)
-     throws IOException {
+     final ChecksumType checksumType, final int bytesPerChecksum,
+     final boolean includeMVCCReadpoint) throws IOException {
    this.dataBlockEncoder = dataBlockEncoder != null ?
        dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE;
    writer = HFile.getWriterFactory(conf, cacheConf)

@@ -771,6 +782,7 @@ public class StoreFile {
        .withComparator(comparator.getRawComparator())
        .withChecksumType(checksumType)
        .withBytesPerChecksum(bytesPerChecksum)
+       .includeMVCCReadpoint(includeMVCCReadpoint)
        .create();

    this.kvComparator = comparator;
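The equivalent knob on the StoreFile side, again as a hedged sketch rather than code from the commit (withFilePath(...) is assumed to be the builder's pre-existing path setter):

    // Illustrative only: build a store file writer that omits per-KV readpoints.
    StoreFile.Writer sfWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs, blockSize)
        .withFilePath(path)
        .includeMVCCReadpoint(false)
        .build();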
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.CellOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;

@@ -111,6 +113,8 @@ public abstract class Compactor {
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** The last key in the files we're compacting. */
    public long maxSeqId = 0;
+   /** Latest memstore read point found in any of the involved files */
+   public long maxMVCCReadpoint = 0;
  }

  protected FileDetails getFileDetails(

@@ -130,11 +134,17 @@ public abstract class Compactor {
      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
      fd.maxKeyCount += keyCount;
+     // calculate the latest MVCC readpoint in any of the involved store files
+     Map<byte[], byte[]> fileInfo = r.loadFileInfo();
+     byte tmp[] = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
+     if (tmp != null) {
+       fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
+     }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;
      if (calculatePutTs) {
-       byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+       tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
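The new bookkeeping in getFileDetails, restated as an illustrative helper (not part of the diff) to make the null case explicit: a file with no MAX_MEMSTORE_TS_KEY entry predates per-KV readpoints and contributes 0.

    // Illustrative helper; 'r' stands for the store file reader iterated in the loop above.
    static long maxMemstoreTsOf(StoreFile.Reader r) throws java.io.IOException {
      byte[] raw = r.loadFileInfo().get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
      return raw == null ? 0L : Bytes.toLong(raw);  // null: old file, no readpoints recorded
    }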
@@ -20,36 +20,22 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
 * Compact passed set of files. Create an instance and then call {@link #compact(CompactionRequest)}
 */
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor {
  private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);

  public DefaultCompactor(final Configuration conf, final Store store) {
    super(conf, store);
  }

@@ -84,7 +70,8 @@ public class DefaultCompactor extends Compactor {
    }
    // Create the writer even if no kv(Empty store file is also ok),
    // because we need record the max seq id for the store file, see HBASE-6059
-   writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true);
+   writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
+       fd.maxMVCCReadpoint >= smallestReadPoint);
    boolean finished = performCompaction(scanner, writer, smallestReadPoint);
    if (!finished) {
      abortWriter(writer);
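The last hunk above carries the point of the whole change: a compaction keeps writing MVCC readpoints only if some input file still records one at or above the smallest read point a live scanner could need. Restated with the boolean pulled out purely for readability (names as in the diff):

    // Readpoints below every reader's smallest read point can be dropped safely.
    boolean keepMvccReadpoints = fd.maxMVCCReadpoint >= smallestReadPoint;
    writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
        true /* isCompaction */, keepMvccReadpoints);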
@@ -347,7 +347,7 @@ public class HFileReadWriteTest {
    HRegion region = new HRegion(outputDir, null, fs, conf, regionInfo, htd, null);
    HStore store = new HStore(region, columnDescriptor, conf);

-   StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false);
+   StoreFile.Writer writer = store.createWriterInTmp(maxKeyCount, compression, false, true);

    StatisticsPrinter statsPrinter = new StatisticsPrinter();
    statsPrinter.startThread();
@@ -198,7 +198,7 @@ public class TestCacheOnWriteInSchema {
  public void testCacheOnWriteInSchema() throws IOException {
    // Write some random data into the store
    StoreFile.Writer writer = store.createWriterInTmp(Integer.MAX_VALUE,
-       HFile.DEFAULT_COMPRESSION_ALGORITHM, false);
+       HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true);
    writeStoreFile(writer);
    writer.close();
    // Verify the block types of interest were cached on write
@@ -178,7 +178,7 @@ public class TestStore extends TestCase {
    init(getName(), conf, hcd);

    // Test createWriterInTmp()
-   StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false);
+   StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true);
    Path path = writer.getPath();
    writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
    writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));