HBASE-19027 Honor the CellComparator of ScanInfo in scanning over a store
This commit is contained in:
parent
f812218ffe
commit
35a2b8e53c
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hbase.util.CollectionUtils;
|
import org.apache.hadoop.hbase.util.CollectionUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -67,6 +68,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
|
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
|
||||||
// In unit tests, the store could be null
|
// In unit tests, the store could be null
|
||||||
protected final HStore store;
|
protected final HStore store;
|
||||||
|
private final CellComparator comparator;
|
||||||
private ScanQueryMatcher matcher;
|
private ScanQueryMatcher matcher;
|
||||||
protected KeyValueHeap heap;
|
protected KeyValueHeap heap;
|
||||||
private boolean cacheBlocks;
|
private boolean cacheBlocks;
|
||||||
|
@ -164,6 +166,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
this.readPt = readPt;
|
this.readPt = readPt;
|
||||||
this.store = store;
|
this.store = store;
|
||||||
this.cacheBlocks = cacheBlocks;
|
this.cacheBlocks = cacheBlocks;
|
||||||
|
this.comparator = Preconditions.checkNotNull(scanInfo.getComparator());
|
||||||
get = scan.isGetScan();
|
get = scan.isGetScan();
|
||||||
explicitColumnQuery = numColumns > 0;
|
explicitColumnQuery = numColumns > 0;
|
||||||
this.scan = scan;
|
this.scan = scan;
|
||||||
|
@ -252,7 +255,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
this.storeOffset = scan.getRowOffsetPerColumnFamily();
|
this.storeOffset = scan.getRowOffsetPerColumnFamily();
|
||||||
addCurrentScanners(scanners);
|
addCurrentScanners(scanners);
|
||||||
// Combine all seeked scanners with a heap
|
// Combine all seeked scanners with a heap
|
||||||
resetKVHeap(scanners, store.getComparator());
|
resetKVHeap(scanners, comparator);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
|
// remove us from the HStore#changedReaderObservers here or we'll have no chance to
|
||||||
// and might cause memory leak
|
// and might cause memory leak
|
||||||
|
@ -311,7 +314,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
|
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
|
||||||
addCurrentScanners(scanners);
|
addCurrentScanners(scanners);
|
||||||
// Combine all seeked scanners with a heap
|
// Combine all seeked scanners with a heap
|
||||||
resetKVHeap(scanners, store.getComparator());
|
resetKVHeap(scanners, comparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
|
private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
|
||||||
|
@ -319,7 +322,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
// Seek all scanners to the initial key
|
// Seek all scanners to the initial key
|
||||||
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
|
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
|
||||||
addCurrentScanners(scanners);
|
addCurrentScanners(scanners);
|
||||||
resetKVHeap(scanners, scanInfo.getComparator());
|
resetKVHeap(scanners, comparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
// For mob compaction only as we do not have a Store instance when doing mob compaction.
|
// For mob compaction only as we do not have a Store instance when doing mob compaction.
|
||||||
|
@ -543,9 +546,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
scannerContext.clearProgress();
|
scannerContext.clearProgress();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only do a sanity-check if store and comparator are available.
|
|
||||||
CellComparator comparator = store != null ? store.getComparator() : null;
|
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
long totalBytesRead = 0;
|
long totalBytesRead = 0;
|
||||||
|
|
||||||
|
@ -997,7 +997,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
|
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
|
||||||
newCurrentScanners.addAll(fileScanners);
|
newCurrentScanners.addAll(fileScanners);
|
||||||
newCurrentScanners.addAll(memstoreScanners);
|
newCurrentScanners.addAll(memstoreScanners);
|
||||||
newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator());
|
newHeap = new KeyValueHeap(newCurrentScanners, comparator);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("failed to switch to stream read", e);
|
LOG.warn("failed to switch to stream read", e);
|
||||||
if (fileScanners != null) {
|
if (fileScanners != null) {
|
||||||
|
|
|
@ -402,7 +402,7 @@ public abstract class ScanQueryMatcher implements ShipperListener {
|
||||||
columnTracker = (NewVersionBehaviorTracker) deleteTracker;
|
columnTracker = (NewVersionBehaviorTracker) deleteTracker;
|
||||||
} else if (columns == null || columns.size() == 0) {
|
} else if (columns == null || columns.size() == 0) {
|
||||||
columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersionToCheck,
|
columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersionToCheck,
|
||||||
oldestUnexpiredTS);
|
oldestUnexpiredTS, scanInfo.getComparator());
|
||||||
} else {
|
} else {
|
||||||
columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(),
|
columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(),
|
||||||
maxVersionToCheck, oldestUnexpiredTS);
|
maxVersionToCheck, oldestUnexpiredTS);
|
||||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver.querymatcher;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
|
@ -49,16 +49,20 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
|
||||||
|
|
||||||
private long oldestStamp;
|
private long oldestStamp;
|
||||||
|
|
||||||
|
private final CellComparator comparator;
|
||||||
/**
|
/**
|
||||||
* Return maxVersions of every row.
|
* Return maxVersions of every row.
|
||||||
* @param minVersion Minimum number of versions to keep
|
* @param minVersion Minimum number of versions to keep
|
||||||
* @param maxVersion Maximum number of versions to return
|
* @param maxVersion Maximum number of versions to return
|
||||||
* @param oldestUnexpiredTS oldest timestamp that has not expired according to the TTL.
|
* @param oldestUnexpiredTS oldest timestamp that has not expired according to the TTL.
|
||||||
|
* @param comparator used to compare the qualifier of cell
|
||||||
*/
|
*/
|
||||||
public ScanWildcardColumnTracker(int minVersion, int maxVersion, long oldestUnexpiredTS) {
|
public ScanWildcardColumnTracker(int minVersion, int maxVersion,
|
||||||
|
long oldestUnexpiredTS, CellComparator comparator) {
|
||||||
this.maxVersions = maxVersion;
|
this.maxVersions = maxVersion;
|
||||||
this.minVersions = minVersion;
|
this.minVersions = minVersion;
|
||||||
this.oldestStamp = oldestUnexpiredTS;
|
this.oldestStamp = oldestUnexpiredTS;
|
||||||
|
this.comparator = comparator;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -85,7 +89,7 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
|
||||||
// do not count a delete marker as another version
|
// do not count a delete marker as another version
|
||||||
return checkVersion(type, timestamp);
|
return checkVersion(type, timestamp);
|
||||||
}
|
}
|
||||||
int cmp = CellComparatorImpl.COMPARATOR.compareQualifiers(cell, this.columnCell);
|
int cmp = comparator.compareQualifiers(cell, this.columnCell);
|
||||||
if (cmp == 0) {
|
if (cmp == 0) {
|
||||||
if (ignoreCount) {
|
if (ignoreCount) {
|
||||||
return ScanQueryMatcher.MatchCode.INCLUDE;
|
return ScanQueryMatcher.MatchCode.INCLUDE;
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanDeleteTracker;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanDeleteTracker;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparator;
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||||
|
@ -64,7 +63,7 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
|
||||||
// Need to track it per ts.
|
// Need to track it per ts.
|
||||||
private List<Triple<List<Tag>, Byte, Long>> visibilityTagsDeleteFamilyVersion = new ArrayList<>();
|
private List<Triple<List<Tag>, Byte, Long>> visibilityTagsDeleteFamilyVersion = new ArrayList<>();
|
||||||
private List<Pair<List<Tag>, Byte>> visibilityTagsDeleteColumns;
|
private List<Pair<List<Tag>, Byte>> visibilityTagsDeleteColumns;
|
||||||
// Tracking as List<List> is to handle same ts cell but different visibility tag.
|
// Tracking as List<List> is to handle same ts cell but different visibility tag.
|
||||||
// TODO : Need to handle puts with same ts but different vis tags.
|
// TODO : Need to handle puts with same ts but different vis tags.
|
||||||
private List<Pair<List<Tag>, Byte>> visiblityTagsDeleteColumnVersion = new ArrayList<>();
|
private List<Pair<List<Tag>, Byte>> visiblityTagsDeleteColumnVersion = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -247,7 +246,7 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (deleteCell != null) {
|
if (deleteCell != null) {
|
||||||
int ret = CellComparatorImpl.COMPARATOR.compareQualifiers(cell, deleteCell);
|
int ret = comparator.compareQualifiers(cell, deleteCell);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
|
if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
|
||||||
if (visibilityTagsDeleteColumns != null) {
|
if (visibilityTagsDeleteColumns != null) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
|
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
@ -41,7 +42,8 @@ public class TestScanWildcardColumnTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckColumnOk() throws IOException {
|
public void testCheckColumnOk() throws IOException {
|
||||||
ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(
|
||||||
|
0, VERSIONS, Long.MIN_VALUE, CellComparatorImpl.COMPARATOR);
|
||||||
|
|
||||||
// Create list of qualifiers
|
// Create list of qualifiers
|
||||||
List<byte[]> qualifiers = new ArrayList<>(4);
|
List<byte[]> qualifiers = new ArrayList<>(4);
|
||||||
|
@ -73,7 +75,8 @@ public class TestScanWildcardColumnTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckColumnEnforceVersions() throws IOException {
|
public void testCheckColumnEnforceVersions() throws IOException {
|
||||||
ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(
|
||||||
|
0, VERSIONS, Long.MIN_VALUE, CellComparatorImpl.COMPARATOR);
|
||||||
|
|
||||||
// Create list of qualifiers
|
// Create list of qualifiers
|
||||||
List<byte[]> qualifiers = new ArrayList<>(4);
|
List<byte[]> qualifiers = new ArrayList<>(4);
|
||||||
|
@ -106,7 +109,8 @@ public class TestScanWildcardColumnTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void DisabledTestCheckColumnWrongOrder() {
|
public void DisabledTestCheckColumnWrongOrder() {
|
||||||
ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(
|
||||||
|
0, VERSIONS, Long.MIN_VALUE, CellComparatorImpl.COMPARATOR);
|
||||||
|
|
||||||
// Create list of qualifiers
|
// Create list of qualifiers
|
||||||
List<byte[]> qualifiers = new ArrayList<>(2);
|
List<byte[]> qualifiers = new ArrayList<>(2);
|
||||||
|
|
Loading…
Reference in New Issue