HBASE-18019 Close redundant memstore scanners

This commit is contained in:
Chia-Ping Tsai 2017-05-18 16:07:21 +08:00
parent 62d7323023
commit 32d2062b5c
2 changed files with 79 additions and 7 deletions

View File

@ -146,10 +146,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private volatile boolean flushed = false;
// generally we get one file from a flush
private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
// generally we get one memstore scanner from a flush
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
// Since CompactingMemstore is now default, we get three memstore scanners from a flush
private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3);
// The current list of scanners
private final List<KeyValueScanner> currentScanners = new ArrayList<>();
@VisibleForTesting
final List<KeyValueScanner> currentScanners = new ArrayList<>();
// flush update lock
private final ReentrantLock flushLock = new ReentrantLock();
@ -876,9 +877,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Seek the new scanners to the last key
seekScanners(scanners, lastTop, false, parallelSeekEnabled);
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
for (int i = currentScanners.size() - 1; i >=0; i--) {
if (!currentScanners.get(i).isFileScanner()) {
currentScanners.remove(i).close();
} else {
// we add the memstore scanner to the end of currentScanners
break;
}
}

View File

@ -32,6 +32,7 @@ import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@ -44,9 +45,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.IntStream;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -958,6 +957,76 @@ public class TestStore {
verify(spiedStore, times(0)).replaceStoreFiles(null, null);
}
private long countMemStoreScanner(StoreScanner scanner) {
if (scanner.currentScanners == null) {
return 0;
}
return scanner.currentScanners.stream()
.filter(s -> !s.isFileScanner())
.count();
}
@Test
public void testNumberOfMemStoreScannersAfterFlush() throws IOException {
long seqId = 100;
long timestamp = System.currentTimeMillis();
Cell cell0 = CellUtil.createCell(row, family, qf1, timestamp,
KeyValue.Type.Put.getCode(), qf1);
CellUtil.setSequenceId(cell0, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Collections.EMPTY_LIST);
Cell cell1 = CellUtil.createCell(row, family, qf2, timestamp,
KeyValue.Type.Put.getCode(), qf1);
CellUtil.setSequenceId(cell1, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1));
seqId = 101;
timestamp = System.currentTimeMillis();
Cell cell2 = CellUtil.createCell(row2, family, qf2, timestamp,
KeyValue.Type.Put.getCode(), qf1);
CellUtil.setSequenceId(cell2, seqId);
testNumberOfMemStoreScannersAfterFlush(Arrays.asList(cell0), Arrays.asList(cell1, cell2));
}
private void testNumberOfMemStoreScannersAfterFlush(List<Cell> inputCellsBeforeSnapshot,
List<Cell> inputCellsAfterSnapshot) throws IOException {
init(this.name.getMethodName() + "-" + inputCellsBeforeSnapshot.size());
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
long seqId = Long.MIN_VALUE;
for (Cell c : inputCellsBeforeSnapshot) {
quals.add(CellUtil.cloneQualifier(c));
seqId = Math.max(seqId, c.getSequenceId());
}
for (Cell c : inputCellsAfterSnapshot) {
quals.add(CellUtil.cloneQualifier(c));
seqId = Math.max(seqId, c.getSequenceId());
}
inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
storeFlushCtx.prepare();
inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
int numberOfMemScannersWhenScaning = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
try (StoreScanner s = (StoreScanner) store.getScanner(new Scan(), quals, seqId)) {
// snaptshot + active (if it isn't empty)
assertEquals(numberOfMemScannersWhenScaning, countMemStoreScanner(s));
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
boolean more;
int cellCount = 0;
do {
List<Cell> cells = new ArrayList<>();
more = s.next(cells);
cellCount += cells.size();
assertEquals(more ? numberOfMemScannersWhenScaning : 0, countMemStoreScanner(s));
} while (more);
assertEquals("The number of cells added before snapshot is " + inputCellsBeforeSnapshot.size()
+ ", The number of cells added after snapshot is " + inputCellsAfterSnapshot.size(),
inputCellsBeforeSnapshot.size() + inputCellsAfterSnapshot.size(), cellCount);
// the current scanners is cleared
assertEquals(0, countMemStoreScanner(s));
}
}
private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
CellUtil.setSequenceId(c, sequenceId);