HBASE-22929 - MemStoreLAB ChunkCreator may memory leak(ram) (#614)
This commit is contained in:
parent
e68f45941a
commit
5c55fc6e3d
|
@ -272,6 +272,11 @@ public class MemStoreLABImpl implements MemStoreLAB {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getOpenScannerCount() {
|
||||||
|
return this.openScannerCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when opening a scanner on the data of this MemStoreLAB
|
* Called when opening a scanner on the data of this MemStoreLAB
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -896,12 +896,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
// need for the updateReaders() to happen.
|
// need for the updateReaders() to happen.
|
||||||
LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
|
LOG.debug("StoreScanner already has the close lock. There is no need to updateReaders");
|
||||||
// no lock acquired.
|
// no lock acquired.
|
||||||
|
clearAndClose(memStoreScanners);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// lock acquired
|
// lock acquired
|
||||||
updateReaders = true;
|
updateReaders = true;
|
||||||
if (this.closing) {
|
if (this.closing) {
|
||||||
LOG.debug("StoreScanner already closing. There is no need to updateReaders");
|
LOG.debug("StoreScanner already closing. There is no need to updateReaders");
|
||||||
|
clearAndClose(memStoreScanners);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
flushed = true;
|
flushed = true;
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.util.NavigableSet;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
@ -113,6 +115,61 @@ public class TestStoreScannerClosure {
|
||||||
region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
|
region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScannerCloseAndUpdateReadersWithMemstoreScanner() throws Exception {
|
||||||
|
Put p = new Put(Bytes.toBytes("row"));
|
||||||
|
p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
|
||||||
|
region.put(p);
|
||||||
|
// create the store scanner here.
|
||||||
|
// for easiness, use Long.MAX_VALUE as read pt
|
||||||
|
try (ExtendedStoreScanner scan = new ExtendedStoreScanner(region.getStore(fam), scanInfo,
|
||||||
|
new Scan(), getCols("q1"), Long.MAX_VALUE)) {
|
||||||
|
p = new Put(Bytes.toBytes("row1"));
|
||||||
|
p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
|
||||||
|
region.put(p);
|
||||||
|
HStore store = region.getStore(fam);
|
||||||
|
ReentrantReadWriteLock lock = store.lock;
|
||||||
|
// use the lock to manually get a new memstore scanner. this is what
|
||||||
|
// HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
|
||||||
|
//since it is just a testcase).
|
||||||
|
lock.readLock().lock();
|
||||||
|
final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
|
||||||
|
lock.readLock().unlock();
|
||||||
|
Thread closeThread = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
// close should be completed
|
||||||
|
scan.close(false, true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
closeThread.start();
|
||||||
|
Thread updateThread = new Thread() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
// use the updated memstoreScanners and pass it to updateReaders
|
||||||
|
scan.updateReaders(true, Collections.emptyList(), memScanners);
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
updateThread.start();
|
||||||
|
// wait for close and updateThread to complete
|
||||||
|
closeThread.join();
|
||||||
|
updateThread.join();
|
||||||
|
MemStoreLAB memStoreLAB;
|
||||||
|
for (KeyValueScanner scanner : memScanners) {
|
||||||
|
if (scanner instanceof SegmentScanner) {
|
||||||
|
memStoreLAB = ((SegmentScanner) scanner).segment.getMemStoreLAB();
|
||||||
|
if (memStoreLAB != null) {
|
||||||
|
// There should be no unpooled chunks
|
||||||
|
int openScannerCount = ((MemStoreLABImpl) memStoreLAB).getOpenScannerCount();
|
||||||
|
assertTrue("The memstore should not have unpooled chunks", openScannerCount == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testScannerCloseAndUpdateReaders1() throws Exception {
|
public void testScannerCloseAndUpdateReaders1() throws Exception {
|
||||||
testScannerCloseAndUpdateReaderInternal(true, false);
|
testScannerCloseAndUpdateReaderInternal(true, false);
|
||||||
|
|
Loading…
Reference in New Issue