HBASE-22230 REST Server drops connection on long scan
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
539de1cae9
commit
fdf01ca7f8
|
@ -46,8 +46,8 @@ public class RESTServlet implements Constants {
|
|||
private final UserGroupInformation realUser;
|
||||
private final JvmPauseMonitor pauseMonitor;
|
||||
|
||||
static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
|
||||
static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
|
||||
public static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
|
||||
public static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
|
||||
static final String HBASE_REST_SUPPORT_PROXYUSER = "hbase.rest.support.proxyuser";
|
||||
|
||||
UserGroupInformation getRealUser() {
|
||||
|
@ -62,6 +62,13 @@ public class RESTServlet implements Constants {
|
|||
return INSTANCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the ConnectionCache instance
|
||||
*/
|
||||
public ConnectionCache getConnectionCache() {
|
||||
return connectionCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf Existing configuration to use in rest servlet
|
||||
* @param userProvider the login user provider
|
||||
|
|
|
@ -83,6 +83,9 @@ public class ScannerInstanceResource extends ResourceBase {
|
|||
return Response.status(Response.Status.NOT_FOUND)
|
||||
.type(MIMETYPE_TEXT).entity("Not found" + CRLF)
|
||||
.build();
|
||||
} else {
|
||||
// Updated the connection access time for each client next() call
|
||||
RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime();
|
||||
}
|
||||
CellSetModel model = new CellSetModel();
|
||||
RowModel rowModel = null;
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.rest.HBaseRESTTestingUtility;
|
||||
import org.apache.hadoop.hbase.rest.RESTServlet;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -571,5 +572,60 @@ public class TestRemoteTable {
|
|||
assertTrue(response.hasBody());
|
||||
}
|
||||
|
||||
}
|
||||
/**
|
||||
* Tests keeping a HBase scanner alive for long periods of time. Each call to next() should reset
|
||||
* the ConnectionCache timeout for the scanner's connection
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testLongLivedScan() throws Exception {
|
||||
int numTrials = 6;
|
||||
int trialPause = 1000;
|
||||
int cleanUpInterval = 100;
|
||||
|
||||
// Shutdown the Rest Servlet container
|
||||
REST_TEST_UTIL.shutdownServletContainer();
|
||||
|
||||
// Set the ConnectionCache timeout to trigger halfway through the trials
|
||||
TEST_UTIL.getConfiguration().setLong(RESTServlet.MAX_IDLETIME, (numTrials / 2) * trialPause);
|
||||
TEST_UTIL.getConfiguration().setLong(RESTServlet.CLEANUP_INTERVAL, cleanUpInterval);
|
||||
|
||||
// Start the Rest Servlet container
|
||||
REST_TEST_UTIL.startServletContainer(TEST_UTIL.getConfiguration());
|
||||
|
||||
// Truncate the test table for inserting test scenarios rows keys
|
||||
TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
|
||||
TEST_UTIL.getHBaseAdmin().truncateTable(TABLE, false);
|
||||
|
||||
remoteTable = new RemoteHTable(
|
||||
new Client(new Cluster().add("localhost", REST_TEST_UTIL.getServletPort())),
|
||||
TEST_UTIL.getConfiguration(), TABLE.toBytes());
|
||||
|
||||
String row = "testrow";
|
||||
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) {
|
||||
List<Put> puts = new ArrayList<Put>();
|
||||
Put put = null;
|
||||
for (int i = 1; i <= numTrials; i++) {
|
||||
put = new Put(Bytes.toBytes(row + i));
|
||||
put.addColumn(COLUMN_1, QUALIFIER_1, TS_2, Bytes.toBytes("testvalue" + i));
|
||||
puts.add(put);
|
||||
}
|
||||
table.put(puts);
|
||||
}
|
||||
|
||||
Scan scan = new Scan();
|
||||
scan.setCaching(1);
|
||||
scan.setBatch(1);
|
||||
|
||||
ResultScanner scanner = remoteTable.getScanner(scan);
|
||||
Result result = null;
|
||||
// get scanner and rows
|
||||
for (int i = 1; i <= numTrials; i++) {
|
||||
// Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout
|
||||
result = scanner.next();
|
||||
assertEquals(row + i, Bytes.toString(result.getRow()));
|
||||
Thread.sleep(trialPause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue