HBASE-2772 Scan doesn't recover from region server failure
HBASE-2775 Update of hadoop jar in HBASE-2771 broke TestMultiClusters git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@957333 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c4654b285a
commit
049120cdde
2
pom.xml
2
pom.xml
|
@ -443,7 +443,7 @@
|
||||||
<compileSource>1.6</compileSource>
|
<compileSource>1.6</compileSource>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<hbase.version>0.21.0-SNAPSHOT</hbase.version>
|
<hbase.version>0.21.0-SNAPSHOT</hbase.version>
|
||||||
<hadoop.version>0.20.3-append-r956776</hadoop.version>
|
<hadoop.version>0.20.3-append-r956776+1240</hadoop.version>
|
||||||
|
|
||||||
<commons-cli.version>1.2</commons-cli.version>
|
<commons-cli.version>1.2</commons-cli.version>
|
||||||
<commons-logging.version>1.1.1</commons-logging.version>
|
<commons-logging.version>1.1.1</commons-logging.version>
|
||||||
|
|
|
@ -942,19 +942,24 @@ public class HTable implements HTableInterface {
|
||||||
values = getConnection().getRegionServerWithRetries(callable);
|
values = getConnection().getRegionServerWithRetries(callable);
|
||||||
}
|
}
|
||||||
} catch (DoNotRetryIOException e) {
|
} catch (DoNotRetryIOException e) {
|
||||||
long timeout = lastNext + scannerTimeout;
|
if (e instanceof UnknownScannerException) {
|
||||||
if (e instanceof UnknownScannerException &&
|
long timeout = lastNext + scannerTimeout;
|
||||||
timeout < System.currentTimeMillis()) {
|
// If we are over the timeout, throw this exception to the client
|
||||||
long elapsed = System.currentTimeMillis() - lastNext;
|
// Else, it's because the region moved and we used the old id
|
||||||
ScannerTimeoutException ex = new ScannerTimeoutException(
|
// against the new region server; reset the scanner.
|
||||||
elapsed + "ms passed since the last invocation, " +
|
if (timeout < System.currentTimeMillis()) {
|
||||||
"timeout is currently set to " + scannerTimeout);
|
long elapsed = System.currentTimeMillis() - lastNext;
|
||||||
ex.initCause(e);
|
ScannerTimeoutException ex = new ScannerTimeoutException(
|
||||||
throw ex;
|
elapsed + "ms passed since the last invocation, " +
|
||||||
}
|
"timeout is currently set to " + scannerTimeout);
|
||||||
Throwable cause = e.getCause();
|
ex.initCause(e);
|
||||||
if (cause == null || !(cause instanceof NotServingRegionException)) {
|
throw ex;
|
||||||
throw e;
|
}
|
||||||
|
} else {
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
if (cause == null || !(cause instanceof NotServingRegionException)) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Else, its signal from depths of ScannerCallable that we got an
|
// Else, its signal from depths of ScannerCallable that we got an
|
||||||
// NSRE on a next and that we need to reset the scanner.
|
// NSRE on a next and that we need to reset the scanner.
|
||||||
|
|
|
@ -386,15 +386,25 @@ public class MiniHBaseCluster {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
|
* @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
|
||||||
* of HRS carrying .META. Returns -1 if none found.
|
* of HRS carrying regionName. Returns -1 if none found.
|
||||||
*/
|
*/
|
||||||
public int getServerWithMeta() {
|
public int getServerWithMeta() {
|
||||||
|
return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the location of the specified region
|
||||||
|
* @param regionName Name of the region in bytes
|
||||||
|
* @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
|
||||||
|
* of HRS carrying .META.. Returns -1 if none found.
|
||||||
|
*/
|
||||||
|
public int getServerWith(byte[] regionName) {
|
||||||
int index = -1;
|
int index = -1;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
|
for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
|
||||||
HRegionServer hrs = rst.getRegionServer();
|
HRegionServer hrs = rst.getRegionServer();
|
||||||
HRegion metaRegion =
|
HRegion metaRegion =
|
||||||
hrs.getOnlineRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
|
hrs.getOnlineRegion(regionName);
|
||||||
if (metaRegion != null) {
|
if (metaRegion != null) {
|
||||||
index = count;
|
index = count;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -23,7 +23,11 @@ public class TestScannerTimeout {
|
||||||
TEST_UTIL = new HBaseTestingUtility();
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
final Log LOG = LogFactory.getLog(getClass());
|
final Log LOG = LogFactory.getLog(getClass());
|
||||||
private final byte[] someBytes = Bytes.toBytes("f");
|
private final static byte[] SOME_BYTES = Bytes.toBytes("f");
|
||||||
|
private final static byte[] TABLE_NAME = Bytes.toBytes("t");
|
||||||
|
private final static int NB_ROWS = 10;
|
||||||
|
private final static int SCANNER_TIMEOUT = 6000;
|
||||||
|
private static HTable table;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws java.lang.Exception
|
* @throws java.lang.Exception
|
||||||
|
@ -31,8 +35,14 @@ public class TestScannerTimeout {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
Configuration c = TEST_UTIL.getConfiguration();
|
Configuration c = TEST_UTIL.getConfiguration();
|
||||||
c.setInt("hbase.regionserver.lease.period", 1000);
|
c.setInt("hbase.regionserver.lease.period", SCANNER_TIMEOUT);
|
||||||
TEST_UTIL.startMiniCluster(1);
|
TEST_UTIL.startMiniCluster(2);
|
||||||
|
table = TEST_UTIL.createTable(Bytes.toBytes("t"), SOME_BYTES);
|
||||||
|
for (int i = 0; i < NB_ROWS; i++) {
|
||||||
|
Put put = new Put(Bytes.toBytes(i));
|
||||||
|
put.add(SOME_BYTES, SOME_BYTES, SOME_BYTES);
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -48,13 +58,7 @@ public class TestScannerTimeout {
|
||||||
*/
|
*/
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
}
|
TEST_UTIL.ensureSomeRegionServersAvailable(2);
|
||||||
|
|
||||||
/**
|
|
||||||
* @throws java.lang.Exception
|
|
||||||
*/
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -63,22 +67,16 @@ public class TestScannerTimeout {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void test2481() throws Exception {
|
public void test2481() throws Exception {
|
||||||
int initialCount = 10;
|
|
||||||
HTable t = TEST_UTIL.createTable(Bytes.toBytes("t"), someBytes);
|
|
||||||
for (int i = 0; i < initialCount; i++) {
|
|
||||||
Put put = new Put(Bytes.toBytes(i));
|
|
||||||
put.add(someBytes, someBytes, someBytes);
|
|
||||||
t.put(put);
|
|
||||||
}
|
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
ResultScanner r = t.getScanner(scan);
|
ResultScanner r = table.getScanner(scan);
|
||||||
int count = 0;
|
int count = 0;
|
||||||
try {
|
try {
|
||||||
Result res = r.next();
|
Result res = r.next();
|
||||||
while (res != null) {
|
while (res != null) {
|
||||||
count++;
|
count++;
|
||||||
if (count == 5) {
|
if (count == 5) {
|
||||||
Thread.sleep(1500);
|
// Sleep just a bit more to be sure
|
||||||
|
Thread.sleep(SCANNER_TIMEOUT+100);
|
||||||
}
|
}
|
||||||
res = r.next();
|
res = r.next();
|
||||||
}
|
}
|
||||||
|
@ -88,4 +86,24 @@ public class TestScannerTimeout {
|
||||||
}
|
}
|
||||||
fail("We should be timing out");
|
fail("We should be timing out");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that scanner can continue even if the region server it was reading
|
||||||
|
* from failed. Before 2772, it reused the same scanner id.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test2772() throws Exception {
|
||||||
|
int rs = TEST_UTIL.getHBaseCluster().getServerWith(
|
||||||
|
TEST_UTIL.getHBaseCluster().getRegions(
|
||||||
|
TABLE_NAME).get(0).getRegionName());
|
||||||
|
Scan scan = new Scan();
|
||||||
|
ResultScanner r = table.getScanner(scan);
|
||||||
|
// This takes exactly 5 seconds
|
||||||
|
TEST_UTIL.getHBaseCluster().getRegionServer(rs).abort("die!");
|
||||||
|
Result[] results = r.next(NB_ROWS);
|
||||||
|
assertEquals(NB_ROWS, results.length);
|
||||||
|
r.close();
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue