diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index 4aff949c6f7..9c94990f258 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -126,7 +126,8 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener { private final AtomicInteger refCount = new AtomicInteger(0); // Set implementation must be of concurrent type - private final Set streamReaders; + @VisibleForTesting + final Set streamReaders; private final boolean noReadahead; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 3fbddf21a7d..d9008b2b1d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -186,6 +186,9 @@ public class StoreFileReader { if (!shared) { try { reader.close(false); + if (this.listener != null) { + this.listener.storeFileReaderClosed(this); + } } catch (IOException e) { LOG.warn("failed to close stream reader", e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java index 815643d441e..c1cecf8b590 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java @@ -23,8 +23,13 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; @@ -41,8 +47,9 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; @@ -67,8 +74,8 @@ public class TestSwitchToStreamRead { private static HRegion REGION; - @BeforeClass - public static void setUp() throws IOException { + @Before + public void setUp() throws IOException { UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048); StringBuilder sb = new StringBuilder(256); for (int i = 0; i < 255; i++) { @@ -92,12 +99,55 @@ public class TestSwitchToStreamRead { } } - @AfterClass - public static void tearDown() throws IOException { + @After + public void tearDown() throws IOException { REGION.close(true); UTIL.cleanupTestDir(); } + private Set getStreamReaders() { + List stores = REGION.getStores(); + Assert.assertEquals(1, stores.size()); + HStore firstStore = stores.get(0); + Assert.assertNotNull(firstStore); + Collection storeFiles = firstStore.getStorefiles(); + Assert.assertEquals(1, storeFiles.size()); + HStoreFile firstSToreFile = storeFiles.iterator().next(); + Assert.assertNotNull(firstSToreFile); + return Collections.unmodifiableSet(firstSToreFile.streamReaders); + } + + /** + * Test Case for HBASE-21551 + */ + @Test + public void testStreamReadersCleanup() throws IOException { + Set streamReaders = getStreamReaders(); + Assert.assertEquals(0, getStreamReaders().size()); + try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) { + StoreScanner storeScanner = + (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting(); + List sfScanners = storeScanner.getAllScannersForTesting().stream() + .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs) + .collect(Collectors.toList()); + Assert.assertEquals(1, sfScanners.size()); + StoreFileScanner sfScanner = sfScanners.get(0); + Assert.assertFalse(sfScanner.getReader().shared); + + // There should be a stream reader + Assert.assertEquals(1, getStreamReaders().size()); + } + Assert.assertEquals(0, getStreamReaders().size()); + + // The streamsReader should be clear after region close even if there're some opened stream + // scanner. + RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM)); + Assert.assertNotNull(scanner); + Assert.assertEquals(1, getStreamReaders().size()); + REGION.close(); + Assert.assertEquals(0, streamReaders.size()); + } + @Test public void test() throws IOException { try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {