HBASE-2519 StoreFileScanner.seek swallows IOEs
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@947511 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5f679cb525
commit
c7ed31b969
|
@ -350,6 +350,7 @@ Release 0.21.0 - Unreleased
|
|||
deploy at / instead of at /webapps/master/master.jsp
|
||||
HBASE-2590 Failed parse of branch element in saveVersion.sh
|
||||
HBASE-2591 HBASE-2587 hardcoded the port that dfscluster runs on
|
||||
HBASE-2519 StoreFileScanner.seek swallows IOEs (Todd Lipcon via Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
|
|
@ -80,15 +80,15 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
if (e instanceof RemoteException) {
|
||||
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
|
||||
}
|
||||
if (ioe != null) {
|
||||
if (ioe instanceof NotServingRegionException) {
|
||||
if (ioe == null) throw new IOException(e);
|
||||
if (ioe instanceof NotServingRegionException) {
|
||||
// Throw a DNRE so that we break out of cycle of calling NSRE
|
||||
// when what we need is to open scanner against new location.
|
||||
// Attach NSRE to signal client that it needs to resetup scanner.
|
||||
throw new DoNotRetryIOException("Reset scanner", ioe);
|
||||
} else if (ioe instanceof DoNotRetryIOException) {
|
||||
throw ioe;
|
||||
}
|
||||
} else {
|
||||
// The outer layers will retry
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
return rrs;
|
||||
|
|
|
@ -1368,6 +1368,11 @@ public class HFile {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HFileScanner for reader " + String.valueOf(reader);
|
||||
}
|
||||
}
|
||||
|
||||
public String getTrailerInfo() {
|
||||
|
|
|
@ -1952,7 +1952,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
|
|||
this(scan, null);
|
||||
}
|
||||
|
||||
void initHeap() {
|
||||
void initHeap() throws IOException {
|
||||
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
|
||||
if (extraScanners != null) {
|
||||
scanners.addAll(extraScanners);
|
||||
|
|
|
@ -72,7 +72,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
return this.current.peek();
|
||||
}
|
||||
|
||||
public KeyValue next() {
|
||||
public KeyValue next() throws IOException {
|
||||
if(this.current == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -178,8 +178,9 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
* automatically closed and removed from the heap.
|
||||
* @param seekKey KeyValue to seek at or after
|
||||
* @return true if KeyValues exist at or after specified key, false if not
|
||||
* @throws IOException
|
||||
*/
|
||||
public boolean seek(KeyValue seekKey) {
|
||||
public boolean seek(KeyValue seekKey) throws IOException {
|
||||
if(this.current == null) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
/**
|
||||
|
@ -35,17 +37,17 @@ public interface KeyValueScanner {
|
|||
* Return the next KeyValue in this scanner, iterating the scanner
|
||||
* @return the next KeyValue
|
||||
*/
|
||||
public KeyValue next();
|
||||
public KeyValue next() throws IOException;
|
||||
|
||||
/**
|
||||
* Seek the scanner at or after the specified KeyValue.
|
||||
* @param key seek value
|
||||
* @return true if scanner has values left, false if end of scanner
|
||||
*/
|
||||
public boolean seek(KeyValue key);
|
||||
public boolean seek(KeyValue key) throws IOException;
|
||||
|
||||
/**
|
||||
* Close the KeyValue scanner.
|
||||
*/
|
||||
public void close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,8 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
|||
private KeyValueHeap heap;
|
||||
private KeyValue.KVComparator comparator;
|
||||
|
||||
MinorCompactingStoreScanner(Store store, List<? extends KeyValueScanner> scanners) {
|
||||
MinorCompactingStoreScanner(Store store, List<? extends KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
comparator = store.comparator;
|
||||
KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
|
||||
for (KeyValueScanner scanner : scanners ) {
|
||||
|
@ -46,7 +47,8 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
|||
}
|
||||
|
||||
MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
|
||||
List<? extends KeyValueScanner> scanners) {
|
||||
List<? extends KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
this.comparator = comparator;
|
||||
|
||||
KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
|
||||
|
@ -61,7 +63,7 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalSca
|
|||
return heap.peek();
|
||||
}
|
||||
|
||||
public KeyValue next() {
|
||||
public KeyValue next() throws IOException {
|
||||
return heap.next();
|
||||
}
|
||||
|
||||
|
|
|
@ -1304,9 +1304,10 @@ public class Store implements HConstants, HeapSize {
|
|||
|
||||
/**
|
||||
* Return a scanner for both the memstore and the HStore files
|
||||
* @throws IOException
|
||||
*/
|
||||
protected KeyValueScanner getScanner(Scan scan,
|
||||
final NavigableSet<byte []> targetCols) {
|
||||
final NavigableSet<byte []> targetCols) throws IOException {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return new StoreScanner(this, scan, targetCols);
|
||||
|
|
|
@ -56,17 +56,11 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
public static List<StoreFileScanner> getScannersForStoreFiles(
|
||||
Collection<StoreFile> filesToCompact,
|
||||
boolean cacheBlocks,
|
||||
boolean usePread) {
|
||||
boolean usePread) throws IOException {
|
||||
List<StoreFileScanner> scanners =
|
||||
new ArrayList<StoreFileScanner>(filesToCompact.size());
|
||||
for (StoreFile file : filesToCompact) {
|
||||
Reader r = file.getReader();
|
||||
if (r == null) {
|
||||
// TODO why can this happen? this seems like something worth
|
||||
// throwing an exception over!
|
||||
LOG.error("StoreFile " + file + " has a null Reader");
|
||||
continue;
|
||||
}
|
||||
Reader r = file.createReader();
|
||||
scanners.add(new StoreFileScanner(r.getScanner(cacheBlocks, usePread)));
|
||||
}
|
||||
return scanners;
|
||||
|
@ -84,7 +78,7 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
return cur;
|
||||
}
|
||||
|
||||
public KeyValue next() {
|
||||
public KeyValue next() throws IOException {
|
||||
KeyValue retKey = cur;
|
||||
cur = hfs.getKeyValue();
|
||||
try {
|
||||
|
@ -92,13 +86,12 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
if (cur != null)
|
||||
hfs.next();
|
||||
} catch(IOException e) {
|
||||
// Turn checked exception into runtime exception.
|
||||
throw new RuntimeException(e);
|
||||
throw new IOException("Could not iterate " + this, e);
|
||||
}
|
||||
return retKey;
|
||||
}
|
||||
|
||||
public boolean seek(KeyValue key) {
|
||||
public boolean seek(KeyValue key) throws IOException {
|
||||
try {
|
||||
if(!seekAtOrAfter(hfs, key)) {
|
||||
close();
|
||||
|
@ -108,8 +101,7 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
hfs.next();
|
||||
return true;
|
||||
} catch(IOException ioe) {
|
||||
close();
|
||||
return false;
|
||||
throw new IOException("Could not seek " + this, ioe);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -52,8 +52,9 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
* @param store who we scan
|
||||
* @param scan the spec
|
||||
* @param columns which columns we are scanning
|
||||
* @throws IOException
|
||||
*/
|
||||
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
|
||||
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) throws IOException {
|
||||
this.store = store;
|
||||
this.cacheBlocks = scan.getCacheBlocks();
|
||||
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
|
||||
|
@ -83,7 +84,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
* @param scan the spec
|
||||
* @param scanners ancilliary scanners
|
||||
*/
|
||||
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners) {
|
||||
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
this.store = store;
|
||||
this.cacheBlocks = false;
|
||||
this.isGet = false;
|
||||
|
@ -104,7 +106,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
|
||||
final KeyValue.KVComparator comparator,
|
||||
final NavigableSet<byte[]> columns,
|
||||
final List<KeyValueScanner> scanners) {
|
||||
final List<KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
this.store = null;
|
||||
this.isGet = false;
|
||||
this.cacheBlocks = scan.getCacheBlocks();
|
||||
|
@ -121,7 +124,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
/*
|
||||
* @return List of scanners ordered properly.
|
||||
*/
|
||||
private List<KeyValueScanner> getScanners() {
|
||||
private List<KeyValueScanner> getScanners() throws IOException {
|
||||
// First the store file scanners
|
||||
Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||
|
@ -138,7 +141,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
* @return List of scanners to seek, possibly filtered by StoreFile.
|
||||
*/
|
||||
private List<KeyValueScanner> getScanners(Scan scan,
|
||||
final NavigableSet<byte[]> columns) {
|
||||
final NavigableSet<byte[]> columns) throws IOException {
|
||||
// First the store file scanners
|
||||
Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
|
||||
List<StoreFileScanner> sfScanners = StoreFileScanner
|
||||
|
@ -178,7 +181,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
this.heap.close();
|
||||
}
|
||||
|
||||
public synchronized boolean seek(KeyValue key) {
|
||||
public synchronized boolean seek(KeyValue key) throws IOException {
|
||||
return this.heap.seek(key);
|
||||
}
|
||||
|
||||
|
@ -282,4 +285,4 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
KeyValue kv = heap.peek();
|
||||
matcher.setRow((kv == null ? topKey : kv).getRow());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -470,6 +470,21 @@ public class HBaseTestingUtility {
|
|||
return rowCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of rows in the given table.
|
||||
*/
|
||||
public int countRows(final HTable table) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
ResultScanner results = table.getScanner(scan);
|
||||
int count = 0;
|
||||
for (@SuppressWarnings("unused") Result res : results) {
|
||||
count++;
|
||||
}
|
||||
results.close();
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates many regions names "aaa" to "zzz".
|
||||
*
|
||||
|
@ -727,4 +742,8 @@ public class HBaseTestingUtility {
|
|||
public MiniDFSCluster getDFSCluster() {
|
||||
return dfsCluster;
|
||||
}
|
||||
}
|
||||
|
||||
public FileSystem getTestFileSystem() throws IOException {
|
||||
return FileSystem.get(conf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
|
|||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
|
@ -345,6 +346,19 @@ public class MiniHBaseCluster implements HConstants {
|
|||
public HRegionServer getRegionServer(int serverNumber) {
|
||||
return hbaseCluster.getRegionServer(serverNumber);
|
||||
}
|
||||
|
||||
public List<HRegion> getRegions(byte[] tableName) {
|
||||
List<HRegion> ret = new ArrayList<HRegion>();
|
||||
for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
|
||||
HRegionServer hrs = rst.getRegionServer();
|
||||
for (HRegion region : hrs.getOnlineRegions()) {
|
||||
if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
|
||||
ret.add(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
@ -64,7 +65,7 @@ implements HConstants {
|
|||
col5 = Bytes.toBytes("col5");
|
||||
}
|
||||
|
||||
public void testSorted(){
|
||||
public void testSorted() throws IOException{
|
||||
//Cases that need to be checked are:
|
||||
//1. The "smallest" KeyValue is in the same scanners as current
|
||||
//2. Current scanner gets empty
|
||||
|
@ -126,7 +127,7 @@ implements HConstants {
|
|||
|
||||
}
|
||||
|
||||
public void testSeek(){
|
||||
public void testSeek() throws IOException {
|
||||
//Cases:
|
||||
//1. Seek KeyValue that is not in scanner
|
||||
//2. Check that smallest that is returned from a seek is correct
|
||||
|
@ -174,7 +175,7 @@ implements HConstants {
|
|||
|
||||
}
|
||||
|
||||
public void testScannerLeak() {
|
||||
public void testScannerLeak() throws IOException {
|
||||
// Test for unclosed scanners (HBASE-1927)
|
||||
|
||||
List<KeyValue> l1 = new ArrayList<KeyValue>();
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueTestUtil;
|
||||
|
@ -28,7 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
public class TestKeyValueScanFixture extends TestCase {
|
||||
|
||||
|
||||
public void testKeyValueScanFixture() {
|
||||
public void testKeyValueScanFixture() throws IOException {
|
||||
KeyValue kvs[] = new KeyValue[]{
|
||||
KeyValueTestUtil.create("RowA", "family", "qf1",
|
||||
1, KeyValue.Type.Put, "value-1"),
|
||||
|
|
|
@ -158,8 +158,9 @@ public class TestMemStore extends TestCase {
|
|||
|
||||
/**
|
||||
* A simple test which verifies the 3 possible states when scanning across snapshot.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testScanAcrossSnapshot2() {
|
||||
public void testScanAcrossSnapshot2() throws IOException {
|
||||
// we are going to the scanning across snapshot with two kvs
|
||||
// kv1 should always be returned before kv2
|
||||
final byte[] one = Bytes.toBytes(1);
|
||||
|
@ -188,7 +189,8 @@ public class TestMemStore extends TestCase {
|
|||
verifyScanAcrossSnapshot2(kv1, kv2);
|
||||
}
|
||||
|
||||
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
|
||||
private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
|
||||
throws IOException {
|
||||
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
|
||||
List<KeyValueScanner> memstorescanners = this.memstore.getScanners();
|
||||
assertEquals(1, memstorescanners.size());
|
||||
|
@ -199,7 +201,8 @@ public class TestMemStore extends TestCase {
|
|||
assertNull(scanner.next());
|
||||
}
|
||||
|
||||
private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
|
||||
private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)
|
||||
throws IOException {
|
||||
scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
|
||||
for (KeyValue kv : expected) {
|
||||
assertTrue(0 ==
|
||||
|
@ -209,7 +212,7 @@ public class TestMemStore extends TestCase {
|
|||
assertNull(scanner.peek());
|
||||
}
|
||||
|
||||
public void testMemstoreConcurrentControl() {
|
||||
public void testMemstoreConcurrentControl() throws IOException {
|
||||
final byte[] row = Bytes.toBytes(1);
|
||||
final byte[] f = Bytes.toBytes("family");
|
||||
final byte[] q1 = Bytes.toBytes("q1");
|
||||
|
@ -250,7 +253,6 @@ public class TestMemStore extends TestCase {
|
|||
}
|
||||
|
||||
private static class ReadOwnWritesTester extends Thread {
|
||||
final int id;
|
||||
static final int NUM_TRIES = 1000;
|
||||
|
||||
final byte[] row;
|
||||
|
@ -269,7 +271,6 @@ public class TestMemStore extends TestCase {
|
|||
ReadWriteConsistencyControl rwcc,
|
||||
AtomicReference<Throwable> caughtException)
|
||||
{
|
||||
this.id = id;
|
||||
this.rwcc = rwcc;
|
||||
this.memstore = memstore;
|
||||
this.caughtException = caughtException;
|
||||
|
@ -284,7 +285,7 @@ public class TestMemStore extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private void internalRun() {
|
||||
private void internalRun() throws IOException {
|
||||
for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
|
||||
ReadWriteConsistencyControl.WriteEntry w =
|
||||
rwcc.beginMemstoreInsert();
|
||||
|
@ -855,7 +856,7 @@ public class TestMemStore extends TestCase {
|
|||
}
|
||||
|
||||
|
||||
static void doScan(MemStore ms, int iteration) {
|
||||
static void doScan(MemStore ms, int iteration) throws IOException {
|
||||
long nanos = System.nanoTime();
|
||||
KeyValueScanner s = ms.getScanners().get(0);
|
||||
s.seek(KeyValue.createFirstOnRow(new byte[]{}));
|
||||
|
@ -868,7 +869,7 @@ public class TestMemStore extends TestCase {
|
|||
|
||||
}
|
||||
|
||||
public static void main(String [] args) {
|
||||
public static void main(String [] args) throws IOException {
|
||||
ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
|
||||
MemStore ms = new MemStore();
|
||||
|
||||
|
|
|
@ -79,22 +79,23 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
StoreFile.BloomType.NONE, false));
|
||||
}
|
||||
|
||||
private void writeStoreFile(final HFile.Writer writer) throws IOException {
|
||||
writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
|
||||
}
|
||||
/*
|
||||
* Writes HStoreKey and ImmutableBytes data to passed writer and
|
||||
* then closes it.
|
||||
* @param writer
|
||||
* @throws IOException
|
||||
*/
|
||||
private void writeStoreFile(final HFile.Writer writer)
|
||||
public static void writeStoreFile(final HFile.Writer writer, byte[] fam, byte[] qualifier)
|
||||
throws IOException {
|
||||
long now = System.currentTimeMillis();
|
||||
byte [] fam = Bytes.toBytes(getName());
|
||||
byte [] qf = Bytes.toBytes(getName());
|
||||
try {
|
||||
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
|
||||
for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
|
||||
byte[] b = new byte[] { (byte) d, (byte) e };
|
||||
writer.append(new KeyValue(b, fam, qf, now, b));
|
||||
writer.append(new KeyValue(b, fam, qualifier, now, b));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue