HBASE-21551 Memory leak when use scan with STREAM at server side

This commit is contained in:
huzheng 2018-12-05 22:57:49 +08:00
parent f49baf259e
commit 3b854859f6
3 changed files with 55 additions and 1 deletions

View File

@ -126,7 +126,8 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
private final AtomicInteger refCount = new AtomicInteger(0); private final AtomicInteger refCount = new AtomicInteger(0);
// Set implementation must be of concurrent type // Set implementation must be of concurrent type
private final Set<StoreFileReader> streamReaders; @VisibleForTesting
final Set<StoreFileReader> streamReaders;
private final boolean noReadahead; private final boolean noReadahead;

View File

@ -186,6 +186,9 @@ public class StoreFileReader {
if (!shared) { if (!shared) {
try { try {
reader.close(false); reader.close(false);
if (this.listener != null) {
this.listener.storeFileReaderClosed(this);
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("failed to close stream reader", e); LOG.warn("failed to close stream reader", e);
} }

View File

@ -23,8 +23,13 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Ignore; import org.junit.Ignore;
@ -98,6 +105,49 @@ public class TestSwitchToStreamRead {
UTIL.cleanupTestDir(); UTIL.cleanupTestDir();
} }
private Set<StoreFileReader> getStreamReaders() {
List<HStore> stores = REGION.getStores();
Assert.assertEquals(1, stores.size());
HStore firstStore = stores.get(0);
Assert.assertNotNull(firstStore);
Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
Assert.assertEquals(1, storeFiles.size());
HStoreFile firstSToreFile = storeFiles.iterator().next();
Assert.assertNotNull(firstSToreFile);
return Collections.unmodifiableSet(firstSToreFile.streamReaders);
}
/**
* Test Case for HBASE-21551
*/
@Test
public void testStreamReadersCleanup() throws IOException {
Set<StoreFileReader> streamReaders = getStreamReaders();
Assert.assertEquals(0, getStreamReaders().size());
try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
StoreScanner storeScanner =
(StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
.filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
.collect(Collectors.toList());
Assert.assertEquals(1, sfScanners.size());
StoreFileScanner sfScanner = sfScanners.get(0);
Assert.assertFalse(sfScanner.getReader().shared);
// There should be a stream reader
Assert.assertEquals(1, getStreamReaders().size());
}
Assert.assertEquals(0, getStreamReaders().size());
// The streamsReader should be clear after region close even if there're some opened stream
// scanner.
RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
Assert.assertNotNull(scanner);
Assert.assertEquals(1, getStreamReaders().size());
REGION.close();
Assert.assertEquals(0, streamReaders.size());
}
@Test @Test
public void test() throws IOException { public void test() throws IOException {
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) { try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {