HBASE-5757 TableInputFormat should handle as many errors as possible (Jan Lukavsky)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1341132 13f79535-47bb-0310-9956-ffa450edef68
commit b36c9ceea2
parent 2cac550467
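In both the mapred and mapreduce TableRecordReaderImpl, the special-case rethrow of DoNotRetryIOException is removed: any IOException thrown by scanner.next() (including DoNotRetryIOException subclasses such as UnknownScannerException after a scanner lease expires) now triggers one scanner restart at the last successfully mapped row, and only a failure of the retried call propagates to the caller. The mapreduce variant additionally counts restarts and reports them as a NUM_SCANNER_RESTARTS counter. The following is a minimal, self-contained sketch of that control flow, not the HBase source; Scanner, Row, ScannerFactory and RetryingReader are hypothetical stand-ins for ResultScanner, Result, HTable.getScanner() and the record reader.

// Minimal illustrative sketch (not HBase source) of the retry-once-on-IOException
// behaviour this patch gives TableRecordReaderImpl. All names below are
// hypothetical stand-ins.
import java.io.IOException;

interface Scanner {
  Row next() throws IOException;
}

final class Row { }

class RetryingReader {
  interface ScannerFactory {
    Scanner open(Row startAfter) throws IOException;
  }

  private final ScannerFactory factory;
  private Scanner scanner;
  private Row lastSuccessfulRow;
  private long numRestarts = 0;            // surfaced as NUM_SCANNER_RESTARTS

  RetryingReader(ScannerFactory factory) throws IOException {
    this.factory = factory;
    this.scanner = factory.open(null);     // open from the start row
  }

  private void restart(Row row) throws IOException {
    scanner = factory.open(row);           // re-open the scanner at the given row
  }

  Row next() throws IOException {
    Row value;
    try {
      value = scanner.next();
    } catch (IOException e) {
      // Any IOException, including DoNotRetryIOException subclasses such as
      // UnknownScannerException, now triggers exactly one restart.
      restart(lastSuccessfulRow);
      if (lastSuccessfulRow != null) {
        scanner.next();                    // skip presumed already mapped row
      }
      value = scanner.next();              // if this fails too, it propagates
      numRestarts++;
    }
    if (value != null) {
      lastSuccessfulRow = value;
    }
    return value;
  }
}

Restarting exactly once keeps the original fail-fast behaviour for persistent errors (the second failure still kills the task) while riding over transient ones such as an expired scanner lease.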
@@ -24,7 +24,6 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -206,9 +205,9 @@ public class TableRecordReaderImpl {
             rowcount = 0;
           }
         }
-      } catch (DoNotRetryIOException e) {
-        throw e;
       } catch (IOException e) {
+        // try to handle all IOExceptions by restarting
+        // the scanner, if the second call fails, it will be rethrown
         LOG.debug("recovered from " + StringUtils.stringifyException(e));
         if (lastSuccessfulRow == null) {
           LOG.warn("We are restarting the first next() invocation," +
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -65,6 +64,7 @@ public class TableRecordReaderImpl {
   private Result value = null;
   private TaskAttemptContext context = null;
   private Method getCounter = null;
+  private long numRestarts = 0;
   private long timestamp;
   private int rowcount;
   private boolean logScannerActivity = false;
@@ -202,9 +202,9 @@ public class TableRecordReaderImpl {
             rowcount = 0;
           }
         }
-      } catch (DoNotRetryIOException e) {
-        throw e;
       } catch (IOException e) {
+        // try to handle all IOExceptions by restarting
+        // the scanner, if the second call fails, it will be rethrown
        LOG.info("recovered from " + StringUtils.stringifyException(e));
         if (lastSuccessfulRow == null) {
           LOG.warn("We are restarting the first next() invocation," +
@@ -219,6 +219,7 @@ public class TableRecordReaderImpl {
          scanner.next();    // skip presumed already mapped row
        }
        value = scanner.next();
+       numRestarts++;
      }
      if (value != null && value.size() > 0) {
        key.set(value.getRow());
@@ -274,6 +275,8 @@ public class TableRecordReaderImpl {
            HBASE_COUNTER_GROUP_NAME, mlv.getName());
          ct.increment(mlv.getCurrentIntervalValue());
        }
+       ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
+           "NUM_SCANNER_RESTARTS")).increment(numRestarts);
      } catch (Exception e) {
        LOG.debug("can't update counter." + StringUtils.stringifyException(e));
      }
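The reflective counter update above feeds a new per-task counter, NUM_SCANNER_RESTARTS, into the group named by HBASE_COUNTER_GROUP_NAME. A hypothetical usage sketch for reading it once a job has finished follows; it assumes a Hadoop 2.x-style Counters API and that HBASE_COUNTER_GROUP_NAME resolves to "HBase Counters", neither of which is shown in the hunks above.

// Hypothetical sketch, not part of this patch: report how often scanners had
// to be restarted across all tasks of a finished job.
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

public final class ScannerRestartReport {
  // Assumes HBASE_COUNTER_GROUP_NAME == "HBase Counters"; adjust if it differs.
  private static final String GROUP = "HBase Counters";

  public static long scannerRestarts(Job completedJob) throws Exception {
    Counter c = completedJob.getCounters().findCounter(GROUP, "NUM_SCANNER_RESTARTS");
    return c == null ? 0L : c.getValue();
  }
}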
@@ -195,16 +195,16 @@ public class TestTableInputFormat {
    *
    * @throws IOException
    */
-  static HTable createIOEScannerTable(byte[] name) throws IOException {
+  static HTable createIOEScannerTable(byte[] name, final int failCnt)
+      throws IOException {
     // build up a mock scanner stuff to fail the first time
     Answer<ResultScanner> a = new Answer<ResultScanner>() {
-      boolean first = true;
+      int cnt = 0;

       @Override
       public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
         // first invocation return the busted mock scanner
-        if (first) {
-          first = false;
+        if (cnt++ < failCnt) {
           // create mock ResultScanner that always fails.
           Scan scan = mock(Scan.class);
           doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
@@ -230,16 +230,16 @@ public class TestTableInputFormat {
    *
    * @throws IOException
    */
-  static HTable createDNRIOEScannerTable(byte[] name) throws IOException {
+  static HTable createDNRIOEScannerTable(byte[] name, final int failCnt)
+      throws IOException {
     // build up a mock scanner stuff to fail the first time
     Answer<ResultScanner> a = new Answer<ResultScanner>() {
-      boolean first = true;
+      int cnt = 0;

       @Override
       public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
         // first invocation return the busted mock scanner
-        if (first) {
-          first = false;
+        if (cnt++ < failCnt) {
           // create mock ResultScanner that always fails.
           Scan scan = mock(Scan.class);
           doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
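Both helpers rely on the same Mockito pattern: an Answer with an invocation counter that returns a broken scanner for the first failCnt calls and then lets the real table take over (the wiring onto HTable.getScanner(Scan) sits outside the hunks shown). Below is a generic, self-contained sketch of that technique; DataSource, scan() and FlakyMocks are hypothetical stand-ins, not the actual test code.

// Generic sketch of the failCnt technique used by the test helpers above.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

interface DataSource {
  String scan() throws IOException;
}

final class FlakyMocks {
  /** Wrap a real DataSource so that its first failCnt scan() calls throw. */
  static DataSource failFirst(final DataSource real, final int failCnt)
      throws IOException {
    DataSource flaky = mock(DataSource.class);
    when(flaky.scan()).thenAnswer(new Answer<String>() {
      int cnt = 0;

      @Override
      public String answer(InvocationOnMock invocation) throws Throwable {
        if (cnt++ < failCnt) {
          throw new IOException("injected failure #" + cnt);
        }
        return real.scan();                // afterwards, delegate to the real object
      }
    });
    return flaky;
  }
}

With failCnt = 1 the record reader's single restart is enough to recover, which is why those tests expect success; with failCnt = 2 the retried call fails as well, and the new *Twice tests expect the exception to escape.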
@@ -280,7 +280,30 @@ public class TestTableInputFormat {
    */
   @Test
   public void testTableRecordReaderScannerFail() throws IOException {
-    HTable htable = createIOEScannerTable("table2".getBytes());
+    HTable htable = createIOEScannerTable("table2".getBytes(), 1);
+    runTestMapred(htable);
+  }
+
+  /**
+   * Run test assuming Scanner IOException failure using mapred api,
+   *
+   * @throws IOException
+   */
+  @Test(expected = IOException.class)
+  public void testTableRecordReaderScannerFailTwice() throws IOException {
+    HTable htable = createIOEScannerTable("table3".getBytes(), 2);
+    runTestMapred(htable);
+  }
+
+  /**
+   * Run test assuming UnknownScannerException (which is a type of
+   * DoNotRetryIOException) using mapred api.
+   *
+   * @throws DoNotRetryIOException
+   */
+  @Test
+  public void testTableRecordReaderScannerTimeout() throws IOException {
+    HTable htable = createDNRIOEScannerTable("table4".getBytes(), 1);
     runTestMapred(htable);
   }

@@ -291,8 +314,8 @@ public class TestTableInputFormat {
    * @throws DoNotRetryIOException
    */
   @Test(expected = DoNotRetryIOException.class)
-  public void testTableRecordReaderScannerTimeout() throws IOException {
-    HTable htable = createDNRIOEScannerTable("table3".getBytes());
+  public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
+    HTable htable = createDNRIOEScannerTable("table5".getBytes(), 2);
     runTestMapred(htable);
   }

@@ -318,7 +341,34 @@ public class TestTableInputFormat {
   @Test
   public void testTableRecordReaderScannerFailMapreduce() throws IOException,
       InterruptedException {
-    HTable htable = createIOEScannerTable("table2-mr".getBytes());
+    HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1);
+    runTestMapreduce(htable);
+  }
+
+  /**
+   * Run test assuming Scanner IOException failure using newer mapreduce api
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test(expected = IOException.class)
+  public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
+      InterruptedException {
+    HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2);
+    runTestMapreduce(htable);
+  }
+
+  /**
+   * Run test assuming UnknownScannerException (which is a type of
+   * DoNotRetryIOException) using newer mapreduce api
+   *
+   * @throws InterruptedException
+   * @throws DoNotRetryIOException
+   */
+  @Test
+  public void testTableRecordReaderScannerTimeoutMapreduce()
+      throws IOException, InterruptedException {
+    HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
     runTestMapreduce(htable);
   }

@@ -330,9 +380,9 @@ public class TestTableInputFormat {
    * @throws DoNotRetryIOException
    */
   @Test(expected = DoNotRetryIOException.class)
-  public void testTableRecordReaderScannerTimeoutMapreduce()
+  public void testTableRecordReaderScannerTimeoutMapreduceTwice()
       throws IOException, InterruptedException {
-    HTable htable = createDNRIOEScannerTable("table3-mr".getBytes());
+    HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
     runTestMapreduce(htable);
   }