HBASE-16643 - Reverse scanner heap creation may not allow MSLAB closure
due to improper ref counting of segments (Ram)
This commit is contained in:
parent
db394f57de
commit
f196a8c331
|
@ -65,7 +65,7 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
|
|||
scanners.add(segment.getScanner(store.getSmallestReadPoint()));
|
||||
}
|
||||
|
||||
scanner = new MemStoreScanner(comparator, scanners, MemStoreScanner.Type.COMPACT_FORWARD);
|
||||
scanner = new MemStoreScanner(comparator, scanners, true);
|
||||
|
||||
// reinitialize the compacting scanner for each instance of iterator
|
||||
compactingScanner = createScanner(store, scanner);
|
||||
|
@ -101,7 +101,6 @@ public class MemStoreCompactorIterator implements Iterator<Cell> {
|
|||
public void close() {
|
||||
compactingScanner.close();
|
||||
compactingScanner = null;
|
||||
scanner.close();
|
||||
scanner = null;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -37,67 +39,90 @@ import org.apache.htrace.Trace;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||
/**
|
||||
* Types of cell MemStoreScanner
|
||||
*/
|
||||
static public enum Type {
|
||||
UNDEFINED,
|
||||
COMPACT_FORWARD,
|
||||
USER_SCAN_FORWARD,
|
||||
USER_SCAN_BACKWARD
|
||||
}
|
||||
|
||||
// heap of scanners used for traversing forward
|
||||
private KeyValueHeap forwardHeap;
|
||||
// reversed scanners heap for traversing backward
|
||||
private ReversedKeyValueHeap backwardHeap;
|
||||
// heap of scanners, lazily initialized
|
||||
private KeyValueHeap heap;
|
||||
|
||||
// The type of the scan is defined by constructor
|
||||
// or according to the first usage
|
||||
private Type type = Type.UNDEFINED;
|
||||
// indicates if the scanner is created for inmemoryCompaction
|
||||
private boolean inmemoryCompaction;
|
||||
|
||||
// remember the initial version of the scanners list
|
||||
List<KeyValueScanner> scanners;
|
||||
|
||||
private final CellComparator comparator;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
/**
|
||||
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
|
||||
* After constructor only one heap is going to be initialized for entire lifespan
|
||||
* of the MemStoreScanner. A specific scanner can only be one directional!
|
||||
*
|
||||
* Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
|
||||
* and the heap is lazily initialized
|
||||
* @param comparator Cell Comparator
|
||||
* @param scanners List of scanners, from which the heap will be built
|
||||
* @param type The scan type COMPACT_FORWARD should be used for compaction
|
||||
* @param scanners List of scanners, from which the heap will be built
|
||||
* @param inmemoryCompaction true if used for inmemoryCompaction.
|
||||
* In this case, creates a forward heap always.
|
||||
*/
|
||||
public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners, Type type)
|
||||
throws IOException {
|
||||
public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners,
|
||||
boolean inmemoryCompaction) throws IOException {
|
||||
super();
|
||||
this.type = type;
|
||||
switch (type) {
|
||||
case UNDEFINED:
|
||||
case USER_SCAN_FORWARD:
|
||||
case COMPACT_FORWARD:
|
||||
this.forwardHeap = new KeyValueHeap(scanners, comparator);
|
||||
break;
|
||||
case USER_SCAN_BACKWARD:
|
||||
this.backwardHeap = new ReversedKeyValueHeap(scanners, comparator);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner");
|
||||
}
|
||||
this.comparator = comparator;
|
||||
this.scanners = scanners;
|
||||
if (Trace.isTracing() && Trace.currentSpan() != null) {
|
||||
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
|
||||
}
|
||||
this.inmemoryCompaction = inmemoryCompaction;
|
||||
if (inmemoryCompaction) {
|
||||
// init the forward scanner in case of inmemoryCompaction
|
||||
initForwardKVHeapIfNeeded(comparator, scanners);
|
||||
}
|
||||
}
|
||||
|
||||
/* Constructor used only when the scan usage is unknown
|
||||
and need to be defined according to the first move */
|
||||
/**
|
||||
* Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
|
||||
* and the heap is lazily initialized
|
||||
* @param comparator Cell Comparator
|
||||
* @param scanners List of scanners, from which the heap will be built
|
||||
*/
|
||||
public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
this(comparator, scanners, Type.UNDEFINED);
|
||||
this(comparator, scanners, false);
|
||||
}
|
||||
|
||||
private void initForwardKVHeapIfNeeded(CellComparator comparator, List<KeyValueScanner> scanners)
|
||||
throws IOException {
|
||||
if (heap == null) {
|
||||
// lazy init
|
||||
// In a normal scan case, at the StoreScanner level before the KVHeap is
|
||||
// created we do a seek or reseek. So that will happen
|
||||
// on all the scanners that the StoreScanner is
|
||||
// made of. So when we get any of those call to this scanner we init the
|
||||
// heap here with normal forward KVHeap.
|
||||
this.heap = new KeyValueHeap(scanners, comparator);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator,
|
||||
List<KeyValueScanner> scanners) throws IOException {
|
||||
boolean res = false;
|
||||
if (heap == null) {
|
||||
// lazy init
|
||||
// In a normal reverse scan case, at the ReversedStoreScanner level before the
|
||||
// ReverseKeyValueheap is
|
||||
// created we do a seekToLastRow or backwardSeek. So that will happen
|
||||
// on all the scanners that the ReversedStoreSCanner is
|
||||
// made of. So when we get any of those call to this scanner we init the
|
||||
// heap here with ReversedKVHeap.
|
||||
if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) {
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
res |= scanner.seekToLastRow();
|
||||
}
|
||||
} else {
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
res |= scanner.backwardSeek(seekKey);
|
||||
}
|
||||
}
|
||||
this.heap = new ReversedKeyValueHeap(scanners, comparator);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -105,30 +130,29 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
* The backward traversal is assumed, only if specified explicitly
|
||||
*/
|
||||
@Override
|
||||
public synchronized Cell peek() {
|
||||
if (type == Type.USER_SCAN_BACKWARD) {
|
||||
return backwardHeap.peek();
|
||||
public Cell peek() {
|
||||
if (this.heap != null) {
|
||||
return this.heap.peek();
|
||||
}
|
||||
return forwardHeap.peek();
|
||||
// Doing this way in case some test cases tries to peek directly to avoid NPE
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the next cell from the top-most scanner. Assumed forward scanning.
|
||||
*/
|
||||
@Override
|
||||
public synchronized Cell next() throws IOException {
|
||||
KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap;
|
||||
|
||||
// loop over till the next suitable value
|
||||
// take next value from the heap
|
||||
for (Cell currentCell = heap.next();
|
||||
currentCell != null;
|
||||
currentCell = heap.next()) {
|
||||
|
||||
// all the logic of presenting cells is inside the internal KeyValueScanners
|
||||
// located inside the heap
|
||||
|
||||
return currentCell;
|
||||
public Cell next() throws IOException {
|
||||
if(this.heap != null) {
|
||||
// loop over till the next suitable value
|
||||
// take next value from the heap
|
||||
for (Cell currentCell = heap.next();
|
||||
currentCell != null;
|
||||
currentCell = heap.next()) {
|
||||
// all the logic of presenting cells is inside the internal KeyValueScanners
|
||||
// located inside the heap
|
||||
return currentCell;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -142,15 +166,15 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
* @return false if the key is null or if there is no data
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean seek(Cell cell) throws IOException {
|
||||
assertForward();
|
||||
public boolean seek(Cell cell) throws IOException {
|
||||
initForwardKVHeapIfNeeded(comparator, scanners);
|
||||
|
||||
if (cell == null) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
|
||||
return forwardHeap.seek(cell);
|
||||
return heap.seek(cell);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,7 +184,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
* @return true if there is at least one KV to read, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean reseek(Cell cell) throws IOException {
|
||||
public boolean reseek(Cell cell) throws IOException {
|
||||
/*
|
||||
* See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
|
||||
* This code is executed concurrently with flush and puts, without locks.
|
||||
|
@ -175,8 +199,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
*
|
||||
* TODO: The above comment copied from the original MemStoreScanner
|
||||
*/
|
||||
assertForward();
|
||||
return forwardHeap.reseek(cell);
|
||||
initForwardKVHeapIfNeeded(comparator, scanners);
|
||||
return heap.reseek(cell);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -190,22 +214,21 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
|
||||
if (forwardHeap != null) {
|
||||
assert ((type == Type.USER_SCAN_FORWARD) ||
|
||||
(type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED));
|
||||
forwardHeap.close();
|
||||
forwardHeap = null;
|
||||
if (backwardHeap != null) {
|
||||
backwardHeap.close();
|
||||
backwardHeap = null;
|
||||
}
|
||||
} else if (backwardHeap != null) {
|
||||
assert (type == Type.USER_SCAN_BACKWARD);
|
||||
backwardHeap.close();
|
||||
backwardHeap = null;
|
||||
public void close() {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
// Ensuring that all the segment scanners are closed
|
||||
if (heap != null) {
|
||||
heap.close();
|
||||
// It is safe to do close as no new calls will be made to this scanner.
|
||||
heap = null;
|
||||
} else {
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
closed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -215,9 +238,11 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
* @return false if the key is null or if there is no data
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean backwardSeek(Cell cell) throws IOException {
|
||||
initBackwardHeapIfNeeded(cell, false);
|
||||
return backwardHeap.backwardSeek(cell);
|
||||
public boolean backwardSeek(Cell cell) throws IOException {
|
||||
// The first time when this happens it sets the scanners to the seek key
|
||||
// passed by the incoming scan's start row
|
||||
initReverseKVHeapIfNeeded(cell, comparator, scanners);
|
||||
return heap.backwardSeek(cell);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -227,22 +252,17 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
* @return false if the key is null or if there is no data
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean seekToPreviousRow(Cell cell) throws IOException {
|
||||
initBackwardHeapIfNeeded(cell, false);
|
||||
if (backwardHeap.peek() == null) {
|
||||
public boolean seekToPreviousRow(Cell cell) throws IOException {
|
||||
initReverseKVHeapIfNeeded(cell, comparator, scanners);
|
||||
if (heap.peek() == null) {
|
||||
restartBackwardHeap(cell);
|
||||
}
|
||||
return backwardHeap.seekToPreviousRow(cell);
|
||||
return heap.seekToPreviousRow(cell);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean seekToLastRow() throws IOException {
|
||||
// TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't
|
||||
// implement seekToLastRow() method :(
|
||||
// however seekToLastRow() was implemented in internal MemStoreScanner
|
||||
// so I wonder whether we need to come with our own workaround, or to update
|
||||
// ReversedKeyValueHeap
|
||||
return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true);
|
||||
public boolean seekToLastRow() throws IOException {
|
||||
return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -250,9 +270,9 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
* @return False if the key definitely does not exist in this Memstore
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
|
||||
|
||||
if (type == Type.COMPACT_FORWARD) {
|
||||
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
|
||||
// TODO : Check if this can be removed.
|
||||
if (inmemoryCompaction) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -286,58 +306,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
|||
for (KeyValueScanner scan : scanners) {
|
||||
res |= scan.seekToPreviousRow(cell);
|
||||
}
|
||||
this.backwardHeap =
|
||||
this.heap =
|
||||
new ReversedKeyValueHeap(scanners, comparator);
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the type of the scan suits the assumption of moving backward
|
||||
*/
|
||||
private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException {
|
||||
boolean res = false;
|
||||
if (toLast && (type != Type.UNDEFINED)) {
|
||||
throw new IllegalStateException(
|
||||
"Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString());
|
||||
}
|
||||
if (type == Type.UNDEFINED) {
|
||||
// In case we started from peek, release the forward heap
|
||||
// and build backward. Set the correct type. Thus this turn
|
||||
// can happen only once
|
||||
if ((backwardHeap == null) && (forwardHeap != null)) {
|
||||
forwardHeap.close();
|
||||
forwardHeap = null;
|
||||
// before building the heap seek for the relevant key on the scanners,
|
||||
// for the heap to be built from the scanners correctly
|
||||
for (KeyValueScanner scan : scanners) {
|
||||
if (toLast) {
|
||||
res |= scan.seekToLastRow();
|
||||
} else {
|
||||
res |= scan.backwardSeek(cell);
|
||||
}
|
||||
}
|
||||
this.backwardHeap =
|
||||
new ReversedKeyValueHeap(scanners, comparator);
|
||||
type = Type.USER_SCAN_BACKWARD;
|
||||
}
|
||||
}
|
||||
|
||||
if (type == Type.USER_SCAN_FORWARD) {
|
||||
throw new IllegalStateException("Traversing backward with forward scan");
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the type of the scan suits the assumption of moving forward
|
||||
*/
|
||||
private void assertForward() throws IllegalStateException {
|
||||
if (type == Type.UNDEFINED) {
|
||||
type = Type.USER_SCAN_FORWARD;
|
||||
}
|
||||
|
||||
if (type == Type.USER_SCAN_BACKWARD) {
|
||||
throw new IllegalStateException("Traversing forward with backward scan");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,11 +67,11 @@ public class SegmentScanner implements KeyValueScanner {
|
|||
protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) {
|
||||
this.segment = segment;
|
||||
this.readPoint = readPoint;
|
||||
//increase the reference count so the underlying structure will not be de-allocated
|
||||
this.segment.incScannerCount();
|
||||
iter = segment.iterator();
|
||||
// the initialization of the current is required for working with heap of SegmentScanners
|
||||
current = getNext();
|
||||
//increase the reference count so the underlying structure will not be de-allocated
|
||||
this.segment.incScannerCount();
|
||||
this.scannerOrder = scannerOrder;
|
||||
}
|
||||
|
||||
|
|
|
@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
|||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
|
||||
|
|
|
@ -22,12 +22,9 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
@ -36,9 +33,6 @@ import org.junit.Test;
|
|||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
|
@ -281,13 +275,14 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
|
|||
Threads.sleep(10);
|
||||
}
|
||||
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
|
||||
MemStoreScanner scanner = new MemStoreScanner(CellComparator.COMPARATOR, scanners);
|
||||
// seek
|
||||
scanners.get(0).seek(KeyValue.LOWESTKEY);
|
||||
int count = 0;
|
||||
while (scanner.next() != null) {
|
||||
while (scanners.get(0).next() != null) {
|
||||
count++;
|
||||
}
|
||||
assertEquals("the count should be ", count, 150);
|
||||
scanner.close();
|
||||
scanners.get(0).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -345,17 +340,4 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
|
|||
}
|
||||
regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);//
|
||||
}
|
||||
|
||||
private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
|
||||
long t = 1234;
|
||||
|
||||
@Override public long currentTime() {
|
||||
return t;
|
||||
}
|
||||
|
||||
public void setCurrentTimeMillis(long t) {
|
||||
this.t = t;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TestMobSnapshotCloneIndependence;
|
||||
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
|
||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
|
||||
|
@ -103,7 +102,6 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
|
|||
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.master.procedure.TestMasterFailoverWithProcedures;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||
|
|
Loading…
Reference in New Issue