HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In KeyValueHeap, if two cells are same i.e. have same key and timestamp, then instead of directly using seq id to determine newer one, we should use StoreFile.Comparater.SEQ_ID because that's what is used to determine order of hfiles. In this patch, we assign each scanner an order based on it's index in storefiles list, which is then used in KeyValueHeap to disambiguate between same cells. Changes the getSequenceId() in KeyValueScanner class to getScannerOrder(). Testing: Adds unit test to TestKeyValueHeap. Manual testing: Three cases (Tables t, t2, t3 in the jira description), single region, 2 hfiles with same seq id, timestamps and duplicate KVs. Made sure that returned kv was same for get and scan. (Apekshit)

Change-Id: I22600c91c0a51fb63eb17db73472839d2f13957c

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Apekshit 2016-02-23 00:31:18 -08:00 committed by stack
parent a6e29676db
commit 11740570c1
13 changed files with 210 additions and 233 deletions

View File

@ -189,17 +189,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
if (comparison != 0) {
return comparison;
} else {
// Since both the keys are exactly the same, we break the tie in favor
// of the key which came latest.
long leftSequenceID = left.getSequenceID();
long rightSequenceID = right.getSequenceID();
if (leftSequenceID > rightSequenceID) {
return -1;
} else if (leftSequenceID < rightSequenceID) {
return 1;
} else {
return 0;
}
// Since both the keys are exactly the same, we break the tie in favor of higher ordered
// scanner since it'll have newer data. Since higher value should come first, we reverse
// sort here.
return Long.compare(right.getScannerOrder(), left.getScannerOrder());
}
}
/**
@ -406,8 +399,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
return this.heap;
}
/**
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public long getSequenceID() {
public long getScannerOrder() {
return 0;
}

View File

@ -70,13 +70,13 @@ public interface KeyValueScanner extends Shipper, Closeable {
boolean reseek(Cell key) throws IOException;
/**
* Get the sequence id associated with this KeyValueScanner. This is required
* for comparing multiple files to find out which one has the latest data.
* The default implementation for this would be to return 0. A file having
* lower sequence id will be considered to be the older one.
* Get the order of this KeyValueScanner. This is only relevant for StoreFileScanners and
* MemStoreScanners (other scanners simply return 0). This is required for comparing multiple
* files to find out which one has the latest data. StoreFileScanners are ordered from 0
* (oldest) to newest in increasing order. MemStoreScanner gets LONG.max since it always
* contains freshest data.
*/
// TODO: Implement SequenceId Interface instead.
long getSequenceID();
long getScannerOrder();
/**
* Close the KeyValue scanner.

View File

@ -186,11 +186,12 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
}
/**
* MemStoreScanner returns max value as sequence id because it will
* always have the latest data among all files.
* MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all
* scanners.
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public synchronized long getSequenceID() {
public long getScannerOrder() {
return Long.MAX_VALUE;
}

View File

@ -33,7 +33,12 @@ import org.apache.hadoop.hbase.client.Scan;
@InterfaceAudience.Private
public class SegmentScanner implements KeyValueScanner {
private long sequenceID = Long.MAX_VALUE;
/**
* Order of this scanner relative to other scanners. See
* {@link KeyValueScanner#getScannerOrder()}.
*/
private long scannerOrder;
private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE;
// the observed structure
private final Segment segment;
@ -52,6 +57,13 @@ public class SegmentScanner implements KeyValueScanner {
private Cell last = null;
protected SegmentScanner(Segment segment, long readPoint) {
this(segment, readPoint, DEFAULT_SCANNER_ORDER);
}
/**
* @param scannerOrder see {@link KeyValueScanner#getScannerOrder()}.
*/
protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) {
this.segment = segment;
this.readPoint = readPoint;
iter = segment.iterator();
@ -59,6 +71,7 @@ public class SegmentScanner implements KeyValueScanner {
current = getNext();
//increase the reference count so the underlying structure will not be de-allocated
this.segment.incScannerCount();
this.scannerOrder = scannerOrder;
}
/**
@ -208,14 +221,11 @@ public class SegmentScanner implements KeyValueScanner {
}
/**
* Get the sequence id associated with this KeyValueScanner. This is required
* for comparing multiple files (or memstore segments) scanners to find out
* which one has the latest data.
*
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public long getSequenceID() {
return sequenceID;
public long getScannerOrder() {
return scannerOrder;
}
/**
@ -296,15 +306,6 @@ public class SegmentScanner implements KeyValueScanner {
// do nothing
}
/**
* Set the sequence id of the scanner.
* This is used to determine an order between memory segment scanners.
* @param x a unique sequence id
*/
public void setSequenceID(long x) {
sequenceID = x;
}
/**
* Returns whether the given scan should seek in this segment
* @return whether the given scan should seek in this segment
@ -321,7 +322,7 @@ public class SegmentScanner implements KeyValueScanner {
@Override
public String toString() {
String res = "Store segment scanner of type "+this.getClass().getName()+"; ";
res += "sequence id "+getSequenceID()+"; ";
res += "Scanner order " + getScannerOrder() + "; ";
res += getSegment().toString();
return res;
}

View File

@ -157,7 +157,8 @@ public class StoreFile {
Bytes.toBytes("BULKLOAD_TIMESTAMP");
/**
* Map of the metadata entries in the corresponding HFile
* Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
* after which it is not modified again.
*/
private Map<byte[], byte[]> metadataMap;
@ -237,6 +238,7 @@ public class StoreFile {
this.fileInfo = other.fileInfo;
this.cacheConf = other.cacheConf;
this.cfBloomType = other.cfBloomType;
this.metadataMap = other.metadataMap;
}
/**
@ -370,7 +372,7 @@ public class StoreFile {
if (startPos != -1) {
bulkLoadedHFile = true;
}
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
}
@VisibleForTesting

View File

@ -114,19 +114,27 @@ public class StoreFileReader {
}
/**
* Get a scanner to scan over this StoreFile. Do not use
* this overload if using this scanner for compactions.
* Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting
* {@code isCompaction} to false, {@code readPt} to 0 and {@code scannerOrder} to 0.
* Do not use this overload if using this scanner for compactions.
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @return a scanner
* @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread) {
return getStoreFileScanner(cacheBlocks, pread, false,
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
// 0 is passed as readpoint because this method is only used by test
// where StoreFile is directly operated upon
0);
return getStoreFileScanner(cacheBlocks, pread, false, 0, 0);
}
/**
* Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting
* {@code scannerOrder} to 0.
*
* @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
*/
public StoreFileScanner getStoreFileScanner(
boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) {
return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0);
}
/**
@ -135,16 +143,17 @@ public class StoreFileReader {
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @param isCompaction is scanner being used for compaction?
* @param scannerOrder Order of this scanner relative to other scanners. See
* {@link KeyValueScanner#getScannerOrder()}.
* @return a scanner
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread,
boolean isCompaction, long readPt) {
public StoreFileScanner getStoreFileScanner(
boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder) {
// Increment the ref count
refCount.incrementAndGet();
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread, isCompaction),
!isCompaction, reader.hasMVCCInfo(), readPt);
return new StoreFileScanner(
this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(),
readPt, scannerOrder);
}
/**

View File

@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -65,17 +66,37 @@ public class StoreFileScanner implements KeyValueScanner {
private long readPt;
// Order of this scanner relative to other scanners when duplicate key-value is found.
// Higher values means scanner has newer data.
private long scannerOrder;
/**
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner
* @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
* @param readPt MVCC value to use to filter out the updates newer than this scanner.
* @param hasMVCC Set to true if underlying store file reader has MVCC info.
*/
public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
boolean hasMVCC, long readPt) {
this (reader, hfs, useMVCC, hasMVCC, readPt, 0);
}
/**
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
* @param readPt MVCC value to use to filter out the updates newer than this scanner.
* @param hasMVCC Set to true if underlying store file reader has MVCC info.
* @param scannerOrder Order of the scanner relative to other scanners.
* See {@link KeyValueScanner#getScannerOrder()}.
*/
public StoreFileScanner(StoreFileReader reader, HFileScanner hfs, boolean useMVCC,
boolean hasMVCC, long readPt, long scannerOrder) {
this.readPt = readPt;
this.reader = reader;
this.hfs = hfs;
this.enforceMVCC = useMVCC;
this.hasMVCCInfo = hasMVCC;
this.scannerOrder = scannerOrder;
}
boolean isPrimaryReplica() {
@ -115,11 +136,13 @@ public class StoreFileScanner implements KeyValueScanner {
ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
files.size());
for (StoreFile file : files) {
StoreFileReader r = file.createReader(canUseDrop);
List<StoreFile> sorted_files = new ArrayList<>(files);
Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
for (int i = 0; i < sorted_files.size(); i++) {
StoreFileReader r = sorted_files.get(i).createReader();
r.setReplicaStoreFile(isPrimaryReplica);
StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
isCompaction, readPt);
isCompaction, readPt, i);
scanner.setScanQueryMatcher(matcher);
scanners.add(scanner);
}
@ -303,9 +326,12 @@ public class StoreFileScanner implements KeyValueScanner {
return s.next();
}
/**
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public long getSequenceID() {
return reader.getSequenceID();
public long getScannerOrder() {
return scannerOrder;
}
/**

View File

@ -887,8 +887,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return false;
}
/**
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public long getSequenceID() {
public long getScannerOrder() {
return 0;
}

View File

@ -116,8 +116,11 @@ public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner {
return false;
}
/**
* @see org.apache.hadoop.hbase.regionserver.KeyValueScanner#getScannerOrder()
*/
@Override
public long getSequenceID() {
public long getScannerOrder() {
return 0;
}

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@ -38,178 +38,105 @@ import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
public class TestKeyValueHeap extends HBaseTestCase {
private static final boolean PRINT = false;
private byte[] row1 = Bytes.toBytes("row1");
private byte[] fam1 = Bytes.toBytes("fam1");
private byte[] col1 = Bytes.toBytes("col1");
private byte[] data = Bytes.toBytes("data");
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
private byte[] row2 = Bytes.toBytes("row2");
private byte[] fam2 = Bytes.toBytes("fam2");
private byte[] col2 = Bytes.toBytes("col2");
private byte[] row1;
private byte[] fam1;
private byte[] col1;
private byte[] data;
private byte[] col3 = Bytes.toBytes("col3");
private byte[] col4 = Bytes.toBytes("col4");
private byte[] col5 = Bytes.toBytes("col5");
private byte[] row2;
private byte[] fam2;
private byte[] col2;
// Variable name encoding. kv<row#><fam#><col#>
Cell kv111 = new KeyValue(row1, fam1, col1, data);
Cell kv112 = new KeyValue(row1, fam1, col2, data);
Cell kv113 = new KeyValue(row1, fam1, col3, data);
Cell kv114 = new KeyValue(row1, fam1, col4, data);
Cell kv115 = new KeyValue(row1, fam1, col5, data);
Cell kv121 = new KeyValue(row1, fam2, col1, data);
Cell kv122 = new KeyValue(row1, fam2, col2, data);
Cell kv211 = new KeyValue(row2, fam1, col1, data);
Cell kv212 = new KeyValue(row2, fam1, col2, data);
Cell kv213 = new KeyValue(row2, fam1, col3, data);
private byte[] col3;
private byte[] col4;
private byte[] col5;
TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3));
/*
* Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned
* Cells are same as {@code expected}.
* @return List of Cells returned from scanners.
*/
public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
throws IOException {
//Creating KeyValueHeap
KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
List<Cell> actual = new ArrayList<>();
while(kvh.peek() != null){
actual.add(kvh.next());
}
assertEquals(expected, actual);
return actual;
}
@Before
public void setUp() throws Exception {
super.setUp();
data = Bytes.toBytes("data");
row1 = Bytes.toBytes("row1");
fam1 = Bytes.toBytes("fam1");
col1 = Bytes.toBytes("col1");
row2 = Bytes.toBytes("row2");
fam2 = Bytes.toBytes("fam2");
col2 = Bytes.toBytes("col2");
col3 = Bytes.toBytes("col3");
col4 = Bytes.toBytes("col4");
col5 = Bytes.toBytes("col5");
}
@Test
public void testSorted() throws IOException{
//Cases that need to be checked are:
//1. The "smallest" KeyValue is in the same scanners as current
//1. The "smallest" Cell is in the same scanners as current
//2. Current scanner gets empty
List<Cell> l1 = new ArrayList<Cell>();
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
scanners.add(new Scanner(l1));
List<Cell> expected = Arrays.asList(
kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
scanners.add(new Scanner(l2));
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
scanners.add(new Scanner(l3));
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(new KeyValue(row1, fam1, col1, data));
expected.add(new KeyValue(row1, fam1, col2, data));
expected.add(new KeyValue(row1, fam1, col3, data));
expected.add(new KeyValue(row1, fam1, col4, data));
expected.add(new KeyValue(row1, fam1, col5, data));
expected.add(new KeyValue(row1, fam2, col1, data));
expected.add(new KeyValue(row1, fam2, col2, data));
expected.add(new KeyValue(row2, fam1, col1, data));
expected.add(new KeyValue(row2, fam1, col2, data));
expected.add(new KeyValue(row2, fam1, col3, data));
//Creating KeyValueHeap
KeyValueHeap kvh =
new KeyValueHeap(scanners, CellComparator.COMPARATOR);
List<Cell> actual = new ArrayList<Cell>();
while(kvh.peek() != null){
actual.add(kvh.next());
}
assertEquals(expected.size(), actual.size());
for(int i=0; i<expected.size(); i++){
assertEquals(expected.get(i), actual.get(i));
if(PRINT){
System.out.println("expected " +expected.get(i)+
"\nactual " +actual.get(i) +"\n");
}
}
List<Cell> actual = assertCells(expected, scanners);
//Check if result is sorted according to Comparator
for(int i=0; i<actual.size()-1; i++){
int ret = CellComparator.COMPARATOR.compare(actual.get(i), actual.get(i+1));
assertTrue(ret < 0);
}
}
@Test
public void testSeek() throws IOException {
//Cases:
//1. Seek KeyValue that is not in scanner
//1. Seek Cell that is not in scanner
//2. Check that smallest that is returned from a seek is correct
List<Cell> l1 = new ArrayList<Cell>();
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
scanners.add(new Scanner(l1));
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
scanners.add(new Scanner(l2));
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
scanners.add(new Scanner(l3));
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(new KeyValue(row2, fam1, col1, data));
List<Cell> expected = Arrays.asList(kv211);
//Creating KeyValueHeap
KeyValueHeap kvh =
new KeyValueHeap(scanners, CellComparator.COMPARATOR);
KeyValue seekKv = new KeyValue(row2, fam1, null, null);
Cell seekKv = new KeyValue(row2, fam1, null, null);
kvh.seek(seekKv);
List<Cell> actual = new ArrayList<Cell>();
actual.add(kvh.peek());
assertEquals(expected.size(), actual.size());
for(int i=0; i<expected.size(); i++){
assertEquals(expected.get(i), actual.get(i));
if(PRINT){
System.out.println("expected " +expected.get(i)+
"\nactual " +actual.get(i) +"\n");
}
}
List<Cell> actual = Arrays.asList(kvh.peek());
assertEquals("Expected = " + Arrays.toString(expected.toArray())
+ "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual);
}
@Test
public void testScannerLeak() throws IOException {
// Test for unclosed scanners (HBASE-1927)
List<Cell> l1 = new ArrayList<Cell>();
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
Scanner s1 = new Scanner(l1);
scanners.add(s1);
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
Scanner s2 = new Scanner(l2);
scanners.add(s2);
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
Scanner s3 = new Scanner(l3);
scanners.add(s3);
List<Cell> l4 = new ArrayList<Cell>();
Scanner s4 = new Scanner(l4);
TestScanner s4 = new TestScanner(new ArrayList<Cell>());
scanners.add(s4);
//Creating KeyValueHeap
@ -225,7 +152,7 @@ public class TestKeyValueHeap extends HBaseTestCase {
assertTrue(kvh.scannersForDelayedClose.contains(s4));
kvh.close();
for(KeyValueScanner scanner : scanners) {
assertTrue(((Scanner)scanner).isClosed());
assertTrue(((TestScanner)scanner).isClosed());
}
}
@ -233,38 +160,19 @@ public class TestKeyValueHeap extends HBaseTestCase {
public void testScannerException() throws IOException {
// Test for NPE issue when exception happens in scanners (HBASE-13835)
List<Cell> l1 = new ArrayList<Cell>();
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
SeekScanner s1 = new SeekScanner(l1);
scanners.add(s1);
TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
TestScanner s4 = new SeekTestScanner(new ArrayList<Cell>());
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
SeekScanner s2 = new SeekScanner(l2);
scanners.add(s2);
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
SeekScanner s3 = new SeekScanner(l3);
scanners.add(s3);
List<Cell> l4 = new ArrayList<Cell>();
SeekScanner s4 = new SeekScanner(l4);
scanners.add(s4);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3, s4));
// Creating KeyValueHeap
KeyValueHeap kvh = new KeyValueHeap(scanners, CellComparator.COMPARATOR);
try {
for (KeyValueScanner scanner : scanners) {
((SeekScanner) scanner).setRealSeekDone(false);
((SeekTestScanner) scanner).setRealSeekDone(false);
}
while (kvh.next() != null);
// The pollRealKV should throw IOE.
@ -276,20 +184,47 @@ public class TestKeyValueHeap extends HBaseTestCase {
// It implies there is no NPE thrown from kvh.close() if getting here
for (KeyValueScanner scanner : scanners) {
// Verify that close is called and only called once for each scanner
assertTrue(((SeekScanner) scanner).isClosed());
assertEquals(((SeekScanner) scanner).getClosedNum(), 1);
assertTrue(((SeekTestScanner) scanner).isClosed());
assertEquals(((SeekTestScanner) scanner).getClosedNum(), 1);
}
}
private static class Scanner extends CollectionBackedScanner {
private Iterator<Cell> iter;
private Cell current;
@Test
public void testPriorityId() throws IOException {
Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
{
TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
assertCells(expected, new ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
}
{
TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1);
List<Cell> expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
assertCells(expected, new ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
}
}
private static class TestScanner extends CollectionBackedScanner {
private boolean closed = false;
private long scannerOrder = 0;
public Scanner(List<Cell> list) {
public TestScanner(List<Cell> list) {
super(list);
}
public TestScanner(List<Cell> list, long scannerOrder) {
this(list);
this.scannerOrder = scannerOrder;
}
@Override
public long getScannerOrder() {
return scannerOrder;
}
@Override
public void close(){
closed = true;
@ -300,11 +235,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
}
}
private static class SeekScanner extends Scanner {
private static class SeekTestScanner extends TestScanner {
private int closedNum = 0;
private boolean realSeekDone = true;
public SeekScanner(List<Cell> list) {
public SeekTestScanner(List<Cell> list) {
super(list);
}

View File

@ -211,7 +211,7 @@ public class TestStoreFile extends HBaseTestCase {
when(hcd.getName()).thenReturn(cf);
when(store.getFamily()).thenReturn(hcd);
StoreFileScanner scanner =
new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0);
new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0);
Scan scan = new Scan();
scan.setColumnFamilyTimeRange(cf, 0, 1);
assertFalse(scanner.shouldUseScanner(scan, store, 0));

View File

@ -64,7 +64,7 @@ public class TestCompactor {
when(r.length()).thenReturn(1L);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r);

View File

@ -750,7 +750,8 @@ public class TestStripeCompactionPolicy {
when(r.length()).thenReturn(size);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(
mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader(anyBoolean())).thenReturn(r);