HBASE-7070 Scanner may retry forever after HBASE-5974
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1405107 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
853d775a33
commit
624021fb4d
|
@ -266,6 +266,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// This flag is set when we want to skip the result returned. We do
|
||||
// this when we reset scanner because it split under us.
|
||||
boolean skipFirst = false;
|
||||
boolean retryAfterOutOfOrderException = true;
|
||||
do {
|
||||
try {
|
||||
if (skipFirst) {
|
||||
|
@ -280,6 +281,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// returns an empty array if scanning is to go on and we've just
|
||||
// exhausted current region.
|
||||
values = callable.withRetries();
|
||||
retryAfterOutOfOrderException = true;
|
||||
} catch (DoNotRetryIOException e) {
|
||||
if (e instanceof UnknownScannerException) {
|
||||
long timeout = lastNext + scannerTimeout;
|
||||
|
@ -310,6 +312,14 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// invocation.
|
||||
skipFirst = true;
|
||||
}
|
||||
if (e instanceof OutOfOrderScannerNextException) {
|
||||
if (retryAfterOutOfOrderException) {
|
||||
retryAfterOutOfOrderException = false;
|
||||
} else {
|
||||
throw new DoNotRetryIOException("Failed after retry"
|
||||
+ ", it could be cause by rpc timeout", e);
|
||||
}
|
||||
}
|
||||
// Clear region
|
||||
this.currentRegion = null;
|
||||
callable = null;
|
||||
|
|
|
@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -43,17 +45,20 @@ import com.google.protobuf.ServiceException;
|
|||
*/
|
||||
@Category(MediumTests.class)
|
||||
public class TestClientScannerRPCTimeout {
|
||||
final Log LOG = LogFactory.getLog(getClass());
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
||||
private static final byte[] VALUE = Bytes.toBytes("testValue");
|
||||
private static final int rpcTimeout = 2 * 1000;
|
||||
private static final int CLIENT_RETRIES_NUMBER = 3;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
|
||||
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
|
||||
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
|
@ -86,6 +91,20 @@ public class TestClientScannerRPCTimeout {
|
|||
result = scanner.next();
|
||||
assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
|
||||
scanner.close();
|
||||
|
||||
// test the case that RPC is always timesout
|
||||
scanner = ht.getScanner(scan);
|
||||
RegionServerWithScanTimeout.sleepAlways = true;
|
||||
RegionServerWithScanTimeout.tryNumber = 0;
|
||||
try {
|
||||
result = scanner.next();
|
||||
} catch (IOException ioe) {
|
||||
// catch the exception after max retry number
|
||||
LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
|
||||
}
|
||||
assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
|
||||
+ ", actual =" + RegionServerWithScanTimeout.tryNumber,
|
||||
RegionServerWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
|
||||
}
|
||||
|
||||
private void putToTable(HTable ht, byte[] rowkey) throws IOException {
|
||||
|
@ -98,6 +117,8 @@ public class TestClientScannerRPCTimeout {
|
|||
private long tableScannerId;
|
||||
private boolean slept;
|
||||
private static long seqNoToSleepOn = -1;
|
||||
private static boolean sleepAlways = false;
|
||||
private static int tryNumber = 0;
|
||||
|
||||
public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException {
|
||||
super(conf);
|
||||
|
@ -107,15 +128,20 @@ public class TestClientScannerRPCTimeout {
|
|||
public ScanResponse scan(final RpcController controller, final ScanRequest request)
|
||||
throws ServiceException {
|
||||
if (request.hasScannerId()) {
|
||||
if (!slept && this.tableScannerId == request.getScannerId()
|
||||
&& seqNoToSleepOn == request.getNextCallSeq()) {
|
||||
ScanResponse scanResponse = super.scan(controller, request);
|
||||
if (this.tableScannerId == request.getScannerId() &&
|
||||
(sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
|
||||
try {
|
||||
Thread.sleep(rpcTimeout + 500);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
slept = true;
|
||||
tryNumber++;
|
||||
if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
|
||||
sleepAlways = false;
|
||||
}
|
||||
}
|
||||
return super.scan(controller, request);
|
||||
return scanResponse;
|
||||
} else {
|
||||
ScanResponse scanRes = super.scan(controller, request);
|
||||
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
|
||||
|
|
Loading…
Reference in New Issue