diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 79d42a671d1..0f1a3aa83f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -266,6 +266,7 @@ public class ClientScanner extends AbstractClientScanner { // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. boolean skipFirst = false; + boolean retryAfterOutOfOrderException = true; do { try { if (skipFirst) { @@ -280,6 +281,7 @@ public class ClientScanner extends AbstractClientScanner { // returns an empty array if scanning is to go on and we've just // exhausted current region. values = callable.withRetries(); + retryAfterOutOfOrderException = true; } catch (DoNotRetryIOException e) { if (e instanceof UnknownScannerException) { long timeout = lastNext + scannerTimeout; @@ -310,6 +312,14 @@ public class ClientScanner extends AbstractClientScanner { // invocation. 
skipFirst = true; } + if (e instanceof OutOfOrderScannerNextException) { + if (retryAfterOutOfOrderException) { + retryAfterOutOfOrderException = false; + } else { + throw new DoNotRetryIOException("Failed after retry" + + ", it could be cause by rpc timeout", e); + } + } // Clear region this.currentRegion = null; callable = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java index 3c90fdc781d..b3e166483b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -43,17 +45,20 @@ import com.google.protobuf.ServiceException; */ @Category(MediumTests.class) public class TestClientScannerRPCTimeout { + final Log LOG = LogFactory.getLog(getClass()); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final byte[] FAMILY = Bytes.toBytes("testFamily"); private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); private static final byte[] VALUE = Bytes.toBytes("testValue"); private static final int rpcTimeout = 2 * 1000; + private static final int CLIENT_RETRIES_NUMBER = 3; @BeforeClass public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
CLIENT_RETRIES_NUMBER); TEST_UTIL.startMiniCluster(1); } @@ -86,6 +91,20 @@ public class TestClientScannerRPCTimeout { result = scanner.next(); assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow())); scanner.close(); + + // test the case where the RPC always times out + scanner = ht.getScanner(scan); + RegionServerWithScanTimeout.sleepAlways = true; + RegionServerWithScanTimeout.tryNumber = 0; + try { + result = scanner.next(); + } catch (IOException ioe) { + // catch the exception after max retry number + LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe); + } + assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER + + ", actual =" + RegionServerWithScanTimeout.tryNumber, + RegionServerWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER); } private void putToTable(HTable ht, byte[] rowkey) throws IOException { @@ -98,6 +117,8 @@ public class TestClientScannerRPCTimeout { private long tableScannerId; private boolean slept; private static long seqNoToSleepOn = -1; + private static boolean sleepAlways = false; + private static int tryNumber = 0; public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException { super(conf); @@ -107,15 +128,20 @@ public class TestClientScannerRPCTimeout { public ScanResponse scan(final RpcController controller, final ScanRequest request) throws ServiceException { if (request.hasScannerId()) { - if (!slept && this.tableScannerId == request.getScannerId() - && seqNoToSleepOn == request.getNextCallSeq()) { + ScanResponse scanResponse = super.scan(controller, request); + if (this.tableScannerId == request.getScannerId() && + (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) { try { Thread.sleep(rpcTimeout + 500); } catch (InterruptedException e) { } slept = true; + tryNumber++; + if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) { + sleepAlways = false; + } } - return super.scan(controller, request); + return scanResponse; } else { ScanResponse 
scanRes = super.scan(controller, request); String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());