HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Yutong Xiao 2022-01-02 21:19:31 +08:00 committed by Duo Zhang
parent 4d5fe404f4
commit 9059f90ca3
2 changed files with 48 additions and 15 deletions

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.thrift2;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
@ -50,9 +52,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@ -93,6 +94,9 @@ import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
import org.apache.hadoop.hbase.thrift2.generated.TThriftServerType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.thrift.TException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -110,9 +114,8 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
// nextScannerId and scannerMap are used to manage scanner state
// TODO: Cleanup thread for Scanners, Scanner id wrap
private final AtomicInteger nextScannerId = new AtomicInteger(0);
private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
private final Cache<Integer, ResultScanner> scannerMap;
private static final IOException ioe
= new DoNotRetryIOException("Thrift Server is in Read-only mode.");
@ -156,7 +159,14 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
public ThriftHBaseServiceHandler(final Configuration conf,
final UserProvider userProvider) throws IOException {
super(conf, userProvider);
long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
scannerMap = CacheBuilder.newBuilder()
.expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
.removalListener((RemovalListener<Integer, ResultScanner>) removalNotification ->
removalNotification.getValue().close())
.build();
}
@Override
@ -208,16 +218,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
* @return a Scanner, or null if the Id is invalid
*/
private ResultScanner getScanner(int id) {
return scannerMap.get(id);
return scannerMap.getIfPresent(id);
}
/**
* Removes the scanner associated with the specified ID from the internal HashMap.
* @param id of the Scanner to remove
* @return the removed Scanner, or <code>null</code> if the Id is invalid
*/
protected ResultScanner removeScanner(int id) {
return scannerMap.remove(id);
protected void removeScanner(int id) {
scannerMap.invalidate(id);
}
@Override
@ -461,18 +470,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
return results;
}
@Override
public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
LOG.debug("scannerClose: id=" + scannerId);
ResultScanner scanner = getScanner(scannerId);
if (scanner == null) {
String message = "scanner ID is invalid";
LOG.warn(message);
TIllegalArgument ex = new TIllegalArgument();
ex.setMessage("Invalid scanner Id");
throw ex;
LOG.warn("scanner ID: " + scannerId + "is invalid");
// While the scanner could be already expired,
// we should not throw exception here. Just log and return.
return;
}
scanner.close();
removeScanner(scannerId);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.thrift2;
import static java.nio.ByteBuffer.wrap;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL;
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
@ -1021,6 +1023,31 @@ public class TestThriftHBaseServiceHandler {
}
}
@Test
public void testExpiredScanner() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1000);
ThriftHBaseServiceHandler handler =
new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
TScan scan = new TScan();
ByteBuffer table = wrap(tableAname);
int scannerId = handler.openScanner(table, scan);
handler.getScannerRows(scannerId, 1);
Thread.sleep(1000);
try {
handler.getScannerRows(scannerId, 1);
fail("The scanner should be expired and have an TIllegalArgument exception here.");
} catch (TIllegalArgument e) {
assertEquals("Invalid scanner Id", e.getMessage());
} finally {
conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
}
}
@Test
public void testPutTTL() throws Exception {
ThriftHBaseServiceHandler handler = createHandler();