HBASE-16221 Have ThriftScanner update the ConnectionCache's last used time overtime getScannerRow() to keep the connection alive for long lived scanners
Signed-off-by: Elliott Clark <eclark@apache.org>
This commit is contained in:
parent
cc766df28b
commit
03fe257a64
|
@ -189,6 +189,20 @@ public class ConnectionCache {
|
|||
return connInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the access time for the current connection. Used to keep Connections alive for
|
||||
* long-lived scanners.
|
||||
* @return whether we successfully updated the last access time
|
||||
*/
|
||||
public boolean updateConnectionAccessTime() {
|
||||
String userName = getEffectiveUser();
|
||||
ConnectionInfo connInfo = connections.get(userName);
|
||||
if (connInfo != null) {
|
||||
return connInfo.updateAccessTime();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
class ConnectionInfo {
|
||||
final Connection connection;
|
||||
final String userName;
|
||||
|
|
|
@ -383,8 +383,8 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
|
|||
ex.setMessage("Invalid scanner Id");
|
||||
throw ex;
|
||||
}
|
||||
|
||||
try {
|
||||
connectionCache.updateConnectionAccessTime();
|
||||
return resultsFromHBase(scanner.next(numRows));
|
||||
} catch (IOException e) {
|
||||
throw getTIOError(e);
|
||||
|
|
|
@ -589,6 +589,57 @@ public class TestThriftHBaseServiceHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow()
|
||||
* should reset the ConnectionCache timeout for the scanner's connection
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testLongLivedScan() throws Exception {
|
||||
int numTrials = 6;
|
||||
int trialPause = 1000;
|
||||
int cleanUpInterval = 100;
|
||||
Configuration conf = new Configuration(UTIL.getConfiguration());
|
||||
// Set the ConnectionCache timeout to trigger halfway through the trials
|
||||
conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause);
|
||||
conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval);
|
||||
ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf,
|
||||
UserProvider.instantiate(conf));
|
||||
|
||||
ByteBuffer table = wrap(tableAname);
|
||||
// insert data
|
||||
TColumnValue columnValue = new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
||||
wrap(valueAname));
|
||||
List<TColumnValue> columnValues = new ArrayList<TColumnValue>();
|
||||
columnValues.add(columnValue);
|
||||
for (int i = 0; i < numTrials; i++) {
|
||||
TPut put = new TPut(wrap(("testScan" + i).getBytes()), columnValues);
|
||||
handler.put(table, put);
|
||||
}
|
||||
|
||||
// create scan instance
|
||||
TScan scan = new TScan();
|
||||
List<TColumn> columns = new ArrayList<TColumn>();
|
||||
TColumn column = new TColumn();
|
||||
column.setFamily(familyAname);
|
||||
column.setQualifier(qualifierAname);
|
||||
columns.add(column);
|
||||
scan.setColumns(columns);
|
||||
scan.setStartRow("testScan".getBytes());
|
||||
scan.setStopRow("testScan\uffff".getBytes());
|
||||
// Prevent the scanner from caching results
|
||||
scan.setCaching(1);
|
||||
|
||||
// get scanner and rows
|
||||
int scanId = handler.openScanner(table, scan);
|
||||
for (int i = 0; i < numTrials; i++) {
|
||||
// Make sure that the Scanner doesn't throw an exception after the ConnectionCache timeout
|
||||
List<TResult> results = handler.getScannerRows(scanId, 1);
|
||||
assertArrayEquals(("testScan" + i).getBytes(), results.get(0).getRow());
|
||||
Thread.sleep(trialPause);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReverseScan() throws Exception {
|
||||
ThriftHBaseServiceHandler handler = createHandler();
|
||||
|
|
Loading…
Reference in New Issue