diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java index 0f27e0e3214..d25f96065e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NavigableSet; @@ -192,14 +191,6 @@ public abstract class AbstractMemStore implements MemStore { return getActive().getSize(); } - /** - * @return a list containing a single memstore scanner. - */ - @Override - public List getScanners(long readPt) throws IOException { - return Collections. singletonList(new MemStoreScanner(this, readPt)); - } - @Override public long getSnapshotSize() { return getSnapshot().getSize(); @@ -440,13 +431,6 @@ public abstract class AbstractMemStore implements MemStore { */ protected abstract void checkActiveSize(); - /** - * Returns a list of Store segment scanners, one per each store segment - * @param readPt the version number required to initialize the scanners - * @return a list of Store segment scanners, one per each store segment - */ - protected abstract List getListOfScanners(long readPt) throws IOException; - /** * Returns an ordered list of segments from most recent to oldest in memstore * @return an ordered list of segments from most recent to oldest in memstore diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index ec5684dcda1..cd923f9c12e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; @@ -226,17 +228,20 @@ public class CompactingMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - protected List getListOfScanners(long readPt) throws IOException { + public List getScanners(long readPt) throws IOException { List pipelineList = pipeline.getSegments(); long order = pipelineList.size(); - LinkedList list = new LinkedList(); - list.add(getActive().getSegmentScanner(readPt, order+1)); + // The list of elements in pipeline + the active element + the snapshot segment + // TODO : This will change when the snapshot is made of more than one element + List list = new ArrayList(pipelineList.size() + 2); + list.add(getActive().getSegmentScanner(readPt, order + 1)); for (Segment item : pipelineList) { list.add(item.getSegmentScanner(readPt, order)); order--; } list.add(getSnapshot().getSegmentScanner(readPt, order)); - return list; + return Collections. singletonList( + new MemStoreScanner((AbstractMemStore) this, list, readPt)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index cdc910e486b..c21dbb5deb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -113,18 +114,19 @@ public class DefaultMemStore extends AbstractMemStore { /* * Scanners are ordered from 0 (oldest) to newest in increasing order. */ - protected List getListOfScanners(long readPt) throws IOException { - List list = new ArrayList(2); - list.add(0, getActive().getSegmentScanner(readPt, 1)); - list.add(1, getSnapshot().getSegmentScanner(readPt, 0)); - return list; + public List getScanners(long readPt) throws IOException { + List list = new ArrayList(2); + list.add(getActive().getSegmentScanner(readPt, 1)); + list.add(getSnapshot().getSegmentScanner(readPt, 0)); + return Collections. singletonList( + new MemStoreScanner((AbstractMemStore) this, list, readPt)); } @Override protected List getSegments() throws IOException { List list = new ArrayList(2); - list.add(0, getActive()); - list.add(1, getSnapshot()); + list.add(getActive()); + list.add(getSnapshot()); return list; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 5b2876d1dff..691ebb9c507 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -67,7 +67,7 @@ class MemStoreCompactor { public boolean startCompaction() throws IOException { if (!compactingMemStore.hasCompactibleSegments()) return false; // no compaction on empty - List scanners = new ArrayList(); + List scanners = new ArrayList(); // get the list of segments from the pipeline versionedList = compactingMemStore.getCompactibleSegments(); // the list is marked with specific version diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java index 01a7ff3f073..3d31d2a39e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -57,35 +57,39 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { private long readPoint; // remember the initial version of the scanners list - List scanners; + List scanners; // pointer back to the relevant MemStore // is needed for shouldSeek() method private AbstractMemStore backwardReferenceToMemStore; /** - * Constructor. * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! * After constructor only one heap is going to be initialized for entire lifespan * of the MemStoreScanner. A specific scanner can only be one directional! * * @param ms Pointer back to the MemStore - * @param readPoint Read point below which we can safely remove duplicate KVs + * @param scanners List of scanners over the segments + * @param readPt Read point below which we can safely remove duplicate KVs + */ + public MemStoreScanner(AbstractMemStore ms, List scanners, long readPt) + throws IOException { + this(ms, scanners, readPt, Type.UNDEFINED); + } + + /** + * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! + * After constructor only one heap is going to be initialized for entire lifespan + * of the MemStoreScanner. A specific scanner can only be one directional! + * + * @param ms Pointer back to the MemStore + * @param scanners List of scanners over the segments + * @param readPt Read point below which we can safely remove duplicate KVs * @param type The scan type COMPACT_FORWARD should be used for compaction */ - public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException { - this(ms, ms.getListOfScanners(readPoint), readPoint, type); - } - - /* Constructor used only when the scan usage is unknown - and need to be defined according to the first move */ - public MemStoreScanner(AbstractMemStore ms, long readPt) throws IOException { - this(ms, readPt, Type.UNDEFINED); - } - - public MemStoreScanner(AbstractMemStore ms, List scanners, long readPoint, + public MemStoreScanner(AbstractMemStore ms, List scanners, long readPt, Type type) throws IOException { super(); - this.readPoint = readPoint; + this.readPoint = readPt; this.type = type; switch (type) { case UNDEFINED: @@ -262,8 +266,8 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { return true; } - for (SegmentScanner sc : scanners) { - if (sc.shouldSeek(scan, oldestUnexpiredTS)) { + for (KeyValueScanner sc : scanners) { + if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) { return true; } } @@ -275,7 +279,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { public String toString() { StringBuffer buf = new StringBuffer(); int i = 1; - for (SegmentScanner scanner : scanners) { + for (KeyValueScanner scanner : scanners) { buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); i++; } @@ -289,7 +293,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { */ private boolean restartBackwardHeap(Cell cell) throws IOException { boolean res = false; - for (SegmentScanner scan : scanners) { + for (KeyValueScanner scan : scanners) { res |= scan.seekToPreviousRow(cell); } this.backwardHeap = @@ -315,7 +319,7 @@ public class MemStoreScanner extends NonLazyKeyValueScanner { forwardHeap = null; // before building the heap seek for the relevant key on the scanners, // for the heap to be built from the scanners correctly - for (SegmentScanner scan : scanners) { + for (KeyValueScanner scan : scanners) { if (toLast) { res |= scan.seekToLastRow(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index a04c1da049f..1191f30427e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -243,7 +243,7 @@ public class SegmentScanner implements KeyValueScanner { */ @Override public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return true; + return getSegment().shouldSeek(scan,oldestUnexpiredTS); } /** * This scanner is working solely on the in-memory MemStore therefore this @@ -305,14 +305,6 @@ public class SegmentScanner implements KeyValueScanner { // do nothing } - /** - * Returns whether the given scan should seek in this segment - * @return whether the given scan should seek in this segment - */ - public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { - return getSegment().shouldSeek(scan,oldestUnexpiredTS); - } - protected Segment getSegment(){ return segment; }