[jira] [HBASE-5274] Filter out expired scanners on compaction as well
Summary: This is a follow-up to D1017 that makes it consistent with D909 (89-fb). The fix for 89-fb applied the TTL-based scanner filtering logic to both normal scanners and compactions, while the trunk fix D1017 applied it only to normal scanners. This change is just the delta between the two diffs; it brings filtering of expired store files on compaction to trunk.
Test Plan: Unit tests
Reviewers: Liyin, JIRA, lhofhansl, Kannan
Reviewed By: Liyin
CC: Liyin, tedyu, Kannan, mbautin, lhofhansl
Differential Revision: https://reviews.facebook.net/D1473
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1236483 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
36788d1cc2
commit
e06a31d62b
|
@ -845,6 +845,11 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all scanners with no filtering based on TTL (that happens further down
|
||||||
|
* the line).
|
||||||
|
* @return all scanners for this store
|
||||||
|
*/
|
||||||
protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
|
protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
|
||||||
boolean isGet,
|
boolean isGet,
|
||||||
boolean isCompaction,
|
boolean isCompaction,
|
||||||
|
@ -964,10 +969,10 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
+ StringUtils.humanReadableInt(storeSize));
|
+ StringUtils.humanReadableInt(storeSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Compact the most recent N files. Essentially a hook for testing.
|
* Compact the most recent N files. Used in testing.
|
||||||
*/
|
*/
|
||||||
protected void compactRecent(int N) throws IOException {
|
public void compactRecentForTesting(int N) throws IOException {
|
||||||
List<StoreFile> filesToCompact;
|
List<StoreFile> filesToCompact;
|
||||||
long maxId;
|
long maxId;
|
||||||
boolean isMajor;
|
boolean isMajor;
|
||||||
|
@ -1926,7 +1931,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a scanner for both the memstore and the HStore files
|
* Return a scanner for both the memstore and the HStore files. Assumes we
|
||||||
|
* are not in a compaction.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public StoreScanner getScanner(Scan scan,
|
public StoreScanner getScanner(Scan scan,
|
||||||
|
|
|
@ -56,6 +56,8 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
private final boolean isGet;
|
private final boolean isGet;
|
||||||
private final boolean explicitColumnQuery;
|
private final boolean explicitColumnQuery;
|
||||||
private final boolean useRowColBloom;
|
private final boolean useRowColBloom;
|
||||||
|
private final Scan scan;
|
||||||
|
private final NavigableSet<byte[]> columns;
|
||||||
private final long oldestUnexpiredTS;
|
private final long oldestUnexpiredTS;
|
||||||
private final int minVersions;
|
private final int minVersions;
|
||||||
|
|
||||||
|
@ -77,6 +79,8 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
isGet = scan.isGetScan();
|
isGet = scan.isGetScan();
|
||||||
int numCol = columns == null ? 0 : columns.size();
|
int numCol = columns == null ? 0 : columns.size();
|
||||||
explicitColumnQuery = numCol > 0;
|
explicitColumnQuery = numCol > 0;
|
||||||
|
this.scan = scan;
|
||||||
|
this.columns = columns;
|
||||||
oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
|
oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
|
||||||
this.minVersions = minVersions;
|
this.minVersions = minVersions;
|
||||||
|
|
||||||
|
@ -88,7 +92,8 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opens a scanner across memstore, snapshot, and all StoreFiles.
|
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
|
||||||
|
* are not in a compaction.
|
||||||
*
|
*
|
||||||
* @param store who we scan
|
* @param store who we scan
|
||||||
* @param scan the spec
|
* @param scan the spec
|
||||||
|
@ -109,7 +114,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
oldestUnexpiredTS);
|
oldestUnexpiredTS);
|
||||||
|
|
||||||
// Pass columns to try to filter out unnecessary StoreFiles.
|
// Pass columns to try to filter out unnecessary StoreFiles.
|
||||||
List<KeyValueScanner> scanners = getScanners(scan, columns);
|
List<KeyValueScanner> scanners = getScannersNoCompaction();
|
||||||
|
|
||||||
// Seek all scanners to the start of the Row (or if the exact matching row
|
// Seek all scanners to the start of the Row (or if the exact matching row
|
||||||
// key does not exist, then to the start of the next matching Row).
|
// key does not exist, then to the start of the next matching Row).
|
||||||
|
@ -150,6 +155,9 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
|
matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
|
||||||
smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
|
smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
|
||||||
|
|
||||||
|
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
|
||||||
|
scanners = selectScannersFrom(scanners);
|
||||||
|
|
||||||
// Seek all scanners to the initial key
|
// Seek all scanners to the initial key
|
||||||
for(KeyValueScanner scanner : scanners) {
|
for(KeyValueScanner scanner : scanners) {
|
||||||
scanner.seek(matcher.getStartKey());
|
scanner.seek(matcher.getStartKey());
|
||||||
|
@ -159,7 +167,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
heap = new KeyValueHeap(scanners, store.comparator);
|
heap = new KeyValueHeap(scanners, store.comparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Constructor for testing.
|
/** Constructor for testing. */
|
||||||
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
|
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
|
||||||
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
|
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
|
||||||
final List<KeyValueScanner> scanners) throws IOException {
|
final List<KeyValueScanner> scanners) throws IOException {
|
||||||
|
@ -202,18 +210,22 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
tableName, family) + "getsize";
|
tableName, family) + "getsize";
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* @return List of scanners ordered properly.
|
* Get a filtered list of scanners. Assumes we are not in a compaction.
|
||||||
|
* @return list of scanners to seek
|
||||||
*/
|
*/
|
||||||
private List<KeyValueScanner> getScanners() throws IOException {
|
private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
|
||||||
return this.store.getScanners(cacheBlocks, isGet, false, null);
|
final boolean isCompaction = false;
|
||||||
|
return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
|
||||||
|
isCompaction, matcher));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* @return List of scanners to seek, possibly filtered by StoreFile.
|
* Filters the given list of scanners using Bloom filter, time range, and
|
||||||
|
* TTL.
|
||||||
*/
|
*/
|
||||||
private List<KeyValueScanner> getScanners(Scan scan,
|
private List<KeyValueScanner> selectScannersFrom(
|
||||||
final NavigableSet<byte[]> columns) throws IOException {
|
final List<? extends KeyValueScanner> allScanners) {
|
||||||
boolean memOnly;
|
boolean memOnly;
|
||||||
boolean filesOnly;
|
boolean filesOnly;
|
||||||
if (scan instanceof InternalScan) {
|
if (scan instanceof InternalScan) {
|
||||||
|
@ -224,11 +236,9 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
memOnly = false;
|
memOnly = false;
|
||||||
filesOnly = false;
|
filesOnly = false;
|
||||||
}
|
}
|
||||||
List<KeyValueScanner> allStoreScanners =
|
|
||||||
this.store.getScanners(cacheBlocks, isGet, false, this.matcher);
|
|
||||||
|
|
||||||
List<KeyValueScanner> scanners =
|
List<KeyValueScanner> scanners =
|
||||||
new ArrayList<KeyValueScanner>(allStoreScanners.size());
|
new ArrayList<KeyValueScanner>(allScanners.size());
|
||||||
|
|
||||||
// We can only exclude store files based on TTL if minVersions is set to 0.
|
// We can only exclude store files based on TTL if minVersions is set to 0.
|
||||||
// Otherwise, we might have to return KVs that have technically expired.
|
// Otherwise, we might have to return KVs that have technically expired.
|
||||||
|
@ -236,7 +246,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
Long.MIN_VALUE;
|
Long.MIN_VALUE;
|
||||||
|
|
||||||
// include only those scan files which pass all filters
|
// include only those scan files which pass all filters
|
||||||
for (KeyValueScanner kvs : allStoreScanners) {
|
for (KeyValueScanner kvs : allScanners) {
|
||||||
boolean isFile = kvs.isFileScanner();
|
boolean isFile = kvs.isFileScanner();
|
||||||
if ((!isFile && filesOnly) || (isFile && memOnly)) {
|
if ((!isFile && filesOnly) || (isFile && memOnly)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -246,7 +256,6 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
scanners.add(kvs);
|
scanners.add(kvs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return scanners;
|
return scanners;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,7 +290,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
public synchronized boolean seek(KeyValue key) throws IOException {
|
public synchronized boolean seek(KeyValue key) throws IOException {
|
||||||
if (this.heap == null) {
|
if (this.heap == null) {
|
||||||
|
|
||||||
List<KeyValueScanner> scanners = getScanners();
|
List<KeyValueScanner> scanners = getScannersNoCompaction();
|
||||||
|
|
||||||
heap = new KeyValueHeap(scanners, store.comparator);
|
heap = new KeyValueHeap(scanners, store.comparator);
|
||||||
}
|
}
|
||||||
|
@ -479,7 +488,7 @@ class StoreScanner extends NonLazyKeyValueScanner
|
||||||
/* When we have the scan object, should we not pass it to getScanners()
|
/* When we have the scan object, should we not pass it to getScanners()
|
||||||
* to get a limited set of scanners? We did so in the constructor and we
|
* to get a limited set of scanners? We did so in the constructor and we
|
||||||
* could have done it now by storing the scan object from the constructor */
|
* could have done it now by storing the scan object from the constructor */
|
||||||
List<KeyValueScanner> scanners = getScanners();
|
List<KeyValueScanner> scanners = getScannersNoCompaction();
|
||||||
|
|
||||||
for(KeyValueScanner scanner : scanners) {
|
for(KeyValueScanner scanner : scanners) {
|
||||||
scanner.seek(lastTopKey);
|
scanner.seek(lastTopKey);
|
||||||
|
|
|
@ -742,7 +742,7 @@ public class SchemaMetrics {
|
||||||
return metricsSnapshot;
|
return metricsSnapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getLong(Map<String, Long> m, String k) {
|
public static long getLong(Map<String, Long> m, String k) {
|
||||||
Long l = m.get(k);
|
Long l = m.get(k);
|
||||||
return l != null ? l : 0;
|
return l != null ? l : 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,9 +20,9 @@ import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -35,9 +35,12 @@ import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.SmallTests;
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
|
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.BlockMetricType;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -70,19 +73,27 @@ public class TestScannerSelectionUsingTTL {
|
||||||
private static final int NUM_ROWS = 8;
|
private static final int NUM_ROWS = 8;
|
||||||
private static final int NUM_COLS_PER_ROW = 5;
|
private static final int NUM_COLS_PER_ROW = 5;
|
||||||
|
|
||||||
public final int numFreshFiles;
|
public final int numFreshFiles, totalNumFiles;
|
||||||
|
|
||||||
|
/** Whether we are specifying the exact files to compact */
|
||||||
|
private final boolean explicitCompaction;
|
||||||
|
|
||||||
@Parameters
|
@Parameters
|
||||||
public static Collection<Object[]> parametersNumFreshFiles() {
|
public static Collection<Object[]> parameters() {
|
||||||
return Arrays.asList(new Object[][]{
|
List<Object[]> params = new ArrayList<Object[]>();
|
||||||
new Object[] { new Integer(1) },
|
for (int numFreshFiles = 1; numFreshFiles <= 3; ++numFreshFiles) {
|
||||||
new Object[] { new Integer(2) },
|
for (boolean explicitCompaction : new boolean[] { false, true }) {
|
||||||
new Object[] { new Integer(3) }
|
params.add(new Object[] { numFreshFiles, explicitCompaction });
|
||||||
});
|
}
|
||||||
|
}
|
||||||
|
return params;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestScannerSelectionUsingTTL(int numFreshFiles) {
|
public TestScannerSelectionUsingTTL(int numFreshFiles,
|
||||||
|
boolean explicitCompaction) {
|
||||||
this.numFreshFiles = numFreshFiles;
|
this.numFreshFiles = numFreshFiles;
|
||||||
|
this.totalNumFiles = numFreshFiles + NUM_EXPIRED_FILES;
|
||||||
|
this.explicitCompaction = explicitCompaction;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -101,7 +112,7 @@ public class TestScannerSelectionUsingTTL {
|
||||||
HRegion.createHRegion(info, TEST_UTIL.getClusterTestDir(),
|
HRegion.createHRegion(info, TEST_UTIL.getClusterTestDir(),
|
||||||
TEST_UTIL.getConfiguration(), htd);
|
TEST_UTIL.getConfiguration(), htd);
|
||||||
|
|
||||||
for (int iFile = 0; iFile < NUM_EXPIRED_FILES + numFreshFiles; ++iFile) {
|
for (int iFile = 0; iFile < totalNumFiles; ++iFile) {
|
||||||
if (iFile == NUM_EXPIRED_FILES) {
|
if (iFile == NUM_EXPIRED_FILES) {
|
||||||
Threads.sleepWithoutInterrupt(TTL_MS);
|
Threads.sleepWithoutInterrupt(TTL_MS);
|
||||||
}
|
}
|
||||||
|
@ -135,11 +146,28 @@ public class TestScannerSelectionUsingTTL {
|
||||||
assertEquals(NUM_ROWS, numReturnedRows);
|
assertEquals(NUM_ROWS, numReturnedRows);
|
||||||
Set<String> accessedFiles = cache.getCachedFileNamesForTest();
|
Set<String> accessedFiles = cache.getCachedFileNamesForTest();
|
||||||
LOG.debug("Files accessed during scan: " + accessedFiles);
|
LOG.debug("Files accessed during scan: " + accessedFiles);
|
||||||
assertEquals("If " + (NUM_EXPIRED_FILES + numFreshFiles) + " files are "
|
|
||||||
+ "accessed instead of " + numFreshFiles + ", we are "
|
|
||||||
+ "not filtering expired files out.", numFreshFiles,
|
|
||||||
accessedFiles.size());
|
|
||||||
|
|
||||||
|
Map<String, Long> metricsBeforeCompaction =
|
||||||
|
SchemaMetrics.getMetricsSnapshot();
|
||||||
|
|
||||||
|
// Exercise both compaction codepaths.
|
||||||
|
if (explicitCompaction) {
|
||||||
|
region.getStore(FAMILY_BYTES).compactRecentForTesting(totalNumFiles);
|
||||||
|
} else {
|
||||||
|
region.compactStores();
|
||||||
|
}
|
||||||
|
|
||||||
|
SchemaMetrics.validateMetricChanges(metricsBeforeCompaction);
|
||||||
|
Map<String, Long> compactionMetrics =
|
||||||
|
SchemaMetrics.diffMetrics(metricsBeforeCompaction,
|
||||||
|
SchemaMetrics.getMetricsSnapshot());
|
||||||
|
long compactionDataBlocksRead = SchemaMetrics.getLong(
|
||||||
|
compactionMetrics,
|
||||||
|
SchemaMetrics.getInstance(TABLE, FAMILY).getBlockMetricName(
|
||||||
|
BlockCategory.DATA, true, BlockMetricType.READ_COUNT));
|
||||||
|
assertEquals("Invalid number of blocks accessed during compaction. " +
|
||||||
|
"We only expect non-expired files to be accessed.",
|
||||||
|
numFreshFiles, compactionDataBlocksRead);
|
||||||
region.close();
|
region.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -391,7 +391,7 @@ public class TestCompaction extends HBaseTestCase {
|
||||||
Store store2 = this.r.stores.get(fam2);
|
Store store2 = this.r.stores.get(fam2);
|
||||||
int numFiles1 = store2.getStorefiles().size();
|
int numFiles1 = store2.getStorefiles().size();
|
||||||
assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
|
assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
|
||||||
store2.compactRecent(compactionThreshold); // = 3
|
store2.compactRecentForTesting(compactionThreshold); // = 3
|
||||||
int numFiles2 = store2.getStorefiles().size();
|
int numFiles2 = store2.getStorefiles().size();
|
||||||
// Check that we did compact
|
// Check that we did compact
|
||||||
assertTrue("Number of store files should go down", numFiles1 > numFiles2);
|
assertTrue("Number of store files should go down", numFiles1 > numFiles2);
|
||||||
|
|
Loading…
Reference in New Issue