HBASE-22230 REST Server drops connection on long scan

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Pankaj 2019-04-21 01:31:59 +05:30 committed by stack
parent 1f0e43a27a
commit 4d358b0a2c
3 changed files with 69 additions and 3 deletions

View File

@ -46,8 +46,8 @@ public class RESTServlet implements Constants {
private final UserGroupInformation realUser; private final UserGroupInformation realUser;
private final JvmPauseMonitor pauseMonitor; private final JvmPauseMonitor pauseMonitor;
static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval"; public static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime"; public static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
static final String HBASE_REST_SUPPORT_PROXYUSER = "hbase.rest.support.proxyuser"; static final String HBASE_REST_SUPPORT_PROXYUSER = "hbase.rest.support.proxyuser";
UserGroupInformation getRealUser() { UserGroupInformation getRealUser() {
@ -62,6 +62,13 @@ public class RESTServlet implements Constants {
return INSTANCE; return INSTANCE;
} }
/**
* @return the ConnectionCache instance
*/
public ConnectionCache getConnectionCache() {
return connectionCache;
}
/** /**
* @param conf Existing configuration to use in rest servlet * @param conf Existing configuration to use in rest servlet
* @param userProvider the login user provider * @param userProvider the login user provider

View File

@ -82,6 +82,9 @@ public class ScannerInstanceResource extends ResourceBase {
return Response.status(Response.Status.NOT_FOUND) return Response.status(Response.Status.NOT_FOUND)
.type(MIMETYPE_TEXT).entity("Not found" + CRLF) .type(MIMETYPE_TEXT).entity("Not found" + CRLF)
.build(); .build();
} else {
// Updated the connection access time for each client next() call
RESTServlet.getInstance().getConnectionCache().updateConnectionAccessTime();
} }
CellSetModel model = new CellSetModel(); CellSetModel model = new CellSetModel();
RowModel rowModel = null; RowModel rowModel = null;

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.rest.HBaseRESTTestingUtility; import org.apache.hadoop.hbase.rest.HBaseRESTTestingUtility;
import org.apache.hadoop.hbase.rest.RESTServlet;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RestTests; import org.apache.hadoop.hbase.testclassification.RestTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -576,5 +577,60 @@ public class TestRemoteTable {
assertTrue(response.hasBody()); assertTrue(response.hasBody());
} }
} /**
* Tests keeping a HBase scanner alive for long periods of time. Each call to next() should reset
* the ConnectionCache timeout for the scanner's connection
* @throws Exception
*/
@Test
public void testLongLivedScan() throws Exception {
int numTrials = 6;
int trialPause = 1000;
int cleanUpInterval = 100;
// Shutdown the Rest Servlet container
REST_TEST_UTIL.shutdownServletContainer();
// Set the ConnectionCache timeout to trigger halfway through the trials
TEST_UTIL.getConfiguration().setLong(RESTServlet.MAX_IDLETIME, (numTrials / 2) * trialPause);
TEST_UTIL.getConfiguration().setLong(RESTServlet.CLEANUP_INTERVAL, cleanUpInterval);
// Start the Rest Servlet container
REST_TEST_UTIL.startServletContainer(TEST_UTIL.getConfiguration());
// Truncate the test table for inserting test scenarios rows keys
TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
TEST_UTIL.getHBaseAdmin().truncateTable(TABLE, false);
remoteTable = new RemoteHTable(
new Client(new Cluster().add("localhost", REST_TEST_UTIL.getServletPort())),
TEST_UTIL.getConfiguration(), TABLE.toBytes());
String row = "testrow";
try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) {
List<Put> puts = new ArrayList<Put>();
Put put = null;
for (int i = 1; i <= numTrials; i++) {
put = new Put(Bytes.toBytes(row + i));
put.addColumn(COLUMN_1, QUALIFIER_1, TS_2, Bytes.toBytes("testvalue" + i));
puts.add(put);
}
table.put(puts);
}
Scan scan = new Scan();
scan.setCaching(1);
scan.setBatch(1);
ResultScanner scanner = remoteTable.getScanner(scan);
Result result = null;
// get scanner and rows
for (int i = 1; i <= numTrials; i++) {
// Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout
result = scanner.next();
assertEquals(row + i, Bytes.toString(result.getRow()));
Thread.sleep(trialPause);
}
}
}