HBASE-21551 Memory leak when use scan with STREAM at server side
This commit is contained in:
parent
96f8e0cbe9
commit
1a1a65b565
|
@ -126,7 +126,8 @@ public class HStoreFile implements StoreFile, StoreFileReader.Listener {
|
|||
private final AtomicInteger refCount = new AtomicInteger(0);
|
||||
|
||||
// Set implementation must be of concurrent type
|
||||
private final Set<StoreFileReader> streamReaders;
|
||||
@VisibleForTesting
|
||||
final Set<StoreFileReader> streamReaders;
|
||||
|
||||
private final boolean noReadahead;
|
||||
|
||||
|
|
|
@ -186,6 +186,9 @@ public class StoreFileReader {
|
|||
if (!shared) {
|
||||
try {
|
||||
reader.close(false);
|
||||
if (this.listener != null) {
|
||||
this.listener.storeFileReaderClosed(this);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("failed to close stream reader", e);
|
||||
}
|
||||
|
|
|
@ -23,8 +23,13 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -33,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||
|
@ -41,8 +47,9 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
|
|||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
@ -67,8 +74,8 @@ public class TestSwitchToStreamRead {
|
|||
|
||||
private static HRegion REGION;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws IOException {
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
|
||||
StringBuilder sb = new StringBuilder(256);
|
||||
for (int i = 0; i < 255; i++) {
|
||||
|
@ -92,12 +99,55 @@ public class TestSwitchToStreamRead {
|
|||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws IOException {
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
REGION.close(true);
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
private Set<StoreFileReader> getStreamReaders() {
|
||||
List<HStore> stores = REGION.getStores();
|
||||
Assert.assertEquals(1, stores.size());
|
||||
HStore firstStore = stores.get(0);
|
||||
Assert.assertNotNull(firstStore);
|
||||
Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
|
||||
Assert.assertEquals(1, storeFiles.size());
|
||||
HStoreFile firstSToreFile = storeFiles.iterator().next();
|
||||
Assert.assertNotNull(firstSToreFile);
|
||||
return Collections.unmodifiableSet(firstSToreFile.streamReaders);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Case for HBASE-21551
|
||||
*/
|
||||
@Test
|
||||
public void testStreamReadersCleanup() throws IOException {
|
||||
Set<StoreFileReader> streamReaders = getStreamReaders();
|
||||
Assert.assertEquals(0, getStreamReaders().size());
|
||||
try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
|
||||
StoreScanner storeScanner =
|
||||
(StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
|
||||
List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
|
||||
.filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
|
||||
.collect(Collectors.toList());
|
||||
Assert.assertEquals(1, sfScanners.size());
|
||||
StoreFileScanner sfScanner = sfScanners.get(0);
|
||||
Assert.assertFalse(sfScanner.getReader().shared);
|
||||
|
||||
// There should be a stream reader
|
||||
Assert.assertEquals(1, getStreamReaders().size());
|
||||
}
|
||||
Assert.assertEquals(0, getStreamReaders().size());
|
||||
|
||||
// The streamsReader should be clear after region close even if there're some opened stream
|
||||
// scanner.
|
||||
RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
|
||||
Assert.assertNotNull(scanner);
|
||||
Assert.assertEquals(1, getStreamReaders().size());
|
||||
REGION.close();
|
||||
Assert.assertEquals(0, streamReaders.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
|
||||
|
|
Loading…
Reference in New Issue