HBASE-17655 Removing MemStoreScanner and SnapshotScanner

This commit is contained in:
eshcar 2017-03-21 12:32:59 +02:00
parent cc59fe4e91
commit 8f4ae0a0dc
30 changed files with 262 additions and 641 deletions

View File

@ -188,7 +188,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
if (scanInfo == null) {
// take default action
@ -196,7 +196,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
}
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.coprocessor;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
@ -128,16 +129,16 @@ public interface RegionObserver extends Coprocessor {
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param memstoreScanner the scanner for the memstore that is flushed
* @param scanners the scanners for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
* @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner,
* @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, List,
* InternalScanner, long)}
*/
@Deprecated
default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
final Store store, final List<KeyValueScanner> scanners, final InternalScanner s)
throws IOException {
return s;
}
@ -151,16 +152,32 @@ public interface RegionObserver extends Coprocessor {
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param memstoreScanner the scanner for the memstore that is flushed
* @param scanners the scanners for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param readPoint the readpoint to create scanner
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
*/
default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s,
final Store store, final List<KeyValueScanner> scanners, final InternalScanner s,
final long readPoint) throws IOException {
return preFlushScannerOpen(c, store, memstoreScanner, s);
return preFlushScannerOpen(c, store, scanners, s);
}
/**
* Maintain backward compatibility.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param scanner the scanner for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param readPoint the readpoint to create scanner
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
*/
default InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner scanner, final InternalScanner s,
final long readPoint) throws IOException {
return preFlushScannerOpen(c, store, Collections.singletonList(scanner), s, readPoint);
}
/**
@ -1113,8 +1130,7 @@ public interface RegionObserver extends Coprocessor {
* Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened.
* <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner,
* long)} and {@link #preCompactScannerOpen(ObserverContext,
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
* Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
* to override scanners created for flushes or compactions, resp.
* <p>
@ -1145,8 +1161,7 @@ public interface RegionObserver extends Coprocessor {
* Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened.
* <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner,
* long)} and {@link #preCompactScannerOpen(ObserverContext,
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} and {@link #preCompactScannerOpen(ObserverContext,
* Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
* to override scanners created for flushes or compactions, resp.
* <p>

View File

@ -104,7 +104,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -60,6 +60,20 @@ public abstract class AbstractMemStore implements MemStore {
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD;
public static long addToScanners(List<? extends Segment> segments, long readPt, long order,
List<KeyValueScanner> scanners) {
for (Segment item : segments) {
order = addToScanners(item, readPt, order, scanners);
}
return order;
}
protected static long addToScanners(Segment segment, long readPt, long order,
List<KeyValueScanner> scanners) {
scanners.add(segment.getScanner(readPt, order));
return order - 1;
}
protected AbstractMemStore(final Configuration conf, final CellComparator c) {
this.conf = conf;
this.comparator = c;

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
@ -318,21 +317,15 @@ public class CompactingMemStore extends AbstractMemStore {
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<? extends Segment> pipelineList = pipeline.getSegments();
int order = pipelineList.size() + snapshot.getNumOfSegments();
List<? extends Segment> snapshotList = snapshot.getAllSegments();
long order = 1 + pipelineList.size() + snapshotList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
// The order is the Segment ordinal
List<KeyValueScanner> list = new ArrayList<>(order+1);
list.add(this.active.getScanner(readPt, order + 1));
for (Segment item : pipelineList) {
list.add(item.getScanner(readPt, order));
order--;
}
for (Segment item : snapshot.getAllSegments()) {
list.add(item.getScanner(readPt, order));
order--;
}
return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(getComparator(), list));
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>((int) order);
order = addToScanners(active, readPt, order, list);
order = addToScanners(pipelineList, readPt, order, list);
addToScanners(snapshotList, readPt, order, list);
return list;
}
/**

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@ -71,16 +70,6 @@ public class CompositeImmutableSegment extends ImmutableSegment {
return segments.size();
}
/**
* Builds a special scanner for the MemStoreSnapshot object that is different than the
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
@Override
public KeyValueScanner getSnapshotScanner() {
return getScanner(Long.MAX_VALUE, Long.MAX_VALUE);
}
/**
* @return whether the segment has any cells
*/
@ -148,8 +137,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
*/
@Override
public KeyValueScanner getScanner(long readPoint) {
// Long.MAX_VALUE is DEFAULT_SCANNER_ORDER
return getScanner(readPoint,Long.MAX_VALUE);
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
/**
@ -158,19 +146,14 @@ public class CompositeImmutableSegment extends ImmutableSegment {
*/
@Override
public KeyValueScanner getScanner(long readPoint, long order) {
KeyValueScanner resultScanner;
throw new IllegalStateException("Not supported by CompositeImmutableScanner");
}
@Override
public List<KeyValueScanner> getScanners(long readPoint, long order) {
List<KeyValueScanner> list = new ArrayList<>(segments.size());
for (ImmutableSegment s : segments) {
list.add(s.getScanner(readPoint, order));
}
try {
resultScanner = new MemStoreScanner(getComparator(), list);
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return resultScanner;
AbstractMemStore.addToScanners(segments, readPoint, order, list);
return list;
}
@Override

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@ -75,10 +74,6 @@ public class DefaultMemStore extends AbstractMemStore {
super(conf, c);
}
void dump() {
super.dump(LOG);
}
/**
* Creates a snapshot of the current memstore.
* Snapshot must be cleared by call to {@link #clearSnapshot(long)}
@ -129,11 +124,11 @@ public class DefaultMemStore extends AbstractMemStore {
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<>(2);
list.add(this.active.getScanner(readPt, 1));
list.add(this.snapshot.getScanner(readPt, 0));
return Collections.<KeyValueScanner> singletonList(
new MemStoreScanner(getComparator(), list));
List<KeyValueScanner> list = new ArrayList<>();
long order = snapshot.getNumOfSegments();
order = addToScanners(active, readPt, order, list);
addToScanners(snapshot.getAllSegments(), readPt, order, list);
return list;
}
@Override

View File

@ -52,7 +52,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -34,9 +34,7 @@ import java.util.List;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
* and is not needed for a {@link MutableSegment}. Specifically, the method
* {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the
* {@link MemStoreSnapshot} object.
* and is not needed for a {@link MutableSegment}.
*/
@InterfaceAudience.Private
public class ImmutableSegment extends Segment {
@ -130,14 +128,6 @@ public class ImmutableSegment extends Segment {
}
///////////////////// PUBLIC METHODS /////////////////////
/**
* Builds a special scanner for the MemStoreSnapshot object that is different than the
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
public KeyValueScanner getSnapshotScanner() {
return new SnapshotScanner(this);
}
@Override
public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {

View File

@ -252,7 +252,7 @@ public class MemStoreCompactor {
iterator =
new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
compactingMemStore.getComparator(),
compactionKVMax, compactingMemStore.getStore());
compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -50,11 +49,16 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
List<ImmutableSegment> segments,
CellComparator comparator, int compactionKVMax, Store store
) throws IOException {
super(segments,comparator,compactionKVMax,store);
super(compactionKVMax);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
// create the list of scanners to traverse over all the data
// no dirty reads here as these are immutable segments
int order = segments.size();
AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners);
// build the scanner based on Query Matcher
// reinitialize the compacting scanner for each instance of iterator
compactingScanner = createScanner(store, scanner);
compactingScanner = createScanner(store, scanners);
hasMore = compactingScanner.next(kvs, scannerContext);
@ -93,7 +97,6 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
public void close() {
compactingScanner.close();
compactingScanner = null;
scanner = null;
}
@Override
@ -106,13 +109,13 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
*
* @return the scanner
*/
private StoreScanner createScanner(Store store, KeyValueScanner scanner)
private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners)
throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(); //Get all available versions
StoreScanner internalScanner =
new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner),
new StoreScanner(store, store.getScanInfo(), scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
@ -146,4 +149,4 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
}
return hasMore;
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
@ -33,36 +34,67 @@ import java.util.List;
@InterfaceAudience.Private
public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator {
// heap of scanners, lazily initialized
private KeyValueHeap heap = null;
// remember the initial version of the scanners list
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
private boolean closed = false;
// C-tor
public MemStoreMergerSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
int compactionKVMax, Store store
) throws IOException {
super(segments,comparator,compactionKVMax,store);
int compactionKVMax) throws IOException {
super(compactionKVMax);
// create the list of scanners to traverse over all the data
// no dirty reads here as these are immutable segments
int order = segments.size();
AbstractMemStore.addToScanners(segments, Integer.MAX_VALUE, order, scanners);
heap = new KeyValueHeap(scanners, comparator);
}
@Override
public boolean hasNext() {
return (scanner.peek()!=null);
if (closed) {
return false;
}
if (this.heap != null) {
return (this.heap.peek() != null);
}
// Doing this way in case some test cases tries to peek directly
return false;
}
@Override
public Cell next() {
Cell result = null;
try { // try to get next
result = scanner.next();
if (!closed && heap != null) {
return heap.next();
}
} catch (IOException ie) {
throw new IllegalStateException(ie);
}
return result;
return null;
}
public void close() {
scanner.close();
scanner = null;
if (closed) {
return;
}
// Ensuring that all the segment scanners are closed
if (heap != null) {
heap.close();
// It is safe to do close as no new calls will be made to this scanner.
heap = null;
} else {
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
}
closed = true;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}

View File

@ -1,334 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.htrace.Trace;
/**
* This is the scanner for any MemStore implementation, derived from MemStore.
* The MemStoreScanner combines KeyValueScanner from different Segments and
* uses the key-value heap and the reversed key-value heap for the aggregated key-values set.
* It is assumed that only traversing forward or backward is used (without zigzagging in between)
*/
@InterfaceAudience.Private
public class MemStoreScanner extends NonLazyKeyValueScanner {
// heap of scanners, lazily initialized
private KeyValueHeap heap;
// indicates if the scanner is created for inmemoryCompaction
private boolean inmemoryCompaction;
// remember the initial version of the scanners list
List<KeyValueScanner> scanners;
private final CellComparator comparator;
private boolean closed;
/**
* Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
* and the heap is lazily initialized
* @param comparator Cell Comparator
* @param scanners List of scanners, from which the heap will be built
* @param inmemoryCompaction true if used for inmemoryCompaction.
* In this case, creates a forward heap always.
*/
public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners,
boolean inmemoryCompaction) throws IOException {
super();
this.comparator = comparator;
this.scanners = scanners;
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
}
this.inmemoryCompaction = inmemoryCompaction;
if (inmemoryCompaction) {
// init the forward scanner in case of inmemoryCompaction
initForwardKVHeapIfNeeded(comparator, scanners);
}
}
/**
* Creates either a forward KeyValue heap or Reverse KeyValue heap based on the type of scan
* and the heap is lazily initialized
* @param comparator Cell Comparator
* @param scanners List of scanners, from which the heap will be built
*/
public MemStoreScanner(CellComparator comparator, List<KeyValueScanner> scanners)
throws IOException {
this(comparator, scanners, false);
}
private void initForwardKVHeapIfNeeded(CellComparator comparator, List<KeyValueScanner> scanners)
throws IOException {
if (heap == null) {
// lazy init
// In a normal scan case, at the StoreScanner level before the KVHeap is
// created we do a seek or reseek. So that will happen
// on all the scanners that the StoreScanner is
// made of. So when we get any of those call to this scanner we init the
// heap here with normal forward KVHeap.
this.heap = new KeyValueHeap(scanners, comparator);
}
}
private boolean initReverseKVHeapIfNeeded(Cell seekKey, CellComparator comparator,
List<KeyValueScanner> scanners) throws IOException {
boolean res = false;
if (heap == null) {
// lazy init
// In a normal reverse scan case, at the ReversedStoreScanner level before the
// ReverseKeyValueheap is
// created we do a seekToLastRow or backwardSeek. So that will happen
// on all the scanners that the ReversedStoreSCanner is
// made of. So when we get any of those call to this scanner we init the
// heap here with ReversedKVHeap.
if (CellUtil.matchingRow(seekKey, HConstants.EMPTY_START_ROW)) {
for (KeyValueScanner scanner : scanners) {
res |= scanner.seekToLastRow();
}
} else {
for (KeyValueScanner scanner : scanners) {
res |= scanner.backwardSeek(seekKey);
}
}
this.heap = new ReversedKeyValueHeap(scanners, comparator);
}
return res;
}
/**
* Returns the cell from the top-most scanner without advancing the iterator.
* The backward traversal is assumed, only if specified explicitly
*/
@Override
public Cell peek() {
if (closed) {
return null;
}
if (this.heap != null) {
return this.heap.peek();
}
// Doing this way in case some test cases tries to peek directly to avoid NPE
return null;
}
/**
* Gets the next cell from the top-most scanner. Assumed forward scanning.
*/
@Override
public Cell next() throws IOException {
if (closed) {
return null;
}
if(this.heap != null) {
// loop over till the next suitable value
// take next value from the heap
for (Cell currentCell = heap.next();
currentCell != null;
currentCell = heap.next()) {
// all the logic of presenting cells is inside the internal KeyValueScanners
// located inside the heap
return currentCell;
}
}
return null;
}
/**
* Set the scanner at the seek key. Assumed forward scanning.
* Must be called only once: there is no thread safety between the scanner
* and the memStore.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public boolean seek(Cell cell) throws IOException {
if (closed) {
return false;
}
initForwardKVHeapIfNeeded(comparator, scanners);
if (cell == null) {
close();
return false;
}
return heap.seek(cell);
}
/**
* Move forward on the sub-lists set previously by seek. Assumed forward scanning.
*
* @param cell seek value (should be non-null)
* @return true if there is at least one KV to read, false otherwise
*/
@Override
public boolean reseek(Cell cell) throws IOException {
/*
* See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
* This code is executed concurrently with flush and puts, without locks.
* Two points must be known when working on this code:
* 1) It's not possible to use the 'kvTail' and 'snapshot'
* variables, as they are modified during a flush.
* 2) The ideal implementation for performance would use the sub skip list
* implicitly pointed by the iterators 'kvsetIt' and
* 'snapshotIt'. Unfortunately the Java API does not offer a method to
* get it. So we remember the last keys we iterated to and restore
* the reseeked set to at least that point.
*
* TODO: The above comment copied from the original MemStoreScanner
*/
if (closed) {
return false;
}
initForwardKVHeapIfNeeded(comparator, scanners);
return heap.reseek(cell);
}
/**
* MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all
* scanners.
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public long getScannerOrder() {
return Long.MAX_VALUE;
}
@Override
public void close() {
if (closed) {
return;
}
// Ensuring that all the segment scanners are closed
if (heap != null) {
heap.close();
// It is safe to do close as no new calls will be made to this scanner.
heap = null;
} else {
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
}
closed = true;
}
/**
* Set the scanner at the seek key. Assumed backward scanning.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public boolean backwardSeek(Cell cell) throws IOException {
// The first time when this happens it sets the scanners to the seek key
// passed by the incoming scan's start row
if (closed) {
return false;
}
initReverseKVHeapIfNeeded(cell, comparator, scanners);
return heap.backwardSeek(cell);
}
/**
* Assumed backward scanning.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public boolean seekToPreviousRow(Cell cell) throws IOException {
if (closed) {
return false;
}
initReverseKVHeapIfNeeded(cell, comparator, scanners);
if (heap.peek() == null) {
restartBackwardHeap(cell);
}
return heap.seekToPreviousRow(cell);
}
@Override
public boolean seekToLastRow() throws IOException {
if (closed) {
return false;
}
return initReverseKVHeapIfNeeded(KeyValue.LOWESTKEY, comparator, scanners);
}
/**
* Check if this memstore may contain the required keys
* @return False if the key definitely does not exist in this Memstore
*/
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
// TODO : Check if this can be removed.
if (inmemoryCompaction) {
return true;
}
for (KeyValueScanner sc : scanners) {
if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) {
return true;
}
}
return false;
}
// debug method
@Override
public String toString() {
StringBuffer buf = new StringBuffer();
int i = 1;
for (KeyValueScanner scanner : scanners) {
buf.append("scanner (" + i + ") " + scanner.toString() + " ||| ");
i++;
}
return buf.toString();
}
/****************** Private methods ******************/
/**
* Restructure the ended backward heap after rerunning a seekToPreviousRow()
* on each scanner
* @return false if given Cell does not exist in any scanner
*/
private boolean restartBackwardHeap(Cell cell) throws IOException {
boolean res = false;
for (KeyValueScanner scan : scanners) {
res |= scan.seekToPreviousRow(cell);
}
this.heap =
new ReversedKeyValueHeap(scanners, comparator);
return res;
}
}

View File

@ -20,11 +20,10 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.io.IOException;
import java.util.*;
import java.util.Iterator;
/**
* The MemStoreSegmentsIterator is designed to perform one iteration over given list of segments
@ -35,29 +34,11 @@ import java.util.*;
@InterfaceAudience.Private
public abstract class MemStoreSegmentsIterator implements Iterator<Cell> {
// scanner for full or partial pipeline (heap of segment scanners)
// we need to keep those scanners in order to close them at the end
protected KeyValueScanner scanner;
protected final ScannerContext scannerContext;
// C-tor
public MemStoreSegmentsIterator(List<ImmutableSegment> segments, CellComparator comparator,
int compactionKVMax, Store store) throws IOException {
public MemStoreSegmentsIterator(int compactionKVMax) throws IOException {
this.scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
// list of Scanners of segments in the pipeline, when compaction starts
List<KeyValueScanner> scanners = new ArrayList<>();
// create the list of scanners to traverse over all the data
// no dirty reads here as these are immutable segments
for (ImmutableSegment segment : segments) {
scanners.add(segment.getScanner(Integer.MAX_VALUE));
}
scanner = new MemStoreScanner(comparator, scanners, true);
}
public abstract void close();

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import java.util.List;
/**
* Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
* count of cells in it and total memory size occupied by all the cells, timestamp information of
@ -31,7 +32,7 @@ public class MemStoreSnapshot {
private final long dataSize;
private final long heapSize;
private final TimeRangeTracker timeRangeTracker;
private final KeyValueScanner scanner;
private final List<KeyValueScanner> scanners;
private final boolean tagsPresent;
public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
@ -40,7 +41,7 @@ public class MemStoreSnapshot {
this.dataSize = snapshot.keySize();
this.heapSize = snapshot.heapSize();
this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanner = snapshot.getSnapshotScanner();
this.scanners = snapshot.getScanners(Long.MAX_VALUE, Long.MAX_VALUE);
this.tagsPresent = snapshot.isTagsPresent();
}
@ -66,21 +67,21 @@ public class MemStoreSnapshot {
}
public long getHeapSize() {
return this.heapSize;
return heapSize;
}
/**
* @return {@link TimeRangeTracker} for all the Cells in the snapshot.
*/
public TimeRangeTracker getTimeRangeTracker() {
return this.timeRangeTracker;
return timeRangeTracker;
}
/**
* @return {@link KeyValueScanner} for iterating over the snapshot
*/
public KeyValueScanner getScanner() {
return this.scanner;
public List<KeyValueScanner> getScanners() {
return scanners;
}
/**
@ -89,4 +90,4 @@ public class MemStoreSnapshot {
public boolean isTagsPresent() {
return this.tagsPresent;
}
}
}

View File

@ -629,17 +629,16 @@ public class RegionCoprocessorHost
/**
* See
* {@link RegionObserver#preFlushScannerOpen(ObserverContext,
* Store, KeyValueScanner, InternalScanner, long)}
* {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
*/
public InternalScanner preFlushScannerOpen(final Store store,
final KeyValueScanner memstoreScanner, final long readPoint) throws IOException {
final List<KeyValueScanner> scanners, final long readPoint) throws IOException {
return execOperationWithResult(null,
coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
@Override
public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
throws IOException {
setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint));
setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint));
}
});
}

View File

@ -18,7 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
@ -102,7 +102,7 @@ public abstract class Segment {
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
public KeyValueScanner getScanner(long readPoint) {
protected KeyValueScanner getScanner(long readPoint) {
return new SegmentScanner(this, readPoint);
}
@ -115,9 +115,7 @@ public abstract class Segment {
}
public List<KeyValueScanner> getScanners(long readPoint, long order) {
List<KeyValueScanner> scanners = new ArrayList<>(1);
scanners.add(getScanner(readPoint, order));
return scanners;
return Collections.singletonList(new SegmentScanner(this, readPoint, order));
}
/**

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -280,16 +281,11 @@ public class SegmentScanner implements KeyValueScanner {
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return getSegment().shouldSeek(scan,oldestUnexpiredTS);
}
/**
* This scanner is working solely on the in-memory MemStore therefore this
* interface is not relevant.
*/
@Override
public boolean requestSeek(Cell c, boolean forward, boolean useBloom)
throws IOException {
throw new IllegalStateException(
"requestSeek cannot be called on MutableCellSetSegmentScanner");
return NonLazyKeyValueScanner.doRealSeek(this, c, forward);
}
/**
@ -309,8 +305,7 @@ public class SegmentScanner implements KeyValueScanner {
*/
@Override
public void enforceSeek() throws IOException {
throw new IllegalStateException(
"enforceSeek cannot be called on MutableCellSetSegmentScanner");
throw new NotImplementedException("enforceSeek cannot be called on a SegmentScanner");
}
/**

View File

@ -1,105 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
/**
* Scans the snapshot. Acts as a simple scanner that just iterates over all the cells
* in the segment
*/
@InterfaceAudience.Private
public class SnapshotScanner extends SegmentScanner {
public SnapshotScanner(Segment immutableSegment) {
// Snapshot scanner does not need readpoint. It should read all the cells in the
// segment
super(immutableSegment, Long.MAX_VALUE);
}
@Override
public Cell peek() { // sanity check, the current should be always valid
if (closed) {
return null;
}
return current;
}
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return true;
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
throw new NotImplementedException(
"backwardSeek must not be called on a " + "non-reversed scanner");
}
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
throw new NotImplementedException(
"seekToPreviousRow must not be called on a " + "non-reversed scanner");
}
@Override
public boolean seekToLastRow() throws IOException {
throw new NotImplementedException(
"seekToLastRow must not be called on a " + "non-reversed scanner");
}
@Override
protected Iterator<Cell> getIterator(Cell cell) {
return segment.iterator();
}
@Override
protected void updateCurrent() {
if (iter.hasNext()) {
current = iter.next();
} else {
current = null;
}
}
@Override
public boolean seek(Cell seekCell) {
// restart iterator
iter = getIterator(seekCell);
return reseek(seekCell);
}
@Override
public boolean reseek(Cell seekCell) {
while (iter.hasNext()) {
Cell next = iter.next();
int ret = segment.getComparator().compare(next, seekCell);
if (ret >= 0) {
current = next;
return true;
}
}
return false;
}
}

View File

@ -74,22 +74,22 @@ abstract class StoreFlusher {
/**
* Creates the scanner for flushing snapshot. Also calls coprocessors.
* @param snapshotScanner
* @param snapshotScanners
* @param smallestReadPoint
* @return The scanner; null if coprocessor is canceling the flush.
*/
protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint) throws IOException {
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner,
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners,
smallestReadPoint);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
scanner = new StoreScanner(store, store.getScanInfo(), scan,
Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
snapshotScanners, ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;

View File

@ -62,7 +62,7 @@ public class StripeStoreFlusher extends StoreFlusher {
if (cellsCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -187,7 +187,7 @@ public class SimpleRegionObserver implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ctPreFlushScannerOpen.incrementAndGet();
return null;
}

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@ -50,7 +49,6 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
@ -122,11 +120,11 @@ public class TestRegionObserverScannerOpenHook {
public static class NoDataFromFlush implements RegionObserver {
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
Scan scan = new Scan();
scan.setFilter(new NoDataFilter());
return new StoreScanner(store, store.getScanInfo(), scan,
Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
scanners, ScanType.COMPACT_RETAIN_DELETES,
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
}

View File

@ -43,13 +43,13 @@ public class NoOpScanPolicyObserver implements RegionObserver {
*/
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
ScanInfo oldSI = store.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}

View File

@ -384,8 +384,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@ -426,8 +428,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@ -455,8 +459,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
@ -524,8 +530,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
// Creating another snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
snapshot = memstore.snapshot();
@ -540,8 +548,10 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}

View File

@ -316,13 +316,17 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
// seek
scanners.get(0).seek(KeyValue.LOWESTKEY);
int count = 0;
while (scanners.get(0).next() != null) {
count++;
for(int i = 0; i < scanners.size(); i++) {
scanners.get(i).seek(KeyValue.LOWESTKEY);
while (scanners.get(i).next() != null) {
count++;
}
}
assertEquals("the count should be ", count, 150);
scanners.get(0).close();
for(int i = 0; i < scanners.size(); i++) {
scanners.get(i).close();
}
}
@Test
@ -337,7 +341,7 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
// Just doing the cnt operation here
MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
CellComparator.COMPARATOR, 10, ((CompactingMemStore) memstore).getStore());
CellComparator.COMPARATOR, 10);
int cnt = 0;
try {
while (itr.next() != null) {
@ -398,8 +402,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@ -427,8 +433,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
@ -458,8 +466,10 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner
snapshot.getScanner().close();
// close the scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();

View File

@ -264,12 +264,20 @@ public class TestDefaultMemStore {
protected void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2)
throws IOException {
List<KeyValueScanner> memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
assertEquals(1, memstorescanners.size());
final KeyValueScanner scanner = memstorescanners.get(0);
scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
assertEquals(kv1, scanner.next());
assertEquals(kv2, scanner.next());
assertNull(scanner.next());
assertEquals(2, memstorescanners.size());
final KeyValueScanner scanner0 = memstorescanners.get(0);
final KeyValueScanner scanner1 = memstorescanners.get(1);
scanner0.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
scanner1.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
Cell n0 = scanner0.next();
Cell n1 = scanner1.next();
assertTrue(kv1.equals(n0) || kv1.equals(n1));
assertTrue(kv2.equals(n0)
|| kv2.equals(n1)
|| kv2.equals(scanner0.next())
|| kv2.equals(scanner1.next()));
assertNull(scanner0.next());
assertNull(scanner1.next());
}
protected void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected)

View File

@ -138,7 +138,9 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner - this is how the snapshot will be used
snapshot.getScanner().close();
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@ -182,7 +184,9 @@ public class TestMemStoreChunkPool {
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the snapshot scanner
snapshot.getScanner().close();
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@ -209,8 +213,10 @@ public class TestMemStoreChunkPool {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the snapshot scanner
snapshot.getScanner().close();
// close the snapshot scanners
for(KeyValueScanner scanner : snapshot.getScanners()) {
scanner.close();
}
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}

View File

@ -120,7 +120,7 @@ public class TestReversibleScanners {
LOG.info("Setting read point to " + readPoint);
scanners = StoreFileScanner.getScannersForStoreFiles(
Collections.singletonList(sf), false, true, false, false, readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
}
}
@ -135,7 +135,7 @@ public class TestReversibleScanners {
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = memstore.getScanners(readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners, readPoint);
}
}
@ -560,38 +560,68 @@ public class TestReversibleScanners {
}
private void seekTestOfReversibleKeyValueScannerWithMVCC(
KeyValueScanner scanner, int readPoint) throws IOException {
/**
* Test with MVCC
*/
// Test seek to last row
KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
ROWSIZE - 1, 0, readPoint);
assertEquals(expectedKey != null, scanner.seekToLastRow());
assertEquals(expectedKey, scanner.peek());
List<? extends KeyValueScanner> scanners, int readPoint) throws IOException {
/**
* Test with MVCC
*/
// Test seek to last row
KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
ROWSIZE - 1, 0, readPoint);
boolean res = false;
for (KeyValueScanner scanner : scanners) {
res |= scanner.seekToLastRow();
}
assertEquals(expectedKey != null, res);
res = false;
for (KeyValueScanner scanner : scanners) {
res |= (expectedKey.equals(scanner.peek()));
}
assertTrue(res);
// Test backward seek in two cases
// Case1: seek in the same row in backwardSeek
expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
QUALSIZE - 2, readPoint);
assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
assertEquals(expectedKey, scanner.peek());
res = false;
for (KeyValueScanner scanner : scanners) {
res |= scanner.backwardSeek(expectedKey);
}
assertEquals(expectedKey != null, res);
res = false;
for (KeyValueScanner scanner : scanners) {
res |= (expectedKey.equals(scanner.peek()));
}
assertTrue(res);
// Case2: seek to the previous row in backwardSeek
int seekRowNum = ROWSIZE - 3;
KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
readPoint);
assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
assertEquals(expectedKey, scanner.peek());
res = false;
for (KeyValueScanner scanner : scanners) {
res |= scanner.backwardSeek(expectedKey);
}
res = false;
for (KeyValueScanner scanner : scanners) {
res |= (expectedKey.equals(scanner.peek()));
}
assertTrue(res);
// Test seek to previous row
seekRowNum = ROWSIZE - 4;
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
readPoint);
assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil
.createFirstOnRow(ROWS[seekRowNum])));
assertEquals(expectedKey, scanner.peek());
res = false;
for (KeyValueScanner scanner : scanners) {
res |= scanner.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[seekRowNum]));
}
assertEquals(expectedKey != null, res);
res = false;
for (KeyValueScanner scanner : scanners) {
res |= (expectedKey.equals(scanner.peek()));
}
assertTrue(res);
}
private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,

View File

@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -238,7 +237,7 @@ public class TestCoprocessorScanPolicy {
@Override
public InternalScanner preFlushScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
Long newTtl = ttls.get(store.getTableName());
if (newTtl != null) {
System.out.println("PreFlush:" + newTtl);
@ -253,7 +252,7 @@ public class TestCoprocessorScanPolicy {
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
return new StoreScanner(store, scanInfo, scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
}