HBASE-22929 - MemStoreLAB ChunkCreator may memory leak(ram) (#614)

Author: ramkrish86, 2019-09-13 10:01:46 +05:30 (committed by GitHub)
parent 3330e5c2e2
commit c0908d4be4
3 changed files with 64 additions and 0 deletions

MemStoreLABImpl.java

@@ -270,6 +270,11 @@ public class MemStoreLABImpl implements MemStoreLAB {
     }
   }
 
+  @VisibleForTesting
+  int getOpenScannerCount() {
+    return this.openScannerCount.get();
+  }
+
   /**
    * Called when opening a scanner on the data of this MemStoreLAB
    */

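The accessor above only exposes the existing openScannerCount field to the new test. The leak it helps detect is a reference-counting one: every scanner opened on a MemStoreLAB bumps the count, every scanner close drops it, and the LAB's chunks can only go back to the ChunkCreator pool once the LAB is closed and the count reaches zero. A minimal sketch of that pattern, using names that mirror MemStoreLABImpl but written as an illustration rather than the actual HBase source:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: recycleChunks() stands in for returning chunks to ChunkCreator's pool.
class ScannerRefCountSketch {
  private final AtomicInteger openScannerCount = new AtomicInteger();
  private final AtomicBoolean closed = new AtomicBoolean(false);

  void incScannerCount() {
    // called when a scanner is opened on data backed by this LAB
    openScannerCount.incrementAndGet();
  }

  void decScannerCount() {
    // called when that scanner is closed; only once the LAB itself is closed
    // and the last scanner reference is gone may the chunks be recycled
    if (openScannerCount.decrementAndGet() == 0 && closed.get()) {
      recycleChunks();
    }
  }

  void close() {
    if (closed.compareAndSet(false, true) && openScannerCount.get() == 0) {
      recycleChunks();
    }
  }

  private void recycleChunks() {
    // in MemStoreLABImpl this hands the LAB's chunks back to the chunk pool
  }
}

A memstore scanner that is created but never closed therefore pins the count above zero and the chunks are never recycled, which is exactly the leak the StoreScanner change below plugs.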
StoreScanner.java

@@ -895,12 +895,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         // need for the updateReaders() to happen.
         LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
         // no lock acquired.
+        clearAndClose(memStoreScanners);
         return;
       }
       // lock acquired
       updateReaders = true;
       if (this.closing) {
         LOG.debug("StoreScanner already closing. There is no need to updateReaders");
+        clearAndClose(memStoreScanners);
         return;
       }
       flushed = true;

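updateReaders() can bail out early on two paths: the close lock is already held, or the scanner is already closing. Before this change the memstore scanners handed in by the flush were simply dropped on those paths, leaving their MemStoreLABs with a permanently positive open-scanner count; the added clearAndClose(memStoreScanners) calls close and discard them instead. A hedged sketch of what such a helper does (Scanner is a stand-in for HBase's KeyValueScanner; the real private helper in StoreScanner may differ in detail):

import java.util.List;

// Illustration of a clearAndClose-style helper, not the StoreScanner source.
final class ClearAndCloseSketch {
  /** Stand-in for org.apache.hadoop.hbase.regionserver.KeyValueScanner. */
  interface Scanner {
    void close();
  }

  static void clearAndClose(List<? extends Scanner> scanners) {
    if (scanners == null) {
      return;
    }
    for (Scanner scanner : scanners) {
      // closing each scanner releases its reference on the backing MemStoreLAB,
      // letting the LAB recycle its chunks once it is closed as well
      scanner.close();
    }
    scanners.clear();
  }
}

Closing the scanners here rather than leaving them to the garbage collector matters because chunk recycling is tied to the explicit close, not to object lifetime.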
TestStoreScannerClosure.java

@@ -29,6 +29,7 @@ import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -113,6 +115,61 @@ public class TestStoreScannerClosure {
     region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
   }
 
+  @Test
+  public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
+    Put p = new Put(Bytes.toBytes("row"));
+    p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
+    region.put(p);
+    // create the store scanner here.
+    // for easiness, use Long.MAX_VALUE as read pt
+    try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
+        new Scan(), getCols("q1"), Long.MAX_VALUE)) {
+      p = new Put(Bytes.toBytes("row1"));
+      p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
+      region.put(p);
+      HStore store = region.getStore(fam);
+      ReentrantReadWriteLock lock = store.lock;
+      // use the lock to manually get a new memstore scanner. this is what
+      // HStore#notifyChangedReadersObservers does under the lock (the lock is not needed here
+      // since it is just a testcase).
+      lock.readLock().lock();
+      final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
+      lock.readLock().unlock();
+      Thread closeThread = new Thread() {
+        public void run() {
+          // close should be completed
+          scan.close(false, true);
+        }
+      };
+      closeThread.start();
+      Thread updateThread = new Thread() {
+        public void run() {
+          try {
+            // use the updated memstoreScanners and pass it to updateReaders
+            scan.updateReaders(true, Collections.emptyList(), memScanners);
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+      updateThread.start();
+      // wait for close and updateThread to complete
+      closeThread.join();
+      updateThread.join();
+      MemStoreLAB memStoreLAB;
+      for (KeyValueScanner scanner : memScanners) {
+        if (scanner instanceof SegmentScanner) {
+          memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
+          if (memStoreLAB != null) {
+            // There should be no unpooled chunks
+            int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
+            assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
+          }
+        }
+      }
+    }
+  }
+
   @Test
   public void testScannerCloseAndUpdateReaders1() throws Exception {
     testScannerCloseAndUpdateReaderInternal(true, false);