diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index 8936a9f4844..a62e94b8c0a 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.thrift2; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED; import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift; @@ -50,9 +52,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -93,6 +94,9 @@ import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor; import org.apache.hadoop.hbase.thrift2.generated.TTableName; import org.apache.hadoop.hbase.thrift2.generated.TThriftServerType; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hbase.thirdparty.com.google.common.cache.Cache; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.RemovalListener; import org.apache.thrift.TException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -110,9 +114,8 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class); // nextScannerId and scannerMap are used to manage scanner state - // TODO: Cleanup thread for Scanners, Scanner id wrap private final AtomicInteger nextScannerId = new AtomicInteger(0); - private final Map scannerMap = new ConcurrentHashMap<>(); + private final Cache scannerMap; private static final IOException ioe = new DoNotRetryIOException("Thrift Server is in Read-only mode."); @@ -156,7 +159,14 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH public ThriftHBaseServiceHandler(final Configuration conf, final UserProvider userProvider) throws IOException { super(conf, userProvider); + long cacheTimeout = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT); + scannerMap = CacheBuilder.newBuilder() + .expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS) + .removalListener((RemovalListener) removalNotification -> + removalNotification.getValue().close()) + .build(); } @Override @@ -208,16 +218,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH * @return a Scanner, or null if the Id is invalid */ private ResultScanner getScanner(int id) { - return scannerMap.get(id); + return scannerMap.getIfPresent(id); } /** * Removes the scanner associated with the specified ID from the internal HashMap. * @param id of the Scanner to remove - * @return the removed Scanner, or null if the Id is invalid */ - protected ResultScanner removeScanner(int id) { - return scannerMap.remove(id); + protected void removeScanner(int id) { + scannerMap.invalidate(id); } @Override @@ -461,18 +470,15 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH return results; } - - @Override public void closeScanner(int scannerId) throws TIOError, TIllegalArgument, TException { LOG.debug("scannerClose: id=" + scannerId); ResultScanner scanner = getScanner(scannerId); if (scanner == null) { - String message = "scanner ID is invalid"; - LOG.warn(message); - TIllegalArgument ex = new TIllegalArgument(); - ex.setMessage("Invalid scanner Id"); - throw ex; + LOG.warn("scanner ID: " + scannerId + "is invalid"); + // While the scanner could be already expired, + // we should not throw exception here. Just log and return. + return; } scanner.close(); removeScanner(scannerId); diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java index 66ea65caa25..a0acb87df77 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.thrift2; import static java.nio.ByteBuffer.wrap; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL; import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME; import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift; @@ -1021,6 +1023,31 @@ public class TestThriftHBaseServiceHandler { } } + @Test + public void testExpiredScanner() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1000); + ThriftHBaseServiceHandler handler = + new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf)); + + TScan scan = new TScan(); + ByteBuffer table = wrap(tableAname); + + int scannerId = handler.openScanner(table, scan); + handler.getScannerRows(scannerId, 1); + Thread.sleep(1000); + + try { + handler.getScannerRows(scannerId, 1); + fail("The scanner should be expired and have an TIllegalArgument exception here."); + } catch (TIllegalArgument e) { + assertEquals("Invalid scanner Id", e.getMessage()); + } finally { + conf.setLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + } + } + @Test public void testPutTTL() throws Exception { ThriftHBaseServiceHandler handler = createHandler();