HBASE-18752 Recalculate the TimeRange in flushing snapshot to store file
This commit is contained in:
parent
fcdf96a0e8
commit
e2cef8aa80
|
@ -119,7 +119,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
|||
status.setStatus("Flushing " + store + ": creating writer");
|
||||
// Write the map out to the disk
|
||||
writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
|
||||
false, true, true, false, snapshot.getTimeRangeTracker());
|
||||
false, true, true, false);
|
||||
IOException e = null;
|
||||
try {
|
||||
// It's a mob store, flush the cells in a mob way. This is the difference of flushing
|
||||
|
|
|
@ -68,8 +68,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
|||
/* isCompaction = */ false,
|
||||
/* includeMVCCReadpoint = */ true,
|
||||
/* includesTags = */ snapshot.isTagsPresent(),
|
||||
/* shouldDropBehind = */ false,
|
||||
snapshot.getTimeRangeTracker());
|
||||
/* shouldDropBehind = */ false);
|
||||
IOException e = null;
|
||||
try {
|
||||
performFlush(scanner, writer, smallestReadPoint, throughputController);
|
||||
|
|
|
@ -1026,21 +1026,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
return sf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxKeyCount
|
||||
* @param compression Compression algorithm to use
|
||||
* @param isCompaction whether we are creating a new file in a compaction
|
||||
* @param includeMVCCReadpoint - whether to include MVCC or not
|
||||
* @param includesTag - includesTag or not
|
||||
* @return Writer for a new StoreFile in the tmp dir.
|
||||
*/
|
||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
||||
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
||||
boolean shouldDropBehind) throws IOException {
|
||||
return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
|
||||
includesTag, shouldDropBehind, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxKeyCount
|
||||
* @param compression Compression algorithm to use
|
||||
|
@ -1053,7 +1038,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
// compaction
|
||||
public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
|
||||
boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
|
||||
boolean shouldDropBehind, TimeRangeTracker trt) throws IOException {
|
||||
boolean shouldDropBehind) throws IOException {
|
||||
final CacheConfig writerCacheConf;
|
||||
if (isCompaction) {
|
||||
// Don't cache data on write on compactions.
|
||||
|
@ -1079,9 +1064,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||
.withFavoredNodes(favoredNodes)
|
||||
.withFileContext(hFileContext)
|
||||
.withShouldDropCacheBehind(shouldDropBehind);
|
||||
if (trt != null) {
|
||||
builder.withTimeRangeTracker(trt);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -71,40 +71,10 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
private long deleteFamilyCnt = 0;
|
||||
private BloomContext bloomContext = null;
|
||||
private BloomContext deleteFamilyBloomContext = null;
|
||||
|
||||
/**
|
||||
* timeRangeTrackerSet is used to figure if we were passed a filled-out TimeRangeTracker or not.
|
||||
* When flushing a memstore, we set the TimeRangeTracker that it accumulated during updates to
|
||||
* memstore in here into this Writer and use this variable to indicate that we do not need to
|
||||
* recalculate the timeRangeTracker bounds; it was done already as part of add-to-memstore.
|
||||
* A completed TimeRangeTracker is not set in cases of compactions when it is recalculated.
|
||||
*/
|
||||
private final boolean timeRangeTrackerSet;
|
||||
final TimeRangeTracker timeRangeTracker;
|
||||
private final TimeRangeTracker timeRangeTracker;
|
||||
|
||||
protected HFile.Writer writer;
|
||||
|
||||
/**
|
||||
* Creates an HFile.Writer that also write helpful meta data.
|
||||
* @param fs file system to write to
|
||||
* @param path file name to create
|
||||
* @param conf user configuration
|
||||
* @param comparator key comparator
|
||||
* @param bloomType bloom filter setting
|
||||
* @param maxKeys the expected maximum number of keys to be added. Was used
|
||||
* for Bloom filter size in {@link HFile} format version 1.
|
||||
* @param fileContext - The HFile context
|
||||
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
|
||||
* @throws IOException problem writing to FS
|
||||
*/
|
||||
StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
|
||||
final CellComparator comparator, BloomType bloomType, long maxKeys,
|
||||
InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind)
|
||||
throws IOException {
|
||||
this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, favoredNodes, fileContext,
|
||||
shouldDropCacheBehind, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an HFile.Writer that also write helpful meta data.
|
||||
* @param fs file system to write to
|
||||
|
@ -117,7 +87,6 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
* @param favoredNodes
|
||||
* @param fileContext - The HFile context
|
||||
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
|
||||
* @param trt Ready-made timetracker to use.
|
||||
* @throws IOException problem writing to FS
|
||||
*/
|
||||
private StoreFileWriter(FileSystem fs, Path path,
|
||||
|
@ -125,13 +94,9 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
CacheConfig cacheConf,
|
||||
final CellComparator comparator, BloomType bloomType, long maxKeys,
|
||||
InetSocketAddress[] favoredNodes, HFileContext fileContext,
|
||||
boolean shouldDropCacheBehind, final TimeRangeTracker trt)
|
||||
boolean shouldDropCacheBehind)
|
||||
throws IOException {
|
||||
// If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we don't destroy it.
|
||||
// TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when
|
||||
// it no longer writable.
|
||||
this.timeRangeTrackerSet = trt != null;
|
||||
this.timeRangeTracker = this.timeRangeTrackerSet? trt: TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
|
||||
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
|
||||
// TODO : Change all writers to be specifically created for compaction context
|
||||
writer = HFile.getWriterFactory(conf, cacheConf)
|
||||
.withPath(fs, path)
|
||||
|
@ -232,10 +197,8 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
if (KeyValue.Type.Put.getCode() == cell.getTypeByte()) {
|
||||
earliestPutTs = Math.min(earliestPutTs, cell.getTimestamp());
|
||||
}
|
||||
if (!timeRangeTrackerSet) {
|
||||
timeRangeTracker.includeTimestamp(cell);
|
||||
}
|
||||
}
|
||||
|
||||
private void appendGeneralBloomfilter(final Cell cell) throws IOException {
|
||||
if (this.generalBloomFilterWriter != null) {
|
||||
|
@ -389,7 +352,6 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
private Path filePath;
|
||||
private InetSocketAddress[] favoredNodes;
|
||||
private HFileContext fileContext;
|
||||
private TimeRangeTracker trt;
|
||||
private boolean shouldDropCacheBehind;
|
||||
|
||||
public Builder(Configuration conf, CacheConfig cacheConf,
|
||||
|
@ -408,17 +370,6 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
this.fs = fs;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param trt A premade TimeRangeTracker to use rather than build one per append (building one
|
||||
* of these is expensive so good to pass one in if you have one).
|
||||
* @return this (for chained invocation)
|
||||
*/
|
||||
public Builder withTimeRangeTracker(final TimeRangeTracker trt) {
|
||||
Preconditions.checkNotNull(trt);
|
||||
this.trt = trt;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use either this method or {@link #withFilePath}, but not both.
|
||||
* @param dir Path to column family directory. The directory is created if
|
||||
|
@ -523,7 +474,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
}
|
||||
return new StoreFileWriter(fs, filePath,
|
||||
conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
|
||||
shouldDropCacheBehind, trt);
|
||||
shouldDropCacheBehind);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,8 +75,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
|||
StripeMultiFileWriter mw = null;
|
||||
try {
|
||||
mw = req.createWriter(); // Writer according to the policy.
|
||||
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
|
||||
snapshot.getTimeRangeTracker(), cellsCount);
|
||||
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(cellsCount);
|
||||
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
|
||||
mw.init(storeScanner, factory);
|
||||
|
||||
|
@ -104,8 +103,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
|||
return result;
|
||||
}
|
||||
|
||||
private StripeMultiFileWriter.WriterFactory createWriterFactory(
|
||||
final TimeRangeTracker tracker, final long kvCount) {
|
||||
private StripeMultiFileWriter.WriterFactory createWriterFactory(final long kvCount) {
|
||||
return new StripeMultiFileWriter.WriterFactory() {
|
||||
@Override
|
||||
public StoreFileWriter createWriter() throws IOException {
|
||||
|
@ -114,8 +112,7 @@ public class StripeStoreFlusher extends StoreFlusher {
|
|||
/* isCompaction = */ false,
|
||||
/* includeMVCCReadpoint = */ true,
|
||||
/* includesTags = */ true,
|
||||
/* shouldDropBehind = */ false,
|
||||
tracker);
|
||||
/* shouldDropBehind = */ false);
|
||||
return writer;
|
||||
}
|
||||
};
|
||||
|
|
|
@ -515,6 +515,37 @@ public class TestHStore {
|
|||
assertCheck();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeRangeIfSomeCellsAreDroppedInFlush() throws IOException {
|
||||
testTimeRangeIfSomeCellsAreDroppedInFlush(1);
|
||||
testTimeRangeIfSomeCellsAreDroppedInFlush(3);
|
||||
testTimeRangeIfSomeCellsAreDroppedInFlush(5);
|
||||
}
|
||||
|
||||
private void testTimeRangeIfSomeCellsAreDroppedInFlush(int maxVersion) throws IOException {
|
||||
init(this.name.getMethodName(), TEST_UTIL.getConfiguration(),
|
||||
ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(maxVersion).build());
|
||||
long currentTs = 100;
|
||||
long minTs = currentTs;
|
||||
// the extra cell won't be flushed to disk,
|
||||
// so the min of timerange will be different between memStore and hfile.
|
||||
for (int i = 0; i != (maxVersion + 1); ++i) {
|
||||
this.store.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
|
||||
if (i == 1) {
|
||||
minTs = currentTs;
|
||||
}
|
||||
}
|
||||
flushStore(store, id++);
|
||||
|
||||
Collection<HStoreFile> files = store.getStorefiles();
|
||||
assertEquals(1, files.size());
|
||||
HStoreFile f = files.iterator().next();
|
||||
f.initReader();
|
||||
StoreFileReader reader = f.getReader();
|
||||
assertEquals(minTs, reader.timeRange.getMin());
|
||||
assertEquals(currentTs, reader.timeRange.getMax());
|
||||
}
|
||||
|
||||
/**
|
||||
* Getting data from files only
|
||||
* @throws IOException
|
||||
|
|
Loading…
Reference in New Issue