HBASE-26629 Add expiration for long time vacant scanners in Thrift2 (#3984)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
1b8e577cc6
commit
c811acdcab
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.thrift2;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
|
||||
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
|
||||
|
@ -50,9 +52,8 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
@ -97,6 +98,9 @@ import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
|
|||
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
|
||||
import org.apache.hadoop.hbase.thrift2.generated.TThriftServerType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -114,9 +118,8 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
|
|||
private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
|
||||
|
||||
// nextScannerId and scannerMap are used to manage scanner state
|
||||
// TODO: Cleanup thread for Scanners, Scanner id wrap
|
||||
private final AtomicInteger nextScannerId = new AtomicInteger(0);
|
||||
private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
|
||||
private final Cache<Integer, ResultScanner> scannerMap;
|
||||
|
||||
private static final IOException ioe
|
||||
= new DoNotRetryIOException("Thrift Server is in Read-only mode.");
|
||||
|
@ -160,7 +163,14 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
|
|||
public ThriftHBaseServiceHandler(final Configuration conf,
|
||||
final UserProvider userProvider) throws IOException {
|
||||
super(conf, userProvider);
|
||||
long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||
isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
|
||||
scannerMap = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS)
|
||||
.removalListener((RemovalListener<Integer, ResultScanner>) removalNotification ->
|
||||
removalNotification.getValue().close())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -212,16 +222,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
|
|||
* @return a Scanner, or null if the Id is invalid
|
||||
*/
|
||||
private ResultScanner getScanner(int id) {
|
||||
return scannerMap.get(id);
|
||||
return scannerMap.getIfPresent(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the scanner associated with the specified ID from the internal HashMap.
|
||||
* @param id of the Scanner to remove
|
||||
* @return the removed Scanner, or <code>null</code> if the Id is invalid
|
||||
*/
|
||||
protected ResultScanner removeScanner(int id) {
|
||||
return scannerMap.remove(id);
|
||||
protected void removeScanner(int id) {
|
||||
scannerMap.invalidate(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -466,18 +475,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
|
|||
return results;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException {
|
||||
LOG.debug("scannerClose: id=" + scannerId);
|
||||
ResultScanner scanner = getScanner(scannerId);
|
||||
if (scanner == null) {
|
||||
String message = "scanner ID is invalid";
|
||||
LOG.warn(message);
|
||||
TIllegalArgument ex = new TIllegalArgument();
|
||||
ex.setMessage("Invalid scanner Id");
|
||||
throw ex;
|
||||
LOG.warn("scanner ID: " + scannerId + "is invalid");
|
||||
// While the scanner could be already expired,
|
||||
// we should not throw exception here. Just log and return.
|
||||
return;
|
||||
}
|
||||
scanner.close();
|
||||
removeScanner(scannerId);
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.hbase.thrift2;
|
||||
|
||||
import static java.nio.ByteBuffer.wrap;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL;
|
||||
import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME;
|
||||
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
|
||||
|
@ -1040,6 +1042,31 @@ public class TestThriftHBaseServiceHandler {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpiredScanner() throws Exception {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1000);
|
||||
ThriftHBaseServiceHandler handler =
|
||||
new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
|
||||
|
||||
TScan scan = new TScan();
|
||||
ByteBuffer table = wrap(tableAname);
|
||||
|
||||
int scannerId = handler.openScanner(table, scan);
|
||||
handler.getScannerRows(scannerId, 1);
|
||||
Thread.sleep(1000);
|
||||
|
||||
try {
|
||||
handler.getScannerRows(scannerId, 1);
|
||||
fail("The scanner should be expired and have an TIllegalArgument exception here.");
|
||||
} catch (TIllegalArgument e) {
|
||||
assertEquals("Invalid scanner Id", e.getMessage());
|
||||
} finally {
|
||||
conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
|
||||
DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutTTL() throws Exception {
|
||||
ThriftHBaseServiceHandler handler = createHandler();
|
||||
|
|
Loading…
Reference in New Issue