HBASE-15236 Inconsistent cell reads over multiple bulk-loaded HFiles. In KeyValueHeap, if two cells are same i.e. have same key and timestamp, then instead of directly using seq id to determine newer one, we should use StoreFile.Comparater.SEQ_ID because that's what is used to determine order of hfiles. In this patch, we assign each scanner an order based on it's index in storefiles list, which is then used in KeyValueHeap to disambiguate between same cells. Changes the getSequenceId() in KeyValueScanner class to getScannerOrder(). Testing: Adds unit test to TestKeyValueHeap. Manual testing: Three cases (Tables t, t2, t3 in the jira description), single region, 2 hfiles with same seq id, timestamps and duplicate KVs. Made sure that returned kv was same for get and scan. (Apekshit)

Change-Id: I22600c91c0a51fb63eb17db73472839d2f13957c

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Apekshit 2016-02-23 00:31:18 -08:00 committed by stack
parent cadc4cf15b
commit a6f8214e9b
11 changed files with 198 additions and 218 deletions

View File

@ -939,11 +939,12 @@ public class DefaultMemStore implements MemStore {
} }
/** /**
* MemStoreScanner returns max value as sequence id because it will * MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all
* always have the latest data among all files. * scanners.
* @see KeyValueScanner#getScannerOrder()
*/ */
@Override @Override
public long getSequenceID() { public long getScannerOrder() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }

View File

@ -182,17 +182,10 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
if (comparison != 0) { if (comparison != 0) {
return comparison; return comparison;
} else { } else {
// Since both the keys are exactly the same, we break the tie in favor // Since both the keys are exactly the same, we break the tie in favor of higher ordered
// of the key which came latest. // scanner since it'll have newer data. Since higher value should come first, we reverse
long leftSequenceID = left.getSequenceID(); // sort here.
long rightSequenceID = right.getSequenceID(); return Long.compare(right.getScannerOrder(), left.getScannerOrder());
if (leftSequenceID > rightSequenceID) {
return -1;
} else if (leftSequenceID < rightSequenceID) {
return 1;
} else {
return 0;
}
} }
} }
/** /**
@ -392,8 +385,11 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
return this.heap; return this.heap;
} }
/**
* @see KeyValueScanner#getScannerOrder()
*/
@Override @Override
public long getSequenceID() { public long getScannerOrder() {
return 0; return 0;
} }

View File

@ -70,13 +70,13 @@ public interface KeyValueScanner {
boolean reseek(Cell key) throws IOException; boolean reseek(Cell key) throws IOException;
/** /**
* Get the sequence id associated with this KeyValueScanner. This is required * Get the order of this KeyValueScanner. This is only relevant for StoreFileScanners and
* for comparing multiple files to find out which one has the latest data. * MemStoreScanners (other scanners simply return 0). This is required for comparing multiple
* The default implementation for this would be to return 0. A file having * files to find out which one has the latest data. StoreFileScanners are ordered from 0
* lower sequence id will be considered to be the older one. * (oldest) to newest in increasing order. MemStoreScanner gets LONG.max since it always
* contains freshest data.
*/ */
// TODO: Implement SequenceId Interface instead. long getScannerOrder();
long getSequenceID();
/** /**
* Close the KeyValue scanner. * Close the KeyValue scanner.

View File

@ -174,7 +174,8 @@ public class StoreFile {
Bytes.toBytes("BULKLOAD_TIMESTAMP"); Bytes.toBytes("BULKLOAD_TIMESTAMP");
/** /**
* Map of the metadata entries in the corresponding HFile * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
* after which it is not modified again.
*/ */
private Map<byte[], byte[]> metadataMap; private Map<byte[], byte[]> metadataMap;
@ -247,6 +248,7 @@ public class StoreFile {
this.fileInfo = other.fileInfo; this.fileInfo = other.fileInfo;
this.cacheConf = other.cacheConf; this.cacheConf = other.cacheConf;
this.cfBloomType = other.cfBloomType; this.cfBloomType = other.cfBloomType;
this.metadataMap = other.metadataMap;
} }
/** /**
@ -380,7 +382,7 @@ public class StoreFile {
if (startPos != -1) { if (startPos != -1) {
bulkLoadedHFile = true; bulkLoadedHFile = true;
} }
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY); return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
} }
@VisibleForTesting @VisibleForTesting
@ -1155,37 +1157,43 @@ public class StoreFile {
} }
/** /**
* Get a scanner to scan over this StoreFile. Do not use * Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting
* this overload if using this scanner for compactions. * {@code isCompaction} to false, {@code readPt} to 0 and {@code scannerOrder} to 0.
* * Do not use this overload if using this scanner for compactions.
* @param cacheBlocks should this scanner cache blocks? * @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
* @param pread use pread (for highly concurrent small readers)
* @return a scanner
*/ */
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
boolean pread) {
return getStoreFileScanner(cacheBlocks, pread, false,
// 0 is passed as readpoint because this method is only used by test // 0 is passed as readpoint because this method is only used by test
// where StoreFile is directly operated upon // where StoreFile is directly operated upon
0); return getStoreFileScanner(cacheBlocks, pread, false, 0, 0);
}
/**
* Uses {@link #getStoreFileScanner(boolean, boolean, boolean, long, long)} by setting
* {@code scannerOrder} to 0.
* @see #getStoreFileScanner(boolean, boolean, boolean, long, long)
*/
public StoreFileScanner getStoreFileScanner(
boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt) {
return getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, 0);
} }
/** /**
* Get a scanner to scan over this StoreFile. * Get a scanner to scan over this StoreFile.
*
* @param cacheBlocks should this scanner cache blocks? * @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers) * @param pread use pread (for highly concurrent small readers)
* @param isCompaction is scanner being used for compaction? * @param isCompaction is scanner being used for compaction?
* @param scannerOrder Order of this scanner relative to other scanners. See
* {@link KeyValueScanner#getScannerOrder()}.
* @return a scanner * @return a scanner
*/ */
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, public StoreFileScanner getStoreFileScanner(
boolean pread, boolean cacheBlocks, boolean pread, boolean isCompaction, long readPt, long scannerOrder) {
boolean isCompaction, long readPt) {
// Increment the ref count // Increment the ref count
refCount.incrementAndGet(); refCount.incrementAndGet();
return new StoreFileScanner(this, return new StoreFileScanner(
getScanner(cacheBlocks, pread, isCompaction), this, getScanner(cacheBlocks, pread, isCompaction), !isCompaction, reader.hasMVCCInfo(),
!isCompaction, reader.hasMVCCInfo(), readPt); readPt, scannerOrder);
} }
/** /**
@ -1250,7 +1258,7 @@ public class StoreFile {
/** /**
* Check if this storeFile may contain keys within the TimeRange that * Check if this storeFile may contain keys within the TimeRange that
* have not expired (i.e. not older than oldestUnexpiredTS). * have not expired (i.e. not older than oldestUnexpiredTS).
* @param timeRange the timeRange to restrict * @param tr the timeRange to restrict
* @param oldestUnexpiredTS the oldest timestamp that is not expired, as * @param oldestUnexpiredTS the oldest timestamp that is not expired, as
* determined by the column family's TTL * determined by the column family's TTL
* @return false if queried keys definitely don't exist in this StoreFile * @return false if queried keys definitely don't exist in this StoreFile

View File

@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -69,17 +70,37 @@ public class StoreFileScanner implements KeyValueScanner {
private long readPt; private long readPt;
// Order of this scanner relative to other scanners when duplicate key-value is found.
// Higher values means scanner has newer data.
private long scannerOrder;
/** /**
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner} * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param hfs HFile scanner * @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
* @param readPt MVCC value to use to filter out the updates newer than this scanner.
* @param hasMVCC Set to true if underlying store file reader has MVCC info.
*/ */
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC, public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
boolean hasMVCC, long readPt) { boolean hasMVCC, long readPt) {
this (reader, hfs, useMVCC, hasMVCC, readPt, 0);
}
/**
* Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
* @param useMVCC If true, scanner will filter out updates with MVCC larger than {@code readPt}.
* @param readPt MVCC value to use to filter out the updates newer than this scanner.
* @param hasMVCC Set to true if underlying store file reader has MVCC info.
* @param scannerOrder Order of the scanner relative to other scanners.
* See {@link KeyValueScanner#getScannerOrder()}.
*/
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC,
boolean hasMVCC, long readPt, long scannerOrder) {
this.readPt = readPt; this.readPt = readPt;
this.reader = reader; this.reader = reader;
this.hfs = hfs; this.hfs = hfs;
this.enforceMVCC = useMVCC; this.enforceMVCC = useMVCC;
this.hasMVCCInfo = hasMVCC; this.hasMVCCInfo = hasMVCC;
this.scannerOrder = scannerOrder;
} }
boolean isPrimaryReplica() { boolean isPrimaryReplica() {
@ -119,11 +140,13 @@ public class StoreFileScanner implements KeyValueScanner {
ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException { ScanQueryMatcher matcher, long readPt, boolean isPrimaryReplica) throws IOException {
List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>( List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
files.size()); files.size());
for (StoreFile file : files) { List<StoreFile> sorted_files = new ArrayList<>(files);
StoreFile.Reader r = file.createReader(canUseDrop); Collections.sort(sorted_files, StoreFile.Comparators.SEQ_ID);
for (int i = 0; i < sorted_files.size(); i++) {
StoreFile.Reader r = sorted_files.get(i).createReader(canUseDrop);
r.setReplicaStoreFile(isPrimaryReplica); r.setReplicaStoreFile(isPrimaryReplica);
StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
isCompaction, readPt); isCompaction, readPt, i);
scanner.setScanQueryMatcher(matcher); scanner.setScanQueryMatcher(matcher);
scanners.add(scanner); scanners.add(scanner);
} }
@ -308,9 +331,12 @@ public class StoreFileScanner implements KeyValueScanner {
return s.next(); return s.next();
} }
/**
* @see KeyValueScanner#getScannerOrder()
*/
@Override @Override
public long getSequenceID() { public long getScannerOrder() {
return reader.getSequenceID(); return scannerOrder;
} }
/** /**

View File

@ -906,8 +906,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return false; return false;
} }
/**
* @see KeyValueScanner#getScannerOrder()
*/
@Override @Override
public long getSequenceID() { public long getScannerOrder() {
return 0; return 0;
} }

View File

@ -116,8 +116,11 @@ public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner {
return false; return false;
} }
/**
* @see org.apache.hadoop.hbase.regionserver.KeyValueScanner#getScannerOrder()
*/
@Override @Override
public long getSequenceID() { public long getScannerOrder() {
return 0; return 0;
} }

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -30,175 +30,107 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionBackedScanner; import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category(SmallTests.class) @Category(SmallTests.class)
public class TestKeyValueHeap extends HBaseTestCase { public class TestKeyValueHeap extends HBaseTestCase {
private static final boolean PRINT = false; private byte[] row1 = Bytes.toBytes("row1");
private byte[] fam1 = Bytes.toBytes("fam1");
private byte[] col1 = Bytes.toBytes("col1");
private byte[] data = Bytes.toBytes("data");
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); private byte[] row2 = Bytes.toBytes("row2");
private byte[] fam2 = Bytes.toBytes("fam2");
private byte[] col2 = Bytes.toBytes("col2");
private byte[] row1; private byte[] col3 = Bytes.toBytes("col3");
private byte[] fam1; private byte[] col4 = Bytes.toBytes("col4");
private byte[] col1; private byte[] col5 = Bytes.toBytes("col5");
private byte[] data;
private byte[] row2; // Variable name encoding. kv<row#><fam#><col#>
private byte[] fam2; Cell kv111 = new KeyValue(row1, fam1, col1, data);
private byte[] col2; Cell kv112 = new KeyValue(row1, fam1, col2, data);
Cell kv113 = new KeyValue(row1, fam1, col3, data);
Cell kv114 = new KeyValue(row1, fam1, col4, data);
Cell kv115 = new KeyValue(row1, fam1, col5, data);
Cell kv121 = new KeyValue(row1, fam2, col1, data);
Cell kv122 = new KeyValue(row1, fam2, col2, data);
Cell kv211 = new KeyValue(row2, fam1, col1, data);
Cell kv212 = new KeyValue(row2, fam1, col2, data);
Cell kv213 = new KeyValue(row2, fam1, col3, data);
private byte[] col3; TestScanner s1 = new TestScanner(Arrays.asList(kv115, kv211, kv212));
private byte[] col4; TestScanner s2 = new TestScanner(Arrays.asList(kv111, kv112));
private byte[] col5; TestScanner s3 = new TestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
public void setUp() throws Exception { List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3));
super.setUp();
data = Bytes.toBytes("data");
row1 = Bytes.toBytes("row1");
fam1 = Bytes.toBytes("fam1");
col1 = Bytes.toBytes("col1");
row2 = Bytes.toBytes("row2");
fam2 = Bytes.toBytes("fam2");
col2 = Bytes.toBytes("col2");
col3 = Bytes.toBytes("col3");
col4 = Bytes.toBytes("col4");
col5 = Bytes.toBytes("col5");
}
public void testSorted() throws IOException{
//Cases that need to be checked are:
//1. The "smallest" KeyValue is in the same scanners as current
//2. Current scanner gets empty
List<Cell> l1 = new ArrayList<Cell>();
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
scanners.add(new Scanner(l1));
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
scanners.add(new Scanner(l2));
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
scanners.add(new Scanner(l3));
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(new KeyValue(row1, fam1, col1, data));
expected.add(new KeyValue(row1, fam1, col2, data));
expected.add(new KeyValue(row1, fam1, col3, data));
expected.add(new KeyValue(row1, fam1, col4, data));
expected.add(new KeyValue(row1, fam1, col5, data));
expected.add(new KeyValue(row1, fam2, col1, data));
expected.add(new KeyValue(row1, fam2, col2, data));
expected.add(new KeyValue(row2, fam1, col1, data));
expected.add(new KeyValue(row2, fam1, col2, data));
expected.add(new KeyValue(row2, fam1, col3, data));
/*
* Uses {@code scanners} to build a KeyValueHeap, iterates over it and asserts that returned
* Cells are same as {@code expected}.
* @return List of Cells returned from scanners.
*/
public List<Cell> assertCells(List<Cell> expected, List<KeyValueScanner> scanners)
throws IOException {
//Creating KeyValueHeap //Creating KeyValueHeap
KeyValueHeap kvh = KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
new KeyValueHeap(scanners, KeyValue.COMPARATOR);
List<Cell> actual = new ArrayList<Cell>(); List<Cell> actual = new ArrayList<>();
while(kvh.peek() != null){ while(kvh.peek() != null){
actual.add(kvh.next()); actual.add(kvh.next());
} }
assertEquals(expected.size(), actual.size()); assertEquals(expected, actual);
for(int i=0; i<expected.size(); i++){ return actual;
assertEquals(expected.get(i), actual.get(i));
if(PRINT){
System.out.println("expected " +expected.get(i)+
"\nactual " +actual.get(i) +"\n");
} }
public void setUp() throws Exception {
super.setUp();
} }
public void testSorted() throws IOException{
//Cases that need to be checked are:
//1. The "smallest" Cell is in the same scanners as current
//2. Current scanner gets empty
List<Cell> expected = Arrays.asList(
kv111, kv112, kv113, kv114, kv115, kv121, kv122, kv211, kv212, kv213);
List<Cell> actual = assertCells(expected, scanners);
//Check if result is sorted according to Comparator //Check if result is sorted according to Comparator
for(int i=0; i<actual.size()-1; i++){ for(int i=0; i<actual.size()-1; i++){
int ret = KeyValue.COMPARATOR.compare(actual.get(i), actual.get(i+1)); int ret = KeyValue.COMPARATOR.compare(actual.get(i), actual.get(i+1));
assertTrue(ret < 0); assertTrue(ret < 0);
} }
} }
public void testSeek() throws IOException { public void testSeek() throws IOException {
//Cases: //Cases:
//1. Seek KeyValue that is not in scanner //1. Seek Cell that is not in scanner
//2. Check that smallest that is returned from a seek is correct //2. Check that smallest that is returned from a seek is correct
List<Cell> l1 = new ArrayList<Cell>(); List<Cell> expected = Arrays.asList(kv211);
l1.add(new KeyValue(row1, fam1, col5, data));
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
scanners.add(new Scanner(l1));
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
scanners.add(new Scanner(l2));
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
scanners.add(new Scanner(l3));
List<KeyValue> expected = new ArrayList<KeyValue>();
expected.add(new KeyValue(row2, fam1, col1, data));
//Creating KeyValueHeap //Creating KeyValueHeap
KeyValueHeap kvh = KeyValueHeap kvh =
new KeyValueHeap(scanners, KeyValue.COMPARATOR); new KeyValueHeap(scanners, KeyValue.COMPARATOR);
KeyValue seekKv = new KeyValue(row2, fam1, null, null); Cell seekKv = new KeyValue(row2, fam1, null, null);
kvh.seek(seekKv); kvh.seek(seekKv);
List<Cell> actual = new ArrayList<Cell>(); List<Cell> actual = Arrays.asList(kvh.peek());
actual.add(kvh.peek());
assertEquals(expected.size(), actual.size());
for(int i=0; i<expected.size(); i++){
assertEquals(expected.get(i), actual.get(i));
if(PRINT){
System.out.println("expected " +expected.get(i)+
"\nactual " +actual.get(i) +"\n");
}
}
assertEquals("Expected = " + Arrays.toString(expected.toArray())
+ "\n Actual = " + Arrays.toString(actual.toArray()), expected, actual);
} }
public void testScannerLeak() throws IOException { public void testScannerLeak() throws IOException {
// Test for unclosed scanners (HBASE-1927) // Test for unclosed scanners (HBASE-1927)
List<Cell> l1 = new ArrayList<Cell>(); TestScanner s4 = new TestScanner(new ArrayList<Cell>());
l1.add(new KeyValue(row1, fam1, col5, data)); scanners.add(s4);
l1.add(new KeyValue(row2, fam1, col1, data));
l1.add(new KeyValue(row2, fam1, col2, data));
scanners.add(new Scanner(l1));
List<Cell> l2 = new ArrayList<Cell>();
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
scanners.add(new Scanner(l2));
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
scanners.add(new Scanner(l3));
List<Cell> l4 = new ArrayList<Cell>();
scanners.add(new Scanner(l4));
//Creating KeyValueHeap //Creating KeyValueHeap
KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
@ -206,45 +138,26 @@ public class TestKeyValueHeap extends HBaseTestCase {
while(kvh.next() != null); while(kvh.next() != null);
for(KeyValueScanner scanner : scanners) { for(KeyValueScanner scanner : scanners) {
assertTrue(((Scanner)scanner).isClosed()); assertTrue(((TestScanner)scanner).isClosed());
} }
} }
public void testScannerException() throws IOException { public void testScannerException() throws IOException {
// Test for NPE issue when exception happens in scanners (HBASE-13835) // Test for NPE issue when exception happens in scanners (HBASE-13835)
List<Cell> l1 = new ArrayList<Cell>(); TestScanner s1 = new SeekTestScanner(Arrays.asList(kv115, kv211, kv212));
l1.add(new KeyValue(row1, fam1, col5, data)); TestScanner s2 = new SeekTestScanner(Arrays.asList(kv111, kv112));
l1.add(new KeyValue(row2, fam1, col1, data)); TestScanner s3 = new SeekTestScanner(Arrays.asList(kv113, kv114, kv121, kv122, kv213));
l1.add(new KeyValue(row2, fam1, col2, data)); TestScanner s4 = new SeekTestScanner(new ArrayList<Cell>());
SeekScanner s1 = new SeekScanner(l1);
scanners.add(s1);
List<Cell> l2 = new ArrayList<Cell>(); List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(Arrays.asList(s1, s2, s3, s4));
l2.add(new KeyValue(row1, fam1, col1, data));
l2.add(new KeyValue(row1, fam1, col2, data));
SeekScanner s2 = new SeekScanner(l2);
scanners.add(s2);
List<Cell> l3 = new ArrayList<Cell>();
l3.add(new KeyValue(row1, fam1, col3, data));
l3.add(new KeyValue(row1, fam1, col4, data));
l3.add(new KeyValue(row1, fam2, col1, data));
l3.add(new KeyValue(row1, fam2, col2, data));
l3.add(new KeyValue(row2, fam1, col3, data));
SeekScanner s3 = new SeekScanner(l3);
scanners.add(s3);
List<Cell> l4 = new ArrayList<Cell>();
SeekScanner s4 = new SeekScanner(l4);
scanners.add(s4);
// Creating KeyValueHeap // Creating KeyValueHeap
KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR); KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
try { try {
for (KeyValueScanner scanner : scanners) { for (KeyValueScanner scanner : scanners) {
((SeekScanner) scanner).setRealSeekDone(false); ((SeekTestScanner) scanner).setRealSeekDone(false);
} }
while (kvh.next() != null); while (kvh.next() != null);
// The pollRealKV should throw IOE. // The pollRealKV should throw IOE.
@ -256,20 +169,47 @@ public class TestKeyValueHeap extends HBaseTestCase {
// It implies there is no NPE thrown from kvh.close() if getting here // It implies there is no NPE thrown from kvh.close() if getting here
for (KeyValueScanner scanner : scanners) { for (KeyValueScanner scanner : scanners) {
// Verify that close is called and only called once for each scanner // Verify that close is called and only called once for each scanner
assertTrue(((SeekScanner) scanner).isClosed()); assertTrue(((SeekTestScanner) scanner).isClosed());
assertEquals(((SeekScanner) scanner).getClosedNum(), 1); assertEquals(((SeekTestScanner) scanner).getClosedNum(), 1);
} }
} }
private static class Scanner extends CollectionBackedScanner { @Test
private Iterator<Cell> iter; public void testPriorityId() throws IOException {
private Cell current; Cell kv113A = new KeyValue(row1, fam1, col3, Bytes.toBytes("aaa"));
Cell kv113B = new KeyValue(row1, fam1, col3, Bytes.toBytes("bbb"));
{
TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 1);
TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 2);
List<Cell> expected = Arrays.asList(kv111, kv112, kv113B, kv113A);
assertCells(expected, new ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
}
{
TestScanner scan1 = new TestScanner(Arrays.asList(kv111, kv112, kv113A), 2);
TestScanner scan2 = new TestScanner(Arrays.asList(kv113B), 1);
List<Cell> expected = Arrays.asList(kv111, kv112, kv113A, kv113B);
assertCells(expected, new ArrayList<KeyValueScanner>(Arrays.asList(scan1, scan2)));
}
}
private static class TestScanner extends CollectionBackedScanner {
private boolean closed = false; private boolean closed = false;
private long scannerOrder = 0;
public Scanner(List<Cell> list) { public TestScanner(List<Cell> list) {
super(list); super(list);
} }
public TestScanner(List<Cell> list, long scannerOrder) {
this(list);
this.scannerOrder = scannerOrder;
}
@Override
public long getScannerOrder() {
return scannerOrder;
}
@Override @Override
public void close(){ public void close(){
closed = true; closed = true;
@ -280,11 +220,11 @@ public class TestKeyValueHeap extends HBaseTestCase {
} }
} }
private static class SeekScanner extends Scanner { private static class SeekTestScanner extends TestScanner {
private int closedNum = 0; private int closedNum = 0;
private boolean realSeekDone = true; private boolean realSeekDone = true;
public SeekScanner(List<Cell> list) { public SeekTestScanner(List<Cell> list) {
super(list); super(list);
} }

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -233,6 +234,7 @@ public class TestStoreFile extends HBaseTestCase {
assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count); assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
} }
@Test
public void testEmptyStoreFileRestrictKeyRanges() throws Exception { public void testEmptyStoreFileRestrictKeyRanges() throws Exception {
StoreFile.Reader reader = mock(StoreFile.Reader.class); StoreFile.Reader reader = mock(StoreFile.Reader.class);
Store store = mock(Store.class); Store store = mock(Store.class);
@ -241,7 +243,7 @@ public class TestStoreFile extends HBaseTestCase {
when(hcd.getName()).thenReturn(cf); when(hcd.getName()).thenReturn(cf);
when(store.getFamily()).thenReturn(hcd); when(store.getFamily()).thenReturn(hcd);
StoreFileScanner scanner = StoreFileScanner scanner =
new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0); new StoreFileScanner(reader, mock(HFileScanner.class), false, false, 0, 0);
Scan scan = new Scan(); Scan scan = new Scan();
scan.setColumnFamilyTimeRange(cf, 0, 1); scan.setColumnFamilyTimeRange(cf, 0, 1);
assertFalse(scanner.shouldUseScanner(scan, store, 0)); assertFalse(scanner.shouldUseScanner(scan, store, 0));

View File

@ -62,7 +62,7 @@ public class TestCompactor {
when(r.length()).thenReturn(1L); when(r.length()).thenReturn(1L);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())) when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(mock(StoreFileScanner.class)); .thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r); when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r); when(sf.createReader()).thenReturn(r);

View File

@ -738,7 +738,8 @@ public class TestStripeCompactionPolicy {
when(r.length()).thenReturn(size); when(r.length()).thenReturn(size);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE); when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class)); when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn( when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(
mock(StoreFileScanner.class)); mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r); when(sf.getReader()).thenReturn(r);
when(sf.createReader(anyBoolean())).thenReturn(r); when(sf.createReader(anyBoolean())).thenReturn(r);