diff --git a/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java b/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java index a3639716cc8..5212ccdd85b 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java +++ b/src/main/java/org/apache/hadoop/hbase/mapred/TableRecordReaderImpl.java @@ -24,7 +24,6 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -206,9 +205,9 @@ public class TableRecordReaderImpl { rowcount = 0; } } - } catch (DoNotRetryIOException e) { - throw e; } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown LOG.debug("recovered from " + StringUtils.stringifyException(e)); if (lastSuccessfulRow == null) { LOG.warn("We are restarting the first next() invocation," + diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 29f8915215b..7994aa038a7 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -65,6 +64,7 @@ public class TableRecordReaderImpl { private Result value = null; private TaskAttemptContext context = null; private Method getCounter = null; + private long numRestarts = 0; private long timestamp; private int rowcount; private boolean logScannerActivity = false; @@ -202,9 +202,9 @@ public class TableRecordReaderImpl { rowcount = 0; } } - } catch (DoNotRetryIOException e) { - throw e; } catch (IOException e) { + // try to handle all IOExceptions by restarting + // the scanner, if the second call fails, it will be rethrown LOG.info("recovered from " + StringUtils.stringifyException(e)); if (lastSuccessfulRow == null) { LOG.warn("We are restarting the first next() invocation," + @@ -219,6 +219,7 @@ public class TableRecordReaderImpl { scanner.next(); // skip presumed already mapped row } value = scanner.next(); + numRestarts++; } if (value != null && value.size() > 0) { key.set(value.getRow()); @@ -274,6 +275,8 @@ public class TableRecordReaderImpl { HBASE_COUNTER_GROUP_NAME, mlv.getName()); ct.increment(mlv.getCurrentIntervalValue()); } + ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, + "NUM_SCANNER_RESTARTS")).increment(numRestarts); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); } diff --git a/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java index 5956169c20b..38be7e42f53 100644 --- a/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java +++ b/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java @@ -195,16 +195,16 @@ public class TestTableInputFormat { * * @throws IOException */ - static HTable createIOEScannerTable(byte[] name) throws IOException { + static HTable createIOEScannerTable(byte[] name, final int failCnt) + throws IOException { // build up a mock scanner stuff to fail the first time Answer a = new Answer() { - boolean first = true; + int cnt = 0; @Override public ResultScanner answer(InvocationOnMock invocation) throws Throwable { // first invocation return the busted mock scanner - if (first) { - first = false; + if (cnt++ < failCnt) { // create mock ResultScanner that always fails. Scan scan = mock(Scan.class); doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe @@ -230,16 +230,16 @@ public class TestTableInputFormat { * * @throws IOException */ - static HTable createDNRIOEScannerTable(byte[] name) throws IOException { + static HTable createDNRIOEScannerTable(byte[] name, final int failCnt) + throws IOException { // build up a mock scanner stuff to fail the first time Answer a = new Answer() { - boolean first = true; + int cnt = 0; @Override public ResultScanner answer(InvocationOnMock invocation) throws Throwable { // first invocation return the busted mock scanner - if (first) { - first = false; + if (cnt++ < failCnt) { // create mock ResultScanner that always fails. Scan scan = mock(Scan.class); doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe @@ -280,7 +280,30 @@ public class TestTableInputFormat { */ @Test public void testTableRecordReaderScannerFail() throws IOException { - HTable htable = createIOEScannerTable("table2".getBytes()); + HTable htable = createIOEScannerTable("table2".getBytes(), 1); + runTestMapred(htable); + } + + /** + * Run test assuming Scanner IOException failure using mapred api, + * + * @throws IOException + */ + @Test(expected = IOException.class) + public void testTableRecordReaderScannerFailTwice() throws IOException { + HTable htable = createIOEScannerTable("table3".getBytes(), 2); + runTestMapred(htable); + } + + /** + * Run test assuming UnknownScannerException (which is a type of + * DoNotRetryIOException) using mapred api. + * + * @throws DoNotRetryIOException + */ + @Test + public void testTableRecordReaderScannerTimeout() throws IOException { + HTable htable = createDNRIOEScannerTable("table4".getBytes(), 1); runTestMapred(htable); } @@ -291,8 +314,8 @@ public class TestTableInputFormat { * @throws DoNotRetryIOException */ @Test(expected = DoNotRetryIOException.class) - public void testTableRecordReaderScannerTimeout() throws IOException { - HTable htable = createDNRIOEScannerTable("table3".getBytes()); + public void testTableRecordReaderScannerTimeoutTwice() throws IOException { + HTable htable = createDNRIOEScannerTable("table5".getBytes(), 2); runTestMapred(htable); } @@ -318,7 +341,34 @@ public class TestTableInputFormat { @Test public void testTableRecordReaderScannerFailMapreduce() throws IOException, InterruptedException { - HTable htable = createIOEScannerTable("table2-mr".getBytes()); + HTable htable = createIOEScannerTable("table2-mr".getBytes(), 1); + runTestMapreduce(htable); + } + + /** + * Run test assuming Scanner IOException failure using newer mapreduce api + * + * @throws IOException + * @throws InterruptedException + */ + @Test(expected = IOException.class) + public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException, + InterruptedException { + HTable htable = createIOEScannerTable("table3-mr".getBytes(), 2); + runTestMapreduce(htable); + } + + /** + * Run test assuming UnknownScannerException (which is a type of + * DoNotRetryIOException) using newer mapreduce api + * + * @throws InterruptedException + * @throws DoNotRetryIOException + */ + @Test + public void testTableRecordReaderScannerTimeoutMapreduce() + throws IOException, InterruptedException { + HTable htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1); runTestMapreduce(htable); } @@ -330,9 +380,9 @@ public class TestTableInputFormat { * @throws DoNotRetryIOException */ @Test(expected = DoNotRetryIOException.class) - public void testTableRecordReaderScannerTimeoutMapreduce() + public void testTableRecordReaderScannerTimeoutMapreduceTwice() throws IOException, InterruptedException { - HTable htable = createDNRIOEScannerTable("table3-mr".getBytes()); + HTable htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2); runTestMapreduce(htable); }