HBASE-16143 Change MemstoreScanner constructor to accept
List<KeyValueScanner> (Ram)
This commit is contained in:
parent
294c2dae9e
commit
9b1ecb31f0
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
|
@ -192,14 +191,6 @@ public abstract class AbstractMemStore implements MemStore {
|
||||||
return getActive().getSize();
|
return getActive().getSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @return a list containing a single memstore scanner.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
|
||||||
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(this, readPt));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getSnapshotSize() {
|
public long getSnapshotSize() {
|
||||||
return getSnapshot().getSize();
|
return getSnapshot().getSize();
|
||||||
|
@ -440,13 +431,6 @@ public abstract class AbstractMemStore implements MemStore {
|
||||||
*/
|
*/
|
||||||
protected abstract void checkActiveSize();
|
protected abstract void checkActiveSize();
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a list of Store segment scanners, one per each store segment
|
|
||||||
* @param readPt the version number required to initialize the scanners
|
|
||||||
* @return a list of Store segment scanners, one per each store segment
|
|
||||||
*/
|
|
||||||
protected abstract List<SegmentScanner> getListOfScanners(long readPt) throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an ordered list of segments from most recent to oldest in memstore
|
* Returns an ordered list of segments from most recent to oldest in memstore
|
||||||
* @return an ordered list of segments from most recent to oldest in memstore
|
* @return an ordered list of segments from most recent to oldest in memstore
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
@ -226,17 +228,20 @@ public class CompactingMemStore extends AbstractMemStore {
|
||||||
/*
|
/*
|
||||||
* Scanners are ordered from 0 (oldest) to newest in increasing order.
|
* Scanners are ordered from 0 (oldest) to newest in increasing order.
|
||||||
*/
|
*/
|
||||||
protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
|
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
||||||
List<Segment> pipelineList = pipeline.getSegments();
|
List<Segment> pipelineList = pipeline.getSegments();
|
||||||
long order = pipelineList.size();
|
long order = pipelineList.size();
|
||||||
LinkedList<SegmentScanner> list = new LinkedList<SegmentScanner>();
|
// The list of elements in pipeline + the active element + the snapshot segment
|
||||||
|
// TODO : This will change when the snapshot is made of more than one element
|
||||||
|
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
|
||||||
list.add(getActive().getSegmentScanner(readPt, order + 1));
|
list.add(getActive().getSegmentScanner(readPt, order + 1));
|
||||||
for (Segment item : pipelineList) {
|
for (Segment item : pipelineList) {
|
||||||
list.add(item.getSegmentScanner(readPt, order));
|
list.add(item.getSegmentScanner(readPt, order));
|
||||||
order--;
|
order--;
|
||||||
}
|
}
|
||||||
list.add(getSnapshot().getSegmentScanner(readPt, order));
|
list.add(getSnapshot().getSegmentScanner(readPt, order));
|
||||||
return list;
|
return Collections.<KeyValueScanner> singletonList(
|
||||||
|
new MemStoreScanner((AbstractMemStore) this, list, readPt));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.management.RuntimeMXBean;
|
import java.lang.management.RuntimeMXBean;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -113,18 +114,19 @@ public class DefaultMemStore extends AbstractMemStore {
|
||||||
/*
|
/*
|
||||||
* Scanners are ordered from 0 (oldest) to newest in increasing order.
|
* Scanners are ordered from 0 (oldest) to newest in increasing order.
|
||||||
*/
|
*/
|
||||||
protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
|
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
|
||||||
List<SegmentScanner> list = new ArrayList<SegmentScanner>(2);
|
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(2);
|
||||||
list.add(0, getActive().getSegmentScanner(readPt, 1));
|
list.add(getActive().getSegmentScanner(readPt, 1));
|
||||||
list.add(1, getSnapshot().getSegmentScanner(readPt, 0));
|
list.add(getSnapshot().getSegmentScanner(readPt, 0));
|
||||||
return list;
|
return Collections.<KeyValueScanner> singletonList(
|
||||||
|
new MemStoreScanner((AbstractMemStore) this, list, readPt));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<Segment> getSegments() throws IOException {
|
protected List<Segment> getSegments() throws IOException {
|
||||||
List<Segment> list = new ArrayList<Segment>(2);
|
List<Segment> list = new ArrayList<Segment>(2);
|
||||||
list.add(0, getActive());
|
list.add(getActive());
|
||||||
list.add(1, getSnapshot());
|
list.add(getSnapshot());
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,7 @@ class MemStoreCompactor {
|
||||||
public boolean startCompaction() throws IOException {
|
public boolean startCompaction() throws IOException {
|
||||||
if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty
|
if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty
|
||||||
|
|
||||||
List<SegmentScanner> scanners = new ArrayList<SegmentScanner>();
|
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
|
||||||
// get the list of segments from the pipeline
|
// get the list of segments from the pipeline
|
||||||
versionedList = compactingMemStore.getCompactibleSegments();
|
versionedList = compactingMemStore.getCompactibleSegments();
|
||||||
// the list is marked with specific version
|
// the list is marked with specific version
|
||||||
|
|
|
@ -57,35 +57,39 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
|
|
||||||
private long readPoint;
|
private long readPoint;
|
||||||
// remember the initial version of the scanners list
|
// remember the initial version of the scanners list
|
||||||
List<SegmentScanner> scanners;
|
List<KeyValueScanner> scanners;
|
||||||
// pointer back to the relevant MemStore
|
// pointer back to the relevant MemStore
|
||||||
// is needed for shouldSeek() method
|
// is needed for shouldSeek() method
|
||||||
private AbstractMemStore backwardReferenceToMemStore;
|
private AbstractMemStore backwardReferenceToMemStore;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
|
||||||
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
|
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
|
||||||
* After constructor only one heap is going to be initialized for entire lifespan
|
* After constructor only one heap is going to be initialized for entire lifespan
|
||||||
* of the MemStoreScanner. A specific scanner can only be one directional!
|
* of the MemStoreScanner. A specific scanner can only be one directional!
|
||||||
*
|
*
|
||||||
* @param ms Pointer back to the MemStore
|
* @param ms Pointer back to the MemStore
|
||||||
* @param readPoint Read point below which we can safely remove duplicate KVs
|
* @param scanners List of scanners over the segments
|
||||||
|
* @param readPt Read point below which we can safely remove duplicate KVs
|
||||||
|
*/
|
||||||
|
public MemStoreScanner(AbstractMemStore ms, List<KeyValueScanner> scanners, long readPt)
|
||||||
|
throws IOException {
|
||||||
|
this(ms, scanners, readPt, Type.UNDEFINED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
|
||||||
|
* After constructor only one heap is going to be initialized for entire lifespan
|
||||||
|
* of the MemStoreScanner. A specific scanner can only be one directional!
|
||||||
|
*
|
||||||
|
* @param ms Pointer back to the MemStore
|
||||||
|
* @param scanners List of scanners over the segments
|
||||||
|
* @param readPt Read point below which we can safely remove duplicate KVs
|
||||||
* @param type The scan type COMPACT_FORWARD should be used for compaction
|
* @param type The scan type COMPACT_FORWARD should be used for compaction
|
||||||
*/
|
*/
|
||||||
public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException {
|
public MemStoreScanner(AbstractMemStore ms, List<KeyValueScanner> scanners, long readPt,
|
||||||
this(ms, ms.getListOfScanners(readPoint), readPoint, type);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Constructor used only when the scan usage is unknown
|
|
||||||
and need to be defined according to the first move */
|
|
||||||
public MemStoreScanner(AbstractMemStore ms, long readPt) throws IOException {
|
|
||||||
this(ms, readPt, Type.UNDEFINED);
|
|
||||||
}
|
|
||||||
|
|
||||||
public MemStoreScanner(AbstractMemStore ms, List<SegmentScanner> scanners, long readPoint,
|
|
||||||
Type type) throws IOException {
|
Type type) throws IOException {
|
||||||
super();
|
super();
|
||||||
this.readPoint = readPoint;
|
this.readPoint = readPt;
|
||||||
this.type = type;
|
this.type = type;
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case UNDEFINED:
|
case UNDEFINED:
|
||||||
|
@ -262,8 +266,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (SegmentScanner sc : scanners) {
|
for (KeyValueScanner sc : scanners) {
|
||||||
if (sc.shouldSeek(scan, oldestUnexpiredTS)) {
|
if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,7 +279,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuffer buf = new StringBuffer();
|
StringBuffer buf = new StringBuffer();
|
||||||
int i = 1;
|
int i = 1;
|
||||||
for (SegmentScanner scanner : scanners) {
|
for (KeyValueScanner scanner : scanners) {
|
||||||
buf.append("scanner (" + i + ") " + scanner.toString() + " ||| ");
|
buf.append("scanner (" + i + ") " + scanner.toString() + " ||| ");
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
@ -289,7 +293,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
*/
|
*/
|
||||||
private boolean restartBackwardHeap(Cell cell) throws IOException {
|
private boolean restartBackwardHeap(Cell cell) throws IOException {
|
||||||
boolean res = false;
|
boolean res = false;
|
||||||
for (SegmentScanner scan : scanners) {
|
for (KeyValueScanner scan : scanners) {
|
||||||
res |= scan.seekToPreviousRow(cell);
|
res |= scan.seekToPreviousRow(cell);
|
||||||
}
|
}
|
||||||
this.backwardHeap =
|
this.backwardHeap =
|
||||||
|
@ -315,7 +319,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner {
|
||||||
forwardHeap = null;
|
forwardHeap = null;
|
||||||
// before building the heap seek for the relevant key on the scanners,
|
// before building the heap seek for the relevant key on the scanners,
|
||||||
// for the heap to be built from the scanners correctly
|
// for the heap to be built from the scanners correctly
|
||||||
for (SegmentScanner scan : scanners) {
|
for (KeyValueScanner scan : scanners) {
|
||||||
if (toLast) {
|
if (toLast) {
|
||||||
res |= scan.seekToLastRow();
|
res |= scan.seekToLastRow();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -243,7 +243,7 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
|
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
|
||||||
return true;
|
return getSegment().shouldSeek(scan,oldestUnexpiredTS);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* This scanner is working solely on the in-memory MemStore therefore this
|
* This scanner is working solely on the in-memory MemStore therefore this
|
||||||
|
@ -305,14 +305,6 @@ public class SegmentScanner implements KeyValueScanner {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns whether the given scan should seek in this segment
|
|
||||||
* @return whether the given scan should seek in this segment
|
|
||||||
*/
|
|
||||||
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
|
|
||||||
return getSegment().shouldSeek(scan,oldestUnexpiredTS);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Segment getSegment(){
|
protected Segment getSegment(){
|
||||||
return segment;
|
return segment;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue