HBASE-18752 Recalculate the TimeRange in flushing snapshot to store file
This commit is contained in:
parent
496fcda1d9
commit
13a53811de
|
@ -119,7 +119,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
||||||
status.setStatus("Flushing " + store + ": creating writer");
|
status.setStatus("Flushing " + store + ": creating writer");
|
||||||
// Write the map out to the disk
|
// Write the map out to the disk
|
||||||
writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
|
writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
|
||||||
false, true, true, false, snapshot.getTimeRangeTracker());
|
false, true, true, false);
|
||||||
IOException e = null;
|
IOException e = null;
|
||||||
try {
|
try {
|
||||||
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
|
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
|
||||||
|
|
|
@ -68,8 +68,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
||||||
/* isCompaction = */ false,
|
/* isCompaction = */ false,
|
||||||
/* includeMVCCReadpoint = */ true,
|
/* includeMVCCReadpoint = */ true,
|
||||||
/* includesTags = */ snapshot.isTagsPresent(),
|
/* includesTags = */ snapshot.isTagsPresent(),
|
||||||
/* shouldDropBehind = */ false,
|
/* shouldDropBehind = */ false);
|
||||||
snapshot.getTimeRangeTracker());
|
|
||||||
IOException e = null;
|
IOException e = null;
|
||||||
try {
|
try {
|
||||||
performFlush(scanner, writer, smallestReadPoint, throughputController);
|
performFlush(scanner, writer, smallestReadPoint, throughputController);
|
||||||
|
|
|
@ -1026,21 +1026,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
return sf;
|
return sf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param maxKeyCount
|
|
||||||
* @param compression Compression algorithm to use
|
|
||||||
* @param isCompaction whether we are creating a new file in a compaction
|
|
||||||
* @param includeMVCCReadpoint - whether to include MVCC or not
|
|
||||||
* @param includesTag - includesTag or not
|
|
||||||
* @return Writer for a new StoreFile in the tmp dir.
|
|
||||||
*/
|
|
||||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
|
||||||
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
|
||||||
boolean shouldDropBehind) throws IOException {
|
|
||||||
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
|
|
||||||
includesTag, shouldDropBehind, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param maxKeyCount
|
* @param maxKeyCount
|
||||||
* @param compression Compression algorithm to use
|
* @param compression Compression algorithm to use
|
||||||
|
@ -1053,7 +1038,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
// compaction
|
// compaction
|
||||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
||||||
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
||||||
boolean shouldDropBehind, TimeRangeTracker trt) throws IOException {
|
boolean shouldDropBehind) throws IOException {
|
||||||
final CacheConfig writerCacheConf;
|
final CacheConfig writerCacheConf;
|
||||||
if (isCompaction) {
|
if (isCompaction) {
|
||||||
// Don't cache data on write on compactions.
|
// Don't cache data on write on compactions.
|
||||||
|
@ -1079,9 +1064,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
.withFavoredNodes(favoredNodes)
|
.withFavoredNodes(favoredNodes)
|
||||||
.withFileContext(hFileContext)
|
.withFileContext(hFileContext)
|
||||||
.withShouldDropCacheBehind(shouldDropBehind);
|
.withShouldDropCacheBehind(shouldDropBehind);
|
||||||
if (trt != null) {
|
|
||||||
builder.withTimeRangeTracker(trt);
|
|
||||||
}
|
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,40 +71,10 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
private long deleteFamilyCnt = 0;
|
private long deleteFamilyCnt = 0;
|
||||||
private BloomContext bloomContext = null;
|
private BloomContext bloomContext = null;
|
||||||
private BloomContext deleteFamilyBloomContext = null;
|
private BloomContext deleteFamilyBloomContext = null;
|
||||||
|
private final TimeRangeTracker timeRangeTracker;
|
||||||
/**
|
|
||||||
* timeRangeTrackerSet is used to figure if we were passed a filled-out TimeRangeTracker or not.
|
|
||||||
* When flushing a memstore, we set the TimeRangeTracker that it accumulated during updates to
|
|
||||||
* memstore in here into this Writer and use this variable to indicate that we do not need to
|
|
||||||
* recalculate the timeRangeTracker bounds; it was done already as part of add-to-memstore.
|
|
||||||
* A completed TimeRangeTracker is not set in cases of compactions when it is recalculated.
|
|
||||||
*/
|
|
||||||
private final boolean timeRangeTrackerSet;
|
|
||||||
final TimeRangeTracker timeRangeTracker;
|
|
||||||
|
|
||||||
protected HFile.Writer writer;
|
protected HFile.Writer writer;
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates an HFile.Writer that also write helpful meta data.
|
|
||||||
* @param fs file system to write to
|
|
||||||
* @param path file name to create
|
|
||||||
* @param conf user configuration
|
|
||||||
* @param comparator key comparator
|
|
||||||
* @param bloomType bloom filter setting
|
|
||||||
* @param maxKeys the expected maximum number of keys to be added. Was used
|
|
||||||
* for Bloom filter size in {@link HFile} format version 1.
|
|
||||||
* @param fileContext - The HFile context
|
|
||||||
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
|
|
||||||
* @throws IOException problem writing to FS
|
|
||||||
*/
|
|
||||||
StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
|
|
||||||
final CellComparator comparator, BloomType bloomType, long maxKeys,
|
|
||||||
InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind)
|
|
||||||
throws IOException {
|
|
||||||
this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, favoredNodes, fileContext,
|
|
||||||
shouldDropCacheBehind, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an HFile.Writer that also write helpful meta data.
|
* Creates an HFile.Writer that also write helpful meta data.
|
||||||
* @param fs file system to write to
|
* @param fs file system to write to
|
||||||
|
@ -117,7 +87,6 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
* @param favoredNodes
|
* @param favoredNodes
|
||||||
* @param fileContext - The HFile context
|
* @param fileContext - The HFile context
|
||||||
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
|
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
|
||||||
* @param trt Ready-made timetracker to use.
|
|
||||||
* @throws IOException problem writing to FS
|
* @throws IOException problem writing to FS
|
||||||
*/
|
*/
|
||||||
private StoreFileWriter(FileSystem fs, Path path,
|
private StoreFileWriter(FileSystem fs, Path path,
|
||||||
|
@ -125,13 +94,9 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
CacheConfig cacheConf,
|
CacheConfig cacheConf,
|
||||||
final CellComparator comparator, BloomType bloomType, long maxKeys,
|
final CellComparator comparator, BloomType bloomType, long maxKeys,
|
||||||
InetSocketAddress[] favoredNodes, HFileContext fileContext,
|
InetSocketAddress[] favoredNodes, HFileContext fileContext,
|
||||||
boolean shouldDropCacheBehind, final TimeRangeTracker trt)
|
boolean shouldDropCacheBehind)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we don't destroy it.
|
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
|
||||||
// TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when
|
|
||||||
// it no longer writable.
|
|
||||||
this.timeRangeTrackerSet = trt != null;
|
|
||||||
this.timeRangeTracker = this.timeRangeTrackerSet? trt: TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
|
|
||||||
// TODO : Change all writers to be specifically created for compaction context
|
// TODO : Change all writers to be specifically created for compaction context
|
||||||
writer = HFile.getWriterFactory(conf, cacheConf)
|
writer = HFile.getWriterFactory(conf, cacheConf)
|
||||||
.withPath(fs, path)
|
.withPath(fs, path)
|
||||||
|
@ -232,9 +197,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
|
if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
|
||||||
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
|
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
|
||||||
}
|
}
|
||||||
if (!timeRangeTrackerSet) {
|
timeRangeTracker.includeTimestamp(cell);
|
||||||
timeRangeTracker.includeTimestamp(cell);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void appendGeneralBloomfilter(final Cell cell) throws IOException {
|
private void appendGeneralBloomfilter(final Cell cell) throws IOException {
|
||||||
|
@ -389,7 +352,6 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
private Path filePath;
|
private Path filePath;
|
||||||
private InetSocketAddress[] favoredNodes;
|
private InetSocketAddress[] favoredNodes;
|
||||||
private HFileContext fileContext;
|
private HFileContext fileContext;
|
||||||
private TimeRangeTracker trt;
|
|
||||||
private boolean shouldDropCacheBehind;
|
private boolean shouldDropCacheBehind;
|
||||||
|
|
||||||
public Builder(Configuration conf, CacheConfig cacheConf,
|
public Builder(Configuration conf, CacheConfig cacheConf,
|
||||||
|
@ -408,17 +370,6 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param trt A premade TimeRangeTracker to use rather than build one per append (building one
|
|
||||||
* of these is expensive so good to pass one in if you have one).
|
|
||||||
* @return this (for chained invocation)
|
|
||||||
*/
|
|
||||||
public Builder withTimeRangeTracker(final TimeRangeTracker trt) {
|
|
||||||
Preconditions.checkNotNull(trt);
|
|
||||||
this.trt = trt;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use either this method or {@link #withFilePath}, but not both.
|
* Use either this method or {@link #withFilePath}, but not both.
|
||||||
* @param dir Path to column family directory. The directory is created if
|
* @param dir Path to column family directory. The directory is created if
|
||||||
|
@ -523,7 +474,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
}
|
}
|
||||||
return new StoreFileWriter(fs, filePath,
|
return new StoreFileWriter(fs, filePath,
|
||||||
conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
|
conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
|
||||||
shouldDropCacheBehind, trt);
|
shouldDropCacheBehind);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,8 +75,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
||||||
StripeMultiFileWriter mw = null;
|
StripeMultiFileWriter mw = null;
|
||||||
try {
|
try {
|
||||||
mw = req.createWriter(); // Writer according to the policy.
|
mw = req.createWriter(); // Writer according to the policy.
|
||||||
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
|
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
|
||||||
snapshot.getTimeRangeTracker(), cellsCount);
|
|
||||||
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
|
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
|
||||||
mw.init(storeScanner, factory);
|
mw.init(storeScanner, factory);
|
||||||
|
|
||||||
|
@ -104,8 +103,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private StripeMultiFileWriter.WriterFactory createWriterFactory(
|
private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
|
||||||
final TimeRangeTracker tracker, final long kvCount) {
|
|
||||||
return new StripeMultiFileWriter.WriterFactory() {
|
return new StripeMultiFileWriter.WriterFactory() {
|
||||||
@Override
|
@Override
|
||||||
public StoreFileWriter createWriter() throws IOException {
|
public StoreFileWriter createWriter() throws IOException {
|
||||||
|
@ -114,8 +112,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
||||||
/* isCompaction = */ false,
|
/* isCompaction = */ false,
|
||||||
/* includeMVCCReadpoint = */ true,
|
/* includeMVCCReadpoint = */ true,
|
||||||
/* includesTags = */ true,
|
/* includesTags = */ true,
|
||||||
/* shouldDropBehind = */ false,
|
/* shouldDropBehind = */ false);
|
||||||
tracker);
|
|
||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -515,6 +515,37 @@ public class TestHStore {
|
||||||
assertCheck();
|
assertCheck();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
|
||||||
|
testTimeRangeIfSomeCellsAreDroppedInFlush(1);
|
||||||
|
testTimeRangeIfSomeCellsAreDroppedInFlush(3);
|
||||||
|
testTimeRangeIfSomeCellsAreDroppedInFlush(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
|
||||||
|
init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
|
||||||
|
long currentTs = 100;
|
||||||
|
long minTs = currentTs;
|
||||||
|
// the extra cell won't be flushed to disk,
|
||||||
|
// so the min of timerange will be different between memStore and hfile.
|
||||||
|
for (int i = 0; i != (maxVersion + 1); ++i) {
|
||||||
|
this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
|
||||||
|
if (i == 1) {
|
||||||
|
minTs = currentTs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
flushStore(store, id++);
|
||||||
|
|
||||||
|
Collection<HStoreFile> files = store.getStorefiles();
|
||||||
|
assertEquals(1, files.size());
|
||||||
|
HStoreFile f = files.iterator().next();
|
||||||
|
f.initReader();
|
||||||
|
StoreFileReader reader = f.getReader();
|
||||||
|
assertEquals(minTs, reader.timeRange.getMin());
|
||||||
|
assertEquals(currentTs, reader.timeRange.getMax());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Getting data from files only
|
* Getting data from files only
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
|
Loading…
Reference in New Issue