HBASE-4811 Support reverse Scan

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1546878 13f79535-47bb-0310-9956-ffa450edef68
zjushch 2013-12-02 02:20:35 +00:00
parent 5d348605a2
commit 7b4b061bcf
34 changed files with 3279 additions and 98 deletions

ClientScanner.java

@@ -54,7 +54,7 @@ public class ClientScanner extends AbstractClientScanner {
// Current region scanner is against. Gets cleared if current region goes
// wonky: e.g. if it splits on us.
protected HRegionInfo currentRegion = null;
- private ScannerCallable callable = null;
+ protected ScannerCallable callable = null;
protected final LinkedList<Result> cache = new LinkedList<Result>();
protected final int caching;
protected long lastNext;
@@ -219,7 +219,7 @@ public class ClientScanner extends AbstractClientScanner {
 * @param nbRows
 * @param done Server-side says we're done scanning.
 */
- private boolean nextScanner(int nbRows, final boolean done)
+ protected boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {

HTable.java

@@ -713,9 +713,12 @@ public class HTable implements HTableInterface {
if (scan.getCaching() <= 0) {
scan.setCaching(getScannerCaching());
}
- if (scan.isSmall()) {
+ if (scan.isSmall() && !scan.isReversed()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection);
+ } else if (scan.isReversed()) {
+ return new ReversedClientScanner(getConfiguration(), scan, getName(),
+ this.connection);
}
return new ClientScanner(getConfiguration(), scan,
getName(), this.connection);
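
Example usage (not part of this commit; the table name "t1", family "cf" and row keys are illustrative, and error handling is omitted). Once Scan.setReversed(true) is set, getScanner() above returns a ReversedClientScanner and results come back in descending row order:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReverseScanExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t1");
        try {
          Scan scan = new Scan();
          scan.addFamily(Bytes.toBytes("cf"));
          // For a reversed scan the start row is the logically largest row:
          // the scan begins there and walks toward smaller rows.
          scan.setStartRow(Bytes.toBytes("row900"));
          scan.setStopRow(Bytes.toBytes("row100"));  // lower bound where the backward scan ends
          scan.setReversed(true);                    // getScanner() now returns a ReversedClientScanner
          ResultScanner scanner = table.getScanner(scan);
          try {
            for (Result result : scanner) {
              System.out.println(Bytes.toString(result.getRow()));
            }
          } finally {
            scanner.close();
          }
        } finally {
          table.close();
        }
      }
    }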

RegionServerCallable.java

@@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.util.Bytes;
public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
// Public because used outside of this package over in ipc.
static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
- private final HConnection connection;
- private final TableName tableName;
- private final byte [] row;
- private HRegionLocation location;
+ protected final HConnection connection;
+ protected final TableName tableName;
+ protected final byte[] row;
+ protected HRegionLocation location;
private ClientService.BlockingInterface stub;
protected final static int MIN_WAIT_DEAD_SERVER = 10000;

ReversedClientScanner.java (new file)

@@ -0,0 +1,171 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A reversed client scanner which supports backward scanning
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ReversedClientScanner extends ClientScanner {
private static final Log LOG = LogFactory.getLog(ReversedClientScanner.class);
// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
/**
* Create a new ReversedClientScanner for the specified table. Note that the
* passed {@link Scan}'s start row may be changed.
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
* @throws IOException
*/
public ReversedClientScanner(Configuration conf, Scan scan,
TableName tableName, HConnection connection) throws IOException {
super(conf, scan, tableName, connection);
}
@Override
protected boolean nextScanner(int nbRows, final boolean done)
throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
this.caller.callWithRetries(callable);
this.callable = null;
}
// Where to start the next scanner
byte[] localStartKey;
boolean locateTheClosestFrontRow = true;
// if we're at start of table, close and return false to stop iterating
if (this.currentRegion != null) {
byte[] startKey = this.currentRegion.getStartKey();
if (startKey == null
|| Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
|| checkScanStopRow(startKey) || done) {
close();
if (LOG.isDebugEnabled()) {
LOG.debug("Finished " + this.currentRegion);
}
return false;
}
localStartKey = startKey;
if (LOG.isDebugEnabled()) {
LOG.debug("Finished " + this.currentRegion);
}
} else {
localStartKey = this.scan.getStartRow();
if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) {
locateTheClosestFrontRow = false;
}
}
if (LOG.isDebugEnabled() && this.currentRegion != null) {
// Only worth logging if NOT first region in scan.
LOG.debug("Advancing internal scanner to startKey at '"
+ Bytes.toStringBinary(localStartKey) + "'");
}
try {
// In reversed scan, we want to locate the previous region through current
// region's start key. In order to get that previous region, first we
// create a closest row before the start key of current region, then
// locate all the regions from the created closest row to start key of
// current region, thus the last one of located regions should be the
// previous region of current region. The related logic of locating
// regions is implemented in ReversedScannerCallable
byte[] locateStartRow = locateTheClosestFrontRow ? createClosestRowBefore(localStartKey)
: null;
callable = getScannerCallable(localStartKey, nbRows, locateStartRow);
// Open a scanner on the region server starting at the
// beginning of the region
this.caller.callWithRetries(callable);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
} catch (IOException e) {
close();
throw e;
}
return true;
}
protected ScannerCallable getScannerCallable(byte[] localStartKey,
int nbRows, byte[] locateStartRow) {
scan.setStartRow(localStartKey);
ScannerCallable s = new ReversedScannerCallable(getConnection(),
getTable(), scan, this.scanMetrics, locateStartRow);
s.setCaching(nbRows);
return s;
}
@Override
// returns true if stopRow >= passed region startKey
protected boolean checkScanStopRow(final byte[] startKey) {
if (this.scan.getStopRow().length > 0) {
// there is a stop row, check to see if we are past it.
byte[] stopRow = scan.getStopRow();
int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, startKey, 0,
startKey.length);
if (cmp >= 0) {
// stopRow >= startKey (stopRow is equal to or larger than startKey)
// This is a stop.
return true;
}
}
return false; // unlikely.
}
/**
* Create the closest row before the specified row
* @param row
* @return a new byte array which is the closest front row of the specified one
*/
private byte[] createClosestRowBefore(byte[] row) {
if (row == null) {
throw new IllegalArgumentException("The passed row is empty");
}
if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) {
return MAX_BYTE_ARRAY;
}
if (row[row.length - 1] == 0) {
return Arrays.copyOf(row, row.length - 1);
} else {
byte[] closestFrontRow = Arrays.copyOf(row, row.length);
closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1);
closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY);
return closestFrontRow;
}
}
}
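
Worked example of the closest-front-row construction used above (row values are illustrative; the fragment assumes java.util.Arrays and org.apache.hadoop.hbase.util.Bytes are imported):

    // Row ends with 0x00: the trailing zero is dropped, e.g. {0x62, 0x00} -> {0x62}.
    byte[] closest1 = Arrays.copyOf(new byte[] { 0x62, 0x00 }, 1);

    // Otherwise: decrement the last byte and append MAX_BYTE_ARRAY (nine 0xff bytes),
    // e.g. "bb" = {0x62, 0x62} -> {0x62, 0x61} + 0xff x 9, which sorts after every row
    // smaller than "bb" but still before "bb" itself.
    byte[] closest2 = Bytes.add(new byte[] { 0x62, 0x61 }, Bytes.createMaxByteArray(9));

    // Empty start row (scan from the end of the table): MAX_BYTE_ARRAY itself is used.
    byte[] closest3 = Bytes.createMaxByteArray(9);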

ReversedScannerCallable.java (new file)

@@ -0,0 +1,141 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A reversed ScannerCallable which supports backward scanning.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ReversedScannerCallable extends ScannerCallable {
/**
* The start row for locating regions. In reversed scanner, may locate the
* regions for a range of keys when doing
* {@link ReversedClientScanner#nextScanner(int, boolean)}
*/
protected final byte[] locateStartRow;
/**
*
* @param connection
* @param tableName
* @param scan
* @param scanMetrics
* @param locateStartRow The start row for locating regions
*/
public ReversedScannerCallable(HConnection connection, TableName tableName,
Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
super(connection, tableName, scan, scanMetrics);
this.locateStartRow = locateStartRow;
}
/**
* @param reload force reload of server location
* @throws IOException
*/
@Override
public void prepare(boolean reload) throws IOException {
if (!instantiated || reload) {
if (locateStartRow == null) {
// Just locate the region with the row
this.location = connection.getRegionLocation(tableName, row, reload);
if (this.location == null) {
throw new IOException("Failed to find location, tableName="
+ tableName + ", row=" + Bytes.toString(row) + ", reload="
+ reload);
}
} else {
// Need to locate the regions with the range, and the target location is
// the last one which is the previous region of last region scanner
List<HRegionLocation> locatedRegions = locateRegionsInRange(
locateStartRow, row, reload);
if (locatedRegions.isEmpty()) {
throw new DoNotRetryIOException(
"Does hbase:meta exist hole? Couldn't get regions for the range from "
+ Bytes.toStringBinary(locateStartRow) + " to "
+ Bytes.toStringBinary(row));
}
this.location = locatedRegions.get(locatedRegions.size() - 1);
}
setStub(getConnection().getClient(getLocation().getServerName()));
checkIfRegionServerIsRemote();
instantiated = true;
}
// check how often we retry.
// HConnectionManager will call instantiateServer with reload==true
// if and only if for retries.
if (reload && this.scanMetrics != null) {
this.scanMetrics.countOfRPCRetries.incrementAndGet();
if (isRegionServerRemote) {
this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
}
}
}
/**
* Get the corresponding regions for an arbitrary range of keys.
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range, exclusive
* @param reload force reload of server location
* @return A list of HRegionLocation corresponding to the regions that contain
* the specified range
* @throws IOException
*/
private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
byte[] endKey, boolean reload) throws IOException {
final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
throw new IllegalArgumentException("Invalid range: "
+ Bytes.toStringBinary(startKey) + " > "
+ Bytes.toStringBinary(endKey));
}
List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();
byte[] currentKey = startKey;
do {
HRegionLocation regionLocation = connection.getRegionLocation(tableName,
currentKey, reload);
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
regionList.add(regionLocation);
} else {
throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
+ Bytes.toStringBinary(currentKey) + " returns incorrect region "
+ regionLocation.getRegionInfo());
}
currentKey = regionLocation.getRegionInfo().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
&& (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
return regionList;
}
}
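
Illustrative walk-through of prepare() with a located range (the region boundaries below are invented for this sketch):

    // Table regions:  R1 = ["", "bbb")   R2 = ["bbb", "ppp")   R3 = ["ppp", "")
    // The reversed scan has finished R3, so the next localStartKey is R3's start key "ppp",
    // and locateStartRow = createClosestRowBefore("ppp").
    // locateRegionsInRange(locateStartRow, "ppp") walks forward from that row and returns [ R2 ];
    // the last element, R2, is the previous region, so the next region scanner opens there.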

Scan.java

@@ -110,6 +110,7 @@ public class Scan extends Query {
private int caching = -1;
private long maxResultSize = -1;
private boolean cacheBlocks = true;
+ private boolean reversed = false;
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
@@ -549,6 +550,27 @@ public class Scan extends Query {
return cacheBlocks;
}
+ /**
+ * Set whether this scan is a reversed one
+ * <p>
+ * This is false by default which means forward(normal) scan.
+ *
+ * @param reversed if true, scan will be backward order
+ * @return this
+ */
+ public Scan setReversed(boolean reversed) {
+ this.reversed = reversed;
+ return this;
+ }
+ /**
+ * Get whether this scan is a reversed one.
+ * @return true if backward scan, false if forward(default) scan
+ */
+ public boolean isReversed() {
+ return reversed;
+ }
/**
 * Set the value indicating whether loading CFs on demand should be allowed (cluster
 * default is false). On-demand CF loading doesn't load column families until necessary, e.g.

ScannerCallable.java

@@ -62,11 +62,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
private long scannerId = -1L;
- private boolean instantiated = false;
+ protected boolean instantiated = false;
private boolean closed = false;
private Scan scan;
private int caching = 1;
- private ScanMetrics scanMetrics;
+ protected ScanMetrics scanMetrics;
private boolean logScannerActivity = false;
private int logCutOffLatency = 1000;
private static String myAddress;
@@ -79,7 +79,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
}
// indicate if it is a remote server call
- private boolean isRegionServerRemote = true;
+ protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
/**
@@ -135,7 +135,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 * compare the local machine hostname with region server's hostname
 * to decide if hbase client connects to a remote region server
 */
- private void checkIfRegionServerIsRemote() {
+ protected void checkIfRegionServerIsRemote() {
if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
isRegionServerRemote = false;
} else {

Filter.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Filter {
+ protected boolean reversed;
/**
 * Reset the state of the filter between rows.
 *
@@ -277,4 +278,16 @@
 * @throws IOException in case an I/O or an filter specific failure needs to be signaled.
 */
abstract boolean areSerializedFieldsEqual(Filter other);
+ /**
+ * alter the reversed scan flag
+ * @param reversed flag
+ */
+ public void setReversed(boolean reversed) {
+ this.reversed = reversed;
+ }
+ public boolean isReversed() {
+ return this.reversed;
+ }
}

FilterList.java

@@ -159,6 +159,11 @@ final public class FilterList extends Filter {
 * @param filter another filter
 */
public void addFilter(Filter filter) {
+ if (this.isReversed() != filter.isReversed()) {
+ throw new IllegalArgumentException(
+ "Filters in the list must have the same reversed flag, this.reversed="
+ + this.isReversed());
+ }
this.filters.add(filter);
}
@@ -463,6 +468,14 @@
return false;
}
+ @Override
+ public void setReversed(boolean reversed) {
+ for (Filter filter : filters) {
+ filter.setReversed(reversed);
+ }
+ this.reversed = reversed;
+ }
@Override
public String toString() {
return toString(MAX_LOG_FILTERS);

PrefixFilter.java

@@ -59,7 +59,7 @@ public class PrefixFilter extends FilterBase {
// if we are passed the prefix, set flag
int cmp = Bytes.compareTo(buffer, offset, this.prefix.length, this.prefix, 0,
this.prefix.length);
- if(cmp > 0) {
+ if ((!isReversed() && cmp > 0) || (isReversed() && cmp < 0)) {
passedPrefix = true;
}
filterRow = (cmp != 0);

ProtobufUtil.java

@@ -850,6 +850,9 @@ public final class ProtobufUtil {
if (scan.getRowOffsetPerColumnFamily() > 0) {
scanBuilder.setStoreOffset(scan.getRowOffsetPerColumnFamily());
}
+ if (scan.isReversed()) {
+ scanBuilder.setReversed(scan.isReversed());
+ }
return scanBuilder.build();
}
@@ -926,6 +929,9 @@ public final class ProtobufUtil {
}
}
}
+ if (proto.hasReversed()) {
+ scan.setReversed(proto.getReversed());
+ }
return scan;
}

Bytes.java

@@ -1985,6 +1985,37 @@ public class Bytes {
System.arraycopy(buf, 0, b, offset, length);
}
+ /**
+ * Create a max byte array with the specified max byte count
+ * @param maxByteCount the length of returned byte array
+ * @return the created max byte array
+ */
+ public static byte[] createMaxByteArray(int maxByteCount) {
+ byte[] maxByteArray = new byte[maxByteCount];
+ for (int i = 0; i < maxByteArray.length; i++) {
+ maxByteArray[i] = (byte) 0xff;
+ }
+ return maxByteArray;
+ }
+ /**
+ * Create a byte array which is multiple given bytes
+ * @param srcBytes
+ * @param multiNum
+ * @return byte array
+ */
+ public static byte[] multiple(byte[] srcBytes, int multiNum) {
+ if (multiNum <= 0) {
+ return new byte[0];
+ }
+ byte[] result = new byte[srcBytes.length * multiNum];
+ for (int i = 0; i < multiNum; i++) {
+ System.arraycopy(srcBytes, 0, result, i * srcBytes.length,
+ srcBytes.length);
+ }
+ return result;
+ }
/**
 * Convert a byte array into a hex string
 * @param b
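
Quick illustrative check of the two new helpers (fragment, not part of this commit):

    byte[] max = Bytes.createMaxByteArray(3);
    // max == { (byte) 0xff, (byte) 0xff, (byte) 0xff }

    byte[] repeated = Bytes.multiple(new byte[] { 1, 2 }, 3);
    // repeated == { 1, 2, 1, 2, 1, 2 }

    byte[] none = Bytes.multiple(new byte[] { 1, 2 }, 0);
    // none.length == 0: a non-positive multiplier yields an empty array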

ClientProtos.java (generated)

@@ -13330,6 +13330,16 @@
 * <code>optional bool small = 14;</code>
 */
boolean getSmall();
+ // optional bool reversed = 15 [default = false];
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ boolean hasReversed();
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ boolean getReversed();
}
/**
 * Protobuf type {@code Scan}
@@ -13485,6 +13495,11 @@
small_ = input.readBool();
break;
}
+ case 120: {
+ bitField0_ |= 0x00001000;
+ reversed_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -13815,6 +13830,22 @@
return small_;
}
+ // optional bool reversed = 15 [default = false];
+ public static final int REVERSED_FIELD_NUMBER = 15;
+ private boolean reversed_;
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ public boolean hasReversed() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ public boolean getReversed() {
+ return reversed_;
+ }
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -13830,6 +13861,7 @@
storeOffset_ = 0;
loadColumnFamiliesOnDemand_ = false;
small_ = false;
+ reversed_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -13903,6 +13935,9 @@
if (((bitField0_ & 0x00000800) == 0x00000800)) {
output.writeBool(14, small_);
}
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ output.writeBool(15, reversed_);
+ }
getUnknownFields().writeTo(output);
}
@@ -13968,6 +14003,10 @@
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(14, small_);
}
+ if (((bitField0_ & 0x00001000) == 0x00001000)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(15, reversed_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -14055,6 +14094,11 @@
result = result && (getSmall()
== other.getSmall());
}
+ result = result && (hasReversed() == other.hasReversed());
+ if (hasReversed()) {
+ result = result && (getReversed()
+ == other.getReversed());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -14124,6 +14168,10 @@
hash = (37 * hash) + SMALL_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getSmall());
}
+ if (hasReversed()) {
+ hash = (37 * hash) + REVERSED_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getReversed());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -14292,6 +14340,8 @@
bitField0_ = (bitField0_ & ~0x00001000);
small_ = false;
bitField0_ = (bitField0_ & ~0x00002000);
+ reversed_ = false;
+ bitField0_ = (bitField0_ & ~0x00004000);
return this;
}
@@ -14394,6 +14444,10 @@
to_bitField0_ |= 0x00000800;
}
result.small_ = small_;
+ if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+ to_bitField0_ |= 0x00001000;
+ }
+ result.reversed_ = reversed_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -14498,6 +14552,9 @@
if (other.hasSmall()) {
setSmall(other.getSmall());
}
+ if (other.hasReversed()) {
+ setReversed(other.getReversed());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -15609,6 +15666,39 @@
return this;
}
+ // optional bool reversed = 15 [default = false];
+ private boolean reversed_ ;
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ public boolean hasReversed() {
+ return ((bitField0_ & 0x00004000) == 0x00004000);
+ }
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ public boolean getReversed() {
+ return reversed_;
+ }
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ public Builder setReversed(boolean value) {
+ bitField0_ |= 0x00004000;
+ reversed_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool reversed = 15 [default = false];</code>
+ */
+ public Builder clearReversed() {
+ bitField0_ = (bitField0_ & ~0x00004000);
+ reversed_ = false;
+ onChanged();
+ return this;
+ }
// @@protoc_insertion_point(builder_scope:Scan)
}

Client.proto

@@ -232,6 +232,7 @@ message Scan {
optional uint32 store_offset = 12;
optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
optional bool small = 14;
+ optional bool reversed = 15 [default = false];
}
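
On the wire this is a single optional bool; the ProtobufUtil conversion above sets it whenever the client-side Scan is reversed. Sketch of the generated builder API (fragment, relying only on the generated ClientProtos code shown earlier):

    ClientProtos.Scan protoScan = ClientProtos.Scan.newBuilder()
        .setReversed(true)   // field 15, treated as false when absent
        .build();
    boolean reversed = protoScan.hasReversed() && protoScan.getReversed();   // true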
/** /**

HRegion.java

@@ -1220,6 +1220,13 @@ public class HRegion implements HeapSize { // , Writable{
return size;
}
+ /**
+ * @return KeyValue Comparator
+ */
+ public KeyValue.KVComparator getComparator() {
+ return this.comparator;
+ }
/*
 * Do preparation for pending compaction.
 * @throws IOException
@@ -1769,6 +1776,12 @@ public class HRegion implements HeapSize { // , Writable{
protected RegionScanner instantiateRegionScanner(Scan scan,
List<KeyValueScanner> additionalScanners) throws IOException {
+ if (scan.isReversed()) {
+ if (scan.getFilter() != null) {
+ scan.getFilter().setReversed(true);
+ }
+ return new ReversedRegionScannerImpl(scan, additionalScanners, this);
+ }
return new RegionScannerImpl(scan, additionalScanners, this);
}
@@ -3508,17 +3521,17 @@
/**
 * If the joined heap data gathering is interrupted due to scan limits, this will
 * contain the row for which we are populating the values.*/
- private KeyValue joinedContinuationRow = null;
+ protected KeyValue joinedContinuationRow = null;
// KeyValue indicating that limit is reached when scanning
private final KeyValue KV_LIMIT = new KeyValue();
- private final byte [] stopRow;
+ protected final byte[] stopRow;
private Filter filter;
private int batch;
- private int isScan;
+ protected int isScan;
private boolean filterClosed = false;
private long readPt;
private long maxResultSize;
- private HRegion region;
+ protected HRegion region;
@Override
public HRegionInfo getRegionInfo() {
@@ -3573,16 +3586,22 @@
joinedScanners.add(scanner);
}
}
- this.storeHeap = new KeyValueHeap(scanners, comparator);
- if (!joinedScanners.isEmpty()) {
- this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
- }
+ initializeKVHeap(scanners, joinedScanners, region);
}
RegionScannerImpl(Scan scan, HRegion region) throws IOException {
this(scan, null, region);
}
+ protected void initializeKVHeap(List<KeyValueScanner> scanners,
+ List<KeyValueScanner> joinedScanners, HRegion region)
+ throws IOException {
+ this.storeHeap = new KeyValueHeap(scanners, region.comparator);
+ if (!joinedScanners.isEmpty()) {
+ this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
+ }
+ }
@Override
public long getMaxResultSize() {
return maxResultSize;
@@ -3854,7 +3873,7 @@
currentRow);
}
- private boolean isStopRow(byte [] currentRow, int offset, short length) {
+ protected boolean isStopRow(byte[] currentRow, int offset, short length) {
return currentRow == null ||
(stopRow != null &&
comparator.compareRows(stopRow, 0, stopRow.length,

HStore.java

@@ -1750,7 +1750,9 @@ public class HStore implements Store {
scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
}
if (scanner == null) {
- scanner = new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+ scanner = scan.isReversed() ? new ReversedStoreScanner(this,
+ getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
+ getScanInfo(), scan, targetCols, readPt);
}
return scanner;
} finally {

KeyValueHeap.java

@@ -42,9 +42,9 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator;
 * as an InternalScanner at the Store level, you will get runtime exceptions.
 */
@InterfaceAudience.Private
- public class KeyValueHeap extends NonLazyKeyValueScanner
+ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner {
- private PriorityQueue<KeyValueScanner> heap = null;
+ protected PriorityQueue<KeyValueScanner> heap = null;
/**
 * The current sub-scanner, i.e. the one that contains the next key/value
@@ -56,9 +56,9 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
 * Bloom filter optimization, which is OK to propagate to StoreScanner. In
 * order to ensure that, always use {@link #pollRealKV()} to update current.
 */
- private KeyValueScanner current = null;
- private KVScannerComparator comparator;
+ protected KeyValueScanner current = null;
+ protected KVScannerComparator comparator;
/**
 * Constructor. This KeyValueHeap will handle closing of passed in
@@ -68,7 +68,18 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
 */
public KeyValueHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) throws IOException {
- this.comparator = new KVScannerComparator(comparator);
+ this(scanners, new KVScannerComparator(comparator));
+ }
+ /**
+ * Constructor.
+ * @param scanners
+ * @param comparator
+ * @throws IOException
+ */
+ KeyValueHeap(List<? extends KeyValueScanner> scanners,
+ KVScannerComparator comparator) throws IOException {
+ this.comparator = comparator;
if (!scanners.isEmpty()) {
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
this.comparator);
@@ -158,8 +169,8 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
return next(result, -1);
}
- private static class KVScannerComparator implements Comparator<KeyValueScanner> {
- private KVComparator kvComparator;
+ protected static class KVScannerComparator implements Comparator<KeyValueScanner> {
+ protected KVComparator kvComparator;
/**
 * Constructor
 * @param kvComparator
@@ -325,7 +336,7 @@ public class KeyValueHeap extends NonLazyKeyValueScanner
 * this scanner heap if (1) it has done a real seek and (2) its KV is the top
 * among all top KVs (some of which are fake) in the scanner heap.
 */
- private KeyValueScanner pollRealKV() throws IOException {
+ protected KeyValueScanner pollRealKV() throws IOException {
KeyValueScanner kvScanner = heap.poll();
if (kvScanner == null) {
return null;

KeyValueScanner.java

@@ -123,4 +123,37 @@ public interface KeyValueScanner {
 * assumed.
 */
boolean isFileScanner();
+ // Support for "Reversed Scanner"
+ /**
+ * Seek the scanner at or before the row of the specified KeyValue. It first
+ * tries to seek the scanner at or after the specified KeyValue and returns if
+ * the peeked KeyValue of the scanner has the same row as the specified
+ * KeyValue; otherwise it seeks the scanner at the first KeyValue of the row
+ * which is the previous row of the specified KeyValue
+ *
+ * @param key seek KeyValue
+ * @return true if the scanner is at a valid KeyValue, false if no such
+ *         KeyValue exists
+ *
+ */
+ public boolean backwardSeek(KeyValue key) throws IOException;
+ /**
+ * Seek the scanner at the first KeyValue of the row which is the previous row
+ * of the specified key
+ * @param key seek value
+ * @return true if the scanner is at the first valid KeyValue of the previous
+ *         row, false if no such KeyValue exists
+ */
+ public boolean seekToPreviousRow(KeyValue key) throws IOException;
+ /**
+ * Seek the scanner at the first KeyValue of the last row
+ *
+ * @return true if the scanner has values left, false if the underlying data is
+ * empty
+ * @throws IOException
+ */
+ public boolean seekToLastRow() throws IOException;
}
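
Illustrative reading of the new contract, assuming a scanner holding exactly one KeyValue per row for rows "r1" < "r2" < "r3" (rows invented for this sketch):

    // backwardSeek(KeyValue.createFirstOnRow(Bytes.toBytes("r2")))
    //   -> "r2" exists, so the scanner stays on the first KeyValue of "r2" and returns true.
    // backwardSeek(KeyValue.createFirstOnRow(Bytes.toBytes("r2a")))
    //   -> nothing at or after "r2a" shares that row, so the scanner falls back to the
    //      first KeyValue of the previous row, "r2".
    // seekToPreviousRow(KeyValue.createFirstOnRow(Bytes.toBytes("r2")))
    //   -> positions at the first KeyValue of "r1".
    // seekToLastRow()
    //   -> positions at the first KeyValue of "r3", the starting point of a full reversed scan.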

MemStore.java

@@ -682,6 +682,10 @@ public class MemStore implements HeapSize {
volatile MemStoreLAB allocatorAtCreation;
volatile MemStoreLAB snapshotAllocatorAtCreation;
+ // A flag represents whether could stop skipping KeyValues for MVCC
+ // if have encountered the next row. Only used for reversed scan
+ private boolean stopSkippingKVsIfNextRow = false;
private long readPoint;
/*
@@ -722,6 +726,7 @@
}
private KeyValue getNext(Iterator<KeyValue> it) {
+ KeyValue startKV = theNext;
KeyValue v = null;
try {
while (it.hasNext()) {
@@ -729,6 +734,10 @@
if (v.getMvccVersion() <= this.readPoint) {
return v;
}
+ if (stopSkippingKVsIfNextRow && startKV != null
+ && comparator.compareRows(v, startKV) > 0) {
+ return null;
+ }
}
return null;
@@ -907,6 +916,70 @@
long oldestUnexpiredTS) {
return shouldSeek(scan, oldestUnexpiredTS);
}
+ /**
+ * Seek scanner to the given key first. If it returns false(means
+ * peek()==null) or scanner's peek row is bigger than row of given key, seek
+ * the scanner to the previous row of given key
+ */
+ @Override
+ public synchronized boolean backwardSeek(KeyValue key) {
+ seek(key);
+ if (peek() == null || comparator.compareRows(peek(), key) > 0) {
+ return seekToPreviousRow(key);
+ }
+ return true;
+ }
+ /**
+ * Separately get the KeyValue before the specified key from kvset and
+ * snapshotset, and use the row of higher one as the previous row of
+ * specified key, then seek to the first KeyValue of previous row
+ */
+ @Override
+ public synchronized boolean seekToPreviousRow(KeyValue key) {
+ KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
+ SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
+ KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
+ SortedSet<KeyValue> snapshotHead = snapshotAtCreation
+ .headSet(firstKeyOnRow);
+ KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
+ .last();
+ KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
+ if (lastKVBeforeRow == null) {
+ theNext = null;
+ return false;
+ }
+ KeyValue firstKeyOnPreviousRow = KeyValue
+ .createFirstOnRow(lastKVBeforeRow.getRow());
+ this.stopSkippingKVsIfNextRow = true;
+ seek(firstKeyOnPreviousRow);
+ this.stopSkippingKVsIfNextRow = false;
+ if (peek() == null
+ || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
+ return seekToPreviousRow(lastKVBeforeRow);
+ }
+ return true;
+ }
+ @Override
+ public synchronized boolean seekToLastRow() {
+ KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
+ .last();
+ KeyValue second = snapshotAtCreation.isEmpty() ? null
+ : snapshotAtCreation.last();
+ KeyValue higherKv = getHighest(first, second);
+ if (higherKv == null) {
+ return false;
+ }
+ KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
+ if (seek(firstKvOnLastRow)) {
+ return true;
+ } else {
+ return seekToPreviousRow(higherKv);
+ }
+ }
}
public final static long FIXED_OVERHEAD = ClassSize.align(

NonReversedNonLazyKeyValueScanner.java (new file)

@@ -0,0 +1,54 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
/**
* A "non-reversed & non-lazy" scanner which does not support backward scanning
* and always does a real seek operation. Most scanners are inherited from this
* class.
*/
@InterfaceAudience.Private
public abstract class NonReversedNonLazyKeyValueScanner extends
NonLazyKeyValueScanner {
@Override
public boolean backwardSeek(KeyValue key) throws IOException {
throw new NotImplementedException("backwardSeek must not be called on a "
+ "non-reversed scanner");
}
@Override
public boolean seekToPreviousRow(KeyValue key) throws IOException {
throw new NotImplementedException("seekToPreviousRow must not be called on a "
+ "non-reversed scanner");
}
@Override
public boolean seekToLastRow() throws IOException {
throw new NotImplementedException("seekToLastRow must not be called on a "
+ "non-reversed scanner");
}
}

ReversedKeyValueHeap.java (new file)

@@ -0,0 +1,192 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
/**
* ReversedKeyValueHeap is used for supporting reversed scanning. Compared with
* KeyValueHeap, its scanner comparator is a little different (see
* ReversedKVScannerComparator), all seek is backward seek(see
* {@link KeyValueScanner#backwardSeek}), and it will jump to the previous row
* if it is already at the end of one row when calling next().
*/
@InterfaceAudience.Private
public class ReversedKeyValueHeap extends KeyValueHeap {
/**
* @param scanners
* @param comparator
* @throws IOException
*/
public ReversedKeyValueHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) throws IOException {
super(scanners, new ReversedKVScannerComparator(comparator));
}
@Override
public boolean seek(KeyValue seekKey) throws IOException {
throw new IllegalStateException(
"seek cannot be called on ReversedKeyValueHeap");
}
@Override
public boolean reseek(KeyValue seekKey) throws IOException {
throw new IllegalStateException(
"reseek cannot be called on ReversedKeyValueHeap");
}
@Override
public boolean requestSeek(KeyValue key, boolean forward, boolean useBloom)
throws IOException {
throw new IllegalStateException(
"requestSeek cannot be called on ReversedKeyValueHeap");
}
@Override
public boolean seekToPreviousRow(KeyValue seekKey) throws IOException {
if (current == null) {
return false;
}
heap.add(current);
current = null;
KeyValueScanner scanner;
while ((scanner = heap.poll()) != null) {
KeyValue topKey = scanner.peek();
if (comparator.getComparator().compareRows(topKey.getBuffer(),
topKey.getRowOffset(), topKey.getRowLength(), seekKey.getBuffer(),
seekKey.getRowOffset(), seekKey.getRowLength()) < 0) {
// Row of Top KeyValue is before Seek row.
heap.add(scanner);
current = pollRealKV();
return current != null;
}
if (!scanner.seekToPreviousRow(seekKey)) {
scanner.close();
} else {
heap.add(scanner);
}
}
// Heap is returning empty, scanner is done
return false;
}
@Override
public boolean backwardSeek(KeyValue seekKey) throws IOException {
if (current == null) {
return false;
}
heap.add(current);
current = null;
KeyValueScanner scanner;
while ((scanner = heap.poll()) != null) {
KeyValue topKey = scanner.peek();
if ((comparator.getComparator().matchingRows(seekKey, topKey) && comparator
.getComparator().compare(seekKey, topKey) <= 0)
|| comparator.getComparator().compareRows(seekKey, topKey) > 0) {
heap.add(scanner);
current = pollRealKV();
return current != null;
}
if (!scanner.backwardSeek(seekKey)) {
scanner.close();
} else {
heap.add(scanner);
}
}
return false;
}
@Override
public KeyValue next() throws IOException {
if (this.current == null) {
return null;
}
KeyValue kvReturn = this.current.next();
KeyValue kvNext = this.current.peek();
if (kvNext == null
|| this.comparator.kvComparator.compareRows(kvNext, kvReturn) > 0) {
if (this.current.seekToPreviousRow(kvReturn)) {
this.heap.add(this.current);
} else {
this.current.close();
}
this.current = pollRealKV();
} else {
KeyValueScanner topScanner = this.heap.peek();
if (topScanner != null
&& this.comparator.compare(this.current, topScanner) > 0) {
this.heap.add(this.current);
this.current = pollRealKV();
}
}
return kvReturn;
}
/**
* In ReversedKVScannerComparator, we compare the row of scanners' peek values
* first, sort bigger one before the smaller one. Then compare the KeyValue if
* they have the equal row, sort smaller one before the bigger one
*/
private static class ReversedKVScannerComparator extends
KVScannerComparator {
/**
* Constructor
* @param kvComparator
*/
public ReversedKVScannerComparator(KVComparator kvComparator) {
super(kvComparator);
}
@Override
public int compare(KeyValueScanner left, KeyValueScanner right) {
int rowComparison = compareRows(left.peek(), right.peek());
if (rowComparison != 0) {
return -rowComparison;
}
return super.compare(left, right);
}
/**
* Compares rows of two KeyValue
* @param left
* @param right
* @return less than 0 if left is smaller, 0 if equal etc..
*/
public int compareRows(KeyValue left, KeyValue right) {
return super.kvComparator.compareRows(left, right);
}
}
@Override
public boolean seekToLastRow() throws IOException {
throw new NotImplementedException("Not implemented");
}
}
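
Concretely, the reversed comparator makes the heap hand out rows in descending order while KeyValues inside a row keep their normal order (made-up peeks for illustration):

    // Scanner A peeks at ("rowB", cf:q1), scanner B peeks at ("rowA", cf:q1):
    //   compareRows("rowB", "rowA") > 0, so the negated result puts A at the heap top
    //   and "rowB" is emitted before "rowA".
    // If both peek at "rowB", the row comparison is 0 and the ordinary KVScannerComparator
    //   decides, so KeyValues within one row still come back in ascending order.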

ReversedRegionScannerImpl.java (new file)

@@ -0,0 +1,81 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
/**
* ReversedRegionScannerImpl extends RegionScannerImpl, and is used to
* support reversed scanning.
*/
@InterfaceAudience.Private
class ReversedRegionScannerImpl extends RegionScannerImpl {
/**
* @param scan
* @param additionalScanners
* @param region
* @throws IOException
*/
ReversedRegionScannerImpl(Scan scan,
List<KeyValueScanner> additionalScanners, HRegion region)
throws IOException {
region.super(scan, additionalScanners, region);
}
@Override
protected void initializeKVHeap(List<KeyValueScanner> scanners,
List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
this.storeHeap = new ReversedKeyValueHeap(scanners, region.getComparator());
if (!joinedScanners.isEmpty()) {
this.joinedHeap = new ReversedKeyValueHeap(joinedScanners,
region.getComparator());
}
}
@Override
protected boolean isStopRow(byte[] currentRow, int offset, short length) {
return currentRow == null
|| (super.stopRow != null && region.getComparator().compareRows(
stopRow, 0, stopRow.length, currentRow, offset, length) >= super.isScan);
}
@Override
protected boolean nextRow(byte[] currentRow, int offset, short length)
throws IOException {
assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
byte row[] = new byte[length];
System.arraycopy(currentRow, offset, row, 0, length);
this.storeHeap.seekToPreviousRow(KeyValue.createFirstOnRow(row));
resetFilters();
// Calling the hook in CP which allows it to do a fast forward
if (this.region.getCoprocessorHost() != null) {
return this.region.getCoprocessorHost().postScannerFilterRow(this,
currentRow);
}
return true;
}
}

ReversedStoreScanner.java (new file)

@@ -0,0 +1,145 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
/**
* ReversedStoreScanner extends from StoreScanner, and is used to support
* reversed scanning.
*/
@InterfaceAudience.Private
class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
* are not in a compaction.
*
* @param store who we scan
* @param scanInfo
* @param scan the spec
* @param columns which columns we are scanning
* @throws IOException
*/
ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
NavigableSet<byte[]> columns, long readPt)
throws IOException {
super(store, scanInfo, scan, columns, readPt);
}
/** Constructor for testing. */
ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
final NavigableSet<byte[]> columns, final List<KeyValueScanner> scanners)
throws IOException {
super(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP);
}
@Override
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) throws IOException {
// Combine all seeked scanners with a heap
heap = new ReversedKeyValueHeap(scanners, comparator);
}
@Override
protected void seekScanners(List<? extends KeyValueScanner> scanners,
KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
throws IOException {
// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the previous matching Row).
if (seekKey.matchingRow(HConstants.EMPTY_START_ROW)) {
for (KeyValueScanner scanner : scanners) {
scanner.seekToLastRow();
}
} else {
for (KeyValueScanner scanner : scanners) {
scanner.backwardSeek(seekKey);
}
}
}
@Override
protected boolean seekToNextRow(KeyValue kv) throws IOException {
return seekToPreviousRow(kv);
}
/**
* Do a backwardSeek in a reversed StoreScanner(scan backward)
*/
@Override
protected boolean seekAsDirection(KeyValue kv) throws IOException {
return backwardSeek(kv);
}
@Override
protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
KeyValue.KVComparator comparator) throws IOException {
// Check that the heap gives us KVs in an increasing order for same row and
// decreasing order for different rows.
assert prevKV == null || comparator == null || comparator.compareRows(kv, prevKV) < 0
|| (comparator.matchingRows(kv, prevKV) && comparator.compare(kv,
prevKV) >= 0) : "Key " + prevKV
+ " followed by a " + "error order key " + kv + " in cf " + store
+ " in reversed scan";
}
@Override
public boolean reseek(KeyValue kv) throws IOException {
throw new IllegalStateException(
"reseek cannot be called on ReversedStoreScanner");
}
@Override
public boolean seek(KeyValue key) throws IOException {
throw new IllegalStateException(
"seek cannot be called on ReversedStoreScanner");
}
@Override
public boolean seekToPreviousRow(KeyValue key) throws IOException {
lock.lock();
try {
checkReseek();
return this.heap.seekToPreviousRow(key);
} finally {
lock.unlock();
}
}
@Override
public boolean backwardSeek(KeyValue key) throws IOException {
lock.lock();
try {
checkReseek();
return this.heap.backwardSeek(key);
} finally {
lock.unlock();
}
}
}

ScanQueryMatcher.java

@@ -132,6 +132,8 @@ public class ScanQueryMatcher {
private final boolean isUserScan;
+ private final boolean isReversed;
/**
 * Construct a QueryMatcher for a scan
 * @param scan
@@ -186,6 +188,7 @@ public class ScanQueryMatcher {
this.columns = new ExplicitColumnTracker(columns,
scanInfo.getMinVersions(), maxVersions, oldestUnexpiredTS);
}
+ this.isReversed = scan.isReversed();
}
/**
@@ -258,15 +261,24 @@ public class ScanQueryMatcher {
int ret = this.rowComparator.compareRows(row, this.rowOffset, this.rowLength,
bytes, offset, rowLength);
- if (ret <= -1) {
-   return MatchCode.DONE;
- } else if (ret >= 1) {
-   // could optimize this, if necessary?
-   // Could also be called SEEK_TO_CURRENT_ROW, but this
-   // should be rare/never happens.
-   return MatchCode.SEEK_NEXT_ROW;
+ if (!this.isReversed) {
+   if (ret <= -1) {
+     return MatchCode.DONE;
+   } else if (ret >= 1) {
+     // could optimize this, if necessary?
+     // Could also be called SEEK_TO_CURRENT_ROW, but this
+     // should be rare/never happens.
+     return MatchCode.SEEK_NEXT_ROW;
+   }
+ } else {
+   if (ret <= -1) {
+     return MatchCode.SEEK_NEXT_ROW;
+   } else if (ret >= 1) {
+     return MatchCode.DONE;
+   }
}
// optimize case.
if (this.stickyNextRow)
return MatchCode.SEEK_NEXT_ROW;
@@ -454,6 +466,14 @@ public class ScanQueryMatcher {
}
public boolean moreRowsMayExistAfter(KeyValue kv) {
+ if (this.isReversed) {
+   if (rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(),
+       kv.getRowLength(), stopRow, 0, stopRow.length) <= 0) {
+     return false;
+   } else {
+     return true;
+   }
+ }
if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {

StoreFile.java

@ -1315,11 +1315,18 @@ public class StoreFile {
&& Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
return true; return true;
} }
KeyValue startKeyValue = KeyValue.createFirstOnRow(scan.getStartRow()); KeyValue smallestScanKeyValue = scan.isReversed() ? KeyValue
KeyValue stopKeyValue = KeyValue.createLastOnRow(scan.getStopRow()); .createFirstOnRow(scan.getStopRow()) : KeyValue.createFirstOnRow(scan
boolean nonOverLapping = (getComparator().compareFlatKey(this.getFirstKey(), .getStartRow());
stopKeyValue.getKey()) > 0 && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) KeyValue largestScanKeyValue = scan.isReversed() ? KeyValue
|| getComparator().compareFlatKey(this.getLastKey(), startKeyValue.getKey()) < 0; .createLastOnRow(scan.getStartRow()) : KeyValue.createLastOnRow(scan
.getStopRow());
boolean nonOverLapping = (getComparator().compareFlatKey(
this.getFirstKey(), largestScanKeyValue.getKey()) > 0 && !Bytes
.equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(),
HConstants.EMPTY_END_ROW))
|| getComparator().compareFlatKey(this.getLastKey(),
smallestScanKeyValue.getKey()) < 0;
return !nonOverLapping; return !nonOverLapping;
} }
@ -1426,6 +1433,10 @@ public class StoreFile {
return reader.getLastKey(); return reader.getLastKey();
} }
public byte[] getLastRowKey() {
return reader.getLastRowKey();
}
public byte[] midkey() throws IOException { public byte[] midkey() throws IOException {
return reader.midkey(); return reader.midkey();
} }
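For a reversed scan the start row is the high end of the key range and the stop row is the low end, which is why passesKeyRangeFilter above swaps them when building the smallest and largest scan keys. A small usage sketch of such a range (row names invented):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReversedRangeSketch {
  public static Scan rowsNineDownToThree() {
    // Start row is the HIGH end, stop row the LOW (exclusive) end:
    // the scan walks from "row9" down towards "row3".
    Scan scan = new Scan(Bytes.toBytes("row9"));
    scan.setStopRow(Bytes.toBytes("row3"));
    scan.setReversed(true);
    return scan;
  }
}

A store file can then be skipped when its last key sorts below createFirstOnRow(stopRow) or its first key sorts above createLastOnRow(startRow), which is exactly the comparison the swapped keys feed above.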

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.util.Bytes;
/** /**
* KeyValueScanner adaptor over the Reader. It also provides hooks into * KeyValueScanner adaptor over the Reader. It also provides hooks into
@ -54,6 +55,9 @@ public class StoreFileScanner implements KeyValueScanner {
private boolean enforceMVCC = false; private boolean enforceMVCC = false;
private boolean hasMVCCInfo = false; private boolean hasMVCCInfo = false;
// A flag indicating whether we can stop skipping KeyValues for MVCC
// once we have encountered the next row. Only used for reversed scans.
private boolean stopSkippingKVsIfNextRow = false;
private static AtomicLong seekCount; private static AtomicLong seekCount;
@ -186,11 +190,18 @@ public class StoreFileScanner implements KeyValueScanner {
protected boolean skipKVsNewerThanReadpoint() throws IOException { protected boolean skipKVsNewerThanReadpoint() throws IOException {
// We want to ignore all key-values that are newer than our current // We want to ignore all key-values that are newer than our current
// readPoint // readPoint
KeyValue startKV = cur;
while(enforceMVCC while(enforceMVCC
&& cur != null && cur != null
&& (cur.getMvccVersion() > readPt)) { && (cur.getMvccVersion() > readPt)) {
hfs.next(); hfs.next();
cur = hfs.getKeyValue(); cur = hfs.getKeyValue();
if (this.stopSkippingKVsIfNextRow
&& Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
cur.getRowLength(), startKV.getBuffer(), startKV.getRowOffset(),
startKV.getRowLength()) > 0) {
return false;
}
} }
if (cur == null) { if (cur == null) {
@ -389,4 +400,76 @@ public class StoreFileScanner implements KeyValueScanner {
return reader.passesTimerangeFilter(scan, oldestUnexpiredTS) return reader.passesTimerangeFilter(scan, oldestUnexpiredTS)
&& reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns); && reader.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, columns);
} }
@Override
public boolean seekToPreviousRow(KeyValue key) throws IOException {
try {
try {
KeyValue seekKey = KeyValue.createFirstOnRow(key.getRow());
if (seekCount != null) seekCount.incrementAndGet();
if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
seekKey.getKeyLength())) {
close();
return false;
}
KeyValue firstKeyOfPreviousRow = KeyValue.createFirstOnRow(hfs
.getKeyValue().getRow());
if (seekCount != null) seekCount.incrementAndGet();
if (!seekAtOrAfter(hfs, firstKeyOfPreviousRow)) {
close();
return false;
}
cur = hfs.getKeyValue();
this.stopSkippingKVsIfNextRow = true;
boolean resultOfSkipKVs;
try {
resultOfSkipKVs = skipKVsNewerThanReadpoint();
} finally {
this.stopSkippingKVsIfNextRow = false;
}
if (!resultOfSkipKVs
|| Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
cur.getRowLength(), firstKeyOfPreviousRow.getBuffer(),
firstKeyOfPreviousRow.getRowOffset(),
firstKeyOfPreviousRow.getRowLength()) > 0) {
return seekToPreviousRow(firstKeyOfPreviousRow);
}
return true;
} finally {
realSeekDone = true;
}
} catch (IOException ioe) {
throw new IOException("Could not seekToPreviousRow " + this + " to key "
+ key, ioe);
}
}
@Override
public boolean seekToLastRow() throws IOException {
byte[] lastRow = reader.getLastRowKey();
if (lastRow == null) {
return false;
}
KeyValue seekKey = KeyValue.createFirstOnRow(lastRow);
if (seek(seekKey)) {
return true;
} else {
return seekToPreviousRow(seekKey);
}
}
@Override
public boolean backwardSeek(KeyValue key) throws IOException {
seek(key);
if (cur == null
|| Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(),
cur.getRowLength(), key.getBuffer(), key.getRowOffset(),
key.getRowLength()) > 0) {
return seekToPreviousRow(key);
}
return true;
}
} }
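The new seekToPreviousRow above works in two hops: HFileScanner.seekBefore lands somewhere inside the previous row, and a forward seekAtOrAfter then positions on that row's first KeyValue, recursing if MVCC filtering walks past the row again. backwardSeek builds on it: a plain forward seek, with a fallback to seekToPreviousRow when the seek overshoots into a later row or runs off the file. A simplified sketch of that fallback (the wrapper class is hypothetical; it only calls methods shown in this patch):

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class BackwardSeekSketch {
  static boolean backwardSeek(StoreFileScanner scanner, KeyValue key) throws IOException {
    if (scanner.seek(key)) {
      KeyValue cur = scanner.peek();
      // The seeked KV is still within the requested row: done.
      if (Bytes.compareTo(cur.getBuffer(), cur.getRowOffset(), cur.getRowLength(),
          key.getBuffer(), key.getRowOffset(), key.getRowLength()) <= 0) {
        return true;
      }
    }
    // The seek ran off the end of the file or overshot into a later row:
    // step back one whole row instead.
    return scanner.seekToPreviousRow(key);
  }
}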

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -48,7 +49,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* into List<KeyValue> for a single row. * into List<KeyValue> for a single row.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class StoreScanner extends NonLazyKeyValueScanner public class StoreScanner extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver { implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class); static final Log LOG = LogFactory.getLog(StoreScanner.class);
protected Store store; protected Store store;
@ -97,7 +98,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
// A flag whether use pread for scan // A flag whether use pread for scan
private boolean scanUsePread = false; private boolean scanUsePread = false;
private ReentrantLock lock = new ReentrantLock(); protected ReentrantLock lock = new ReentrantLock();
private final long readPt; private final long readPt;
@ -172,19 +173,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
// key does not exist, then to the start of the next matching Row). // key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete // Always check bloom filter to optimize the top row seek for delete
// family marker. // family marker.
if (explicitColumnQuery && lazySeekEnabledGlobally) { seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery
for (KeyValueScanner scanner : scanners) { && lazySeekEnabledGlobally, isParallelSeekEnabled);
scanner.requestSeek(matcher.getStartKey(), false, true);
}
} else {
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
} else {
parallelSeek(scanners, matcher.getStartKey());
}
}
// set storeLimit // set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily(); this.storeLimit = scan.getMaxResultsPerColumnFamily();
@ -193,7 +183,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
this.storeOffset = scan.getRowOffsetPerColumnFamily(); this.storeOffset = scan.getRowOffsetPerColumnFamily();
// Combine all seeked scanners with a heap // Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator()); resetKVHeap(scanners, store.getComparator());
} }
/** /**
@ -249,16 +239,10 @@ public class StoreScanner extends NonLazyKeyValueScanner
scanners = selectScannersFrom(scanners); scanners = selectScannersFrom(scanners);
// Seek all scanners to the initial key // Seek all scanners to the initial key
if (!isParallelSeekEnabled) { seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
for (KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
} else {
parallelSeek(scanners, matcher.getStartKey());
}
// Combine all seeked scanners with a heap // Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator()); resetKVHeap(scanners, store.getComparator());
} }
/** Constructor for testing. */ /** Constructor for testing. */
@ -295,14 +279,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
this.store.addChangedReaderObserver(this); this.store.addChangedReaderObserver(this);
} }
// Seek all scanners to the initial key // Seek all scanners to the initial key
if (!isParallelSeekEnabled) { seekScanners(scanners, matcher.getStartKey(), false, isParallelSeekEnabled);
for (KeyValueScanner scanner : scanners) { resetKVHeap(scanners, scanInfo.getComparator());
scanner.seek(matcher.getStartKey());
}
} else {
parallelSeek(scanners, matcher.getStartKey());
}
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
} }
/** /**
@ -316,6 +294,42 @@ public class StoreScanner extends NonLazyKeyValueScanner
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt)); isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt));
} }
/**
* Seek the specified scanners with the given key
* @param scanners
* @param seekKey
* @param isLazy true if using lazy seek
* @param isParallelSeek true if using parallel seek
* @throws IOException
*/
protected void seekScanners(List<? extends KeyValueScanner> scanners,
KeyValue seekKey, boolean isLazy, boolean isParallelSeek)
throws IOException {
// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
if (isLazy) {
for (KeyValueScanner scanner : scanners) {
scanner.requestSeek(seekKey, false, true);
}
} else {
if (!isParallelSeek) {
for (KeyValueScanner scanner : scanners) {
scanner.seek(seekKey);
}
} else {
parallelSeek(scanners, seekKey);
}
}
}
protected void resetKVHeap(List<? extends KeyValueScanner> scanners,
KVComparator comparator) throws IOException {
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, comparator);
}
/** /**
* Filters the given list of scanners using Bloom filter, time range, and * Filters the given list of scanners using Bloom filter, time range, and
* TTL. * TTL.
@ -451,9 +465,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
int count = 0; int count = 0;
LOOP: while((kv = this.heap.peek()) != null) { LOOP: while((kv = this.heap.peek()) != null) {
if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap. if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
// Check that the heap gives us KVs in an increasing order. checkScanOrder(prevKV, kv, comparator);
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
"Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
prevKV = kv; prevKV = kv;
ScanQueryMatcher.MatchCode qcode = matcher.match(kv); ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
@ -475,7 +487,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
if (!matcher.moreRowsMayExistAfter(kv)) { if (!matcher.moreRowsMayExistAfter(kv)) {
return false; return false;
} }
reseek(matcher.getKeyForNextRow(kv)); seekToNextRow(kv);
break LOOP; break LOOP;
} }
@ -490,9 +502,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
if (!matcher.moreRowsMayExistAfter(kv)) { if (!matcher.moreRowsMayExistAfter(kv)) {
return false; return false;
} }
reseek(matcher.getKeyForNextRow(kv)); seekToNextRow(kv);
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
reseek(matcher.getKeyForNextColumn(kv)); seekAsDirection(matcher.getKeyForNextColumn(kv));
} else { } else {
this.heap.next(); this.heap.next();
} }
@ -516,11 +528,11 @@ public class StoreScanner extends NonLazyKeyValueScanner
return false; return false;
} }
reseek(matcher.getKeyForNextRow(kv)); seekToNextRow(kv);
break; break;
case SEEK_NEXT_COL: case SEEK_NEXT_COL:
reseek(matcher.getKeyForNextColumn(kv)); seekAsDirection(matcher.getKeyForNextColumn(kv));
break; break;
case SKIP: case SKIP:
@ -531,7 +543,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
// TODO convert resee to Cell? // TODO convert resee to Cell?
KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv)); KeyValue nextKV = KeyValueUtil.ensureKeyValue(matcher.getNextKeyHint(kv));
if (nextKV != null) { if (nextKV != null) {
reseek(nextKV); seekAsDirection(nextKV);
} else { } else {
heap.next(); heap.next();
} }
@ -619,16 +631,11 @@ public class StoreScanner extends NonLazyKeyValueScanner
* could have done it now by storing the scan object from the constructor */ * could have done it now by storing the scan object from the constructor */
List<KeyValueScanner> scanners = getScannersNoCompaction(); List<KeyValueScanner> scanners = getScannersNoCompaction();
if (!isParallelSeekEnabled) { // Seek all scanners to the initial key
for (KeyValueScanner scanner : scanners) { seekScanners(scanners, lastTopKey, false, isParallelSeekEnabled);
scanner.seek(lastTopKey);
}
} else {
parallelSeek(scanners, lastTopKey);
}
// Combine all seeked scanners with a heap // Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator()); resetKVHeap(scanners, store.getComparator());
// Reset the state of the Query Matcher and set to top row. // Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the // Only reset and call setRow if the row changes; avoids confusing the
@ -648,6 +655,36 @@ public class StoreScanner extends NonLazyKeyValueScanner
} }
} }
/**
* Check whether the scan is returning KeyValues in the expected order
* @param prevKV
* @param kv
* @param comparator
* @throws IOException
*/
protected void checkScanOrder(KeyValue prevKV, KeyValue kv,
KeyValue.KVComparator comparator) throws IOException {
// Check that the heap gives us KVs in an increasing order.
assert prevKV == null || comparator == null
|| comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV
+ " followed by a " + "smaller key " + kv + " in cf " + store;
}
protected boolean seekToNextRow(KeyValue kv) throws IOException {
return reseek(matcher.getKeyForNextRow(kv));
}
/**
* Do a reseek in a normal StoreScanner (forward scan)
* @param kv
* @return true if scanner has values left, false if end of scanner
* @throws IOException
*/
protected boolean seekAsDirection(KeyValue kv)
throws IOException {
return reseek(kv);
}
@Override @Override
public boolean reseek(KeyValue kv) throws IOException { public boolean reseek(KeyValue kv) throws IOException {
lock.lock(); lock.lock();
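The refactoring above turns StoreScanner's seek sites into overridable hooks (seekScanners, resetKVHeap, checkScanOrder, seekToNextRow, seekAsDirection) so that a reversed variant, added elsewhere in this patch, can redirect them without duplicating the next() loop. A stripped-down sketch of the "seek in the scan's direction" idea, relying only on the KeyValueScanner methods this patch introduces (the helper class itself is hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

public class DirectionalSeekSketch {
  static boolean seekAsDirection(KeyValueScanner scanner, KeyValue kv, boolean reversed)
      throws IOException {
    return reversed
        ? scanner.backwardSeek(kv)  // reversed: land on an equal or smaller row
        : scanner.reseek(kv);       // forward: ordinary forward reseek
  }
}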

View File

@ -26,14 +26,14 @@ import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.NonLazyKeyValueScanner; import org.apache.hadoop.hbase.regionserver.NonReversedNonLazyKeyValueScanner;
/** /**
* Utility scanner that wraps a sortable collection and serves * Utility scanner that wraps a sortable collection and serves
* as a KeyValueScanner. * as a KeyValueScanner.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class CollectionBackedScanner extends NonLazyKeyValueScanner { public class CollectionBackedScanner extends NonReversedNonLazyKeyValueScanner {
final private Iterable<KeyValue> data; final private Iterable<KeyValue> data;
final KeyValue.KVComparator comparator; final KeyValue.KVComparator comparator;
private Iterator<KeyValue> iter; private Iterator<KeyValue> iter;
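CollectionBackedScanner, like StoreScanner above, now extends NonReversedNonLazyKeyValueScanner instead of NonLazyKeyValueScanner. That base class is not shown in this excerpt; presumably it supplies default implementations of the reverse-seek methods this patch adds to KeyValueScanner, so forward-only scanners do not each have to reject them. A plausible shape for such defaults, offered as an assumption rather than the actual source:

// Hypothetical defaults a forward-only scanner might inherit (assumption, not HBase source):
@Override
public boolean seekToPreviousRow(KeyValue key) throws IOException {
  throw new UnsupportedOperationException("seekToPreviousRow on a forward-only scanner");
}

@Override
public boolean backwardSeek(KeyValue key) throws IOException {
  throw new UnsupportedOperationException("backwardSeek on a forward-only scanner");
}

@Override
public boolean seekToLastRow() throws IOException {
  throw new UnsupportedOperationException("seekToLastRow on a forward-only scanner");
}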

View File

@ -2680,26 +2680,34 @@ public class TestFromClientSide {
} }
private void scanTestNull(HTable ht, byte [] row, byte [] family, private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value)
byte [] value) throws Exception {
throws Exception { scanTestNull(ht, row, family, value, false);
}
private void scanTestNull(HTable ht, byte[] row, byte[] family, byte[] value,
boolean isReversedScan) throws Exception {
Scan scan = new Scan(); Scan scan = new Scan();
scan.setReversed(isReversedScan);
scan.addColumn(family, null); scan.addColumn(family, null);
Result result = getSingleScanResult(ht, scan); Result result = getSingleScanResult(ht, scan);
assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
scan = new Scan(); scan = new Scan();
scan.setReversed(isReversedScan);
scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY); scan.addColumn(family, HConstants.EMPTY_BYTE_ARRAY);
result = getSingleScanResult(ht, scan); result = getSingleScanResult(ht, scan);
assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
scan = new Scan(); scan = new Scan();
scan.setReversed(isReversedScan);
scan.addFamily(family); scan.addFamily(family);
result = getSingleScanResult(ht, scan); result = getSingleScanResult(ht, scan);
assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
scan = new Scan(); scan = new Scan();
scan.setReversed(isReversedScan);
result = getSingleScanResult(ht, scan); result = getSingleScanResult(ht, scan);
assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value); assertSingleResult(result, row, family, HConstants.EMPTY_BYTE_ARRAY, value);
@ -5271,4 +5279,473 @@ public class TestFromClientSide {
assertEquals(insertNum, count); assertEquals(insertNum, count);
} }
@Test
public void testSuperSimpleWithReverseScan() throws Exception {
byte[] TABLE = Bytes.toBytes("testSuperSimpleWithReverseScan");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b11111-0000000000000000002"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b11111-0000000000000000004"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b11111-0000000000000000006"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b11111-0000000000000000008"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b22222-0000000000000000001"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b22222-0000000000000000003"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b22222-0000000000000000005"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b22222-0000000000000000007"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
put = new Put(Bytes.toBytes("0-b22222-0000000000000000009"));
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
ht.flushCommits();
Scan scan = new Scan(Bytes.toBytes("0-b11111-9223372036854775807"),
Bytes.toBytes("0-b11111-0000000000000000000"));
scan.setReversed(true);
ResultScanner scanner = ht.getScanner(scan);
Result result = scanner.next();
assertTrue(Bytes.equals(result.getRow(),
Bytes.toBytes("0-b11111-0000000000000000008")));
scanner.close();
ht.close();
}
@Test
public void testFiltersWithReverseScan() throws Exception {
byte[] TABLE = Bytes.toBytes("testFiltersWithReverseScan");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
byte[][] ROWS = makeN(ROW, 10);
byte[][] QUALIFIERS = { Bytes.toBytes("col0-<d2v1>-<d3v2>"),
Bytes.toBytes("col1-<d2v1>-<d3v2>"),
Bytes.toBytes("col2-<d2v1>-<d3v2>"),
Bytes.toBytes("col3-<d2v1>-<d3v2>"),
Bytes.toBytes("col4-<d2v1>-<d3v2>"),
Bytes.toBytes("col5-<d2v1>-<d3v2>"),
Bytes.toBytes("col6-<d2v1>-<d3v2>"),
Bytes.toBytes("col7-<d2v1>-<d3v2>"),
Bytes.toBytes("col8-<d2v1>-<d3v2>"),
Bytes.toBytes("col9-<d2v1>-<d3v2>") };
for (int i = 0; i < 10; i++) {
Put put = new Put(ROWS[i]);
put.add(FAMILY, QUALIFIERS[i], VALUE);
ht.put(put);
}
Scan scan = new Scan();
scan.setReversed(true);
scan.addFamily(FAMILY);
Filter filter = new QualifierFilter(CompareOp.EQUAL,
new RegexStringComparator("col[1-5]"));
scan.setFilter(filter);
ResultScanner scanner = ht.getScanner(scan);
int expectedIndex = 5;
for (Result result : scanner) {
assertEquals(result.size(), 1);
assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[expectedIndex]));
assertTrue(Bytes.equals(result.raw()[0].getQualifier(),
QUALIFIERS[expectedIndex]));
expectedIndex--;
}
assertEquals(expectedIndex, 0);
scanner.close();
ht.close();
}
@Test
public void testKeyOnlyFilterWithReverseScan() throws Exception {
byte[] TABLE = Bytes.toBytes("testKeyOnlyFilterWithReverseScan");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
byte[][] ROWS = makeN(ROW, 10);
byte[][] QUALIFIERS = { Bytes.toBytes("col0-<d2v1>-<d3v2>"),
Bytes.toBytes("col1-<d2v1>-<d3v2>"),
Bytes.toBytes("col2-<d2v1>-<d3v2>"),
Bytes.toBytes("col3-<d2v1>-<d3v2>"),
Bytes.toBytes("col4-<d2v1>-<d3v2>"),
Bytes.toBytes("col5-<d2v1>-<d3v2>"),
Bytes.toBytes("col6-<d2v1>-<d3v2>"),
Bytes.toBytes("col7-<d2v1>-<d3v2>"),
Bytes.toBytes("col8-<d2v1>-<d3v2>"),
Bytes.toBytes("col9-<d2v1>-<d3v2>") };
for (int i = 0; i < 10; i++) {
Put put = new Put(ROWS[i]);
put.add(FAMILY, QUALIFIERS[i], VALUE);
ht.put(put);
}
Scan scan = new Scan();
scan.setReversed(true);
scan.addFamily(FAMILY);
Filter filter = new KeyOnlyFilter(true);
scan.setFilter(filter);
ResultScanner scanner = ht.getScanner(scan);
int count = 0;
for (Result result : scanner) {
assertEquals(result.size(), 1);
assertEquals(result.raw()[0].getValueLength(), Bytes.SIZEOF_INT);
assertEquals(Bytes.toInt(result.raw()[0].getValue()), VALUE.length);
count++;
}
assertEquals(count, 10);
scanner.close();
ht.close();
}
/**
* Test simple table and non-existent row cases.
*/
@Test
public void testSimpleMissingWithReverseScan() throws Exception {
byte[] TABLE = Bytes.toBytes("testSimpleMissingWithReverseScan");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
byte[][] ROWS = makeN(ROW, 4);
// Try to get a row on an empty table
Scan scan = new Scan();
scan.setReversed(true);
Result result = getSingleScanResult(ht, scan);
assertNullResult(result);
scan = new Scan(ROWS[0]);
scan.setReversed(true);
result = getSingleScanResult(ht, scan);
assertNullResult(result);
scan = new Scan(ROWS[0], ROWS[1]);
scan.setReversed(true);
result = getSingleScanResult(ht, scan);
assertNullResult(result);
scan = new Scan();
scan.setReversed(true);
scan.addFamily(FAMILY);
result = getSingleScanResult(ht, scan);
assertNullResult(result);
scan = new Scan();
scan.setReversed(true);
scan.addColumn(FAMILY, QUALIFIER);
result = getSingleScanResult(ht, scan);
assertNullResult(result);
// Insert a row
Put put = new Put(ROWS[2]);
put.add(FAMILY, QUALIFIER, VALUE);
ht.put(put);
// Make sure we can scan the row
scan = new Scan();
scan.setReversed(true);
result = getSingleScanResult(ht, scan);
assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
scan = new Scan(ROWS[3], ROWS[0]);
scan.setReversed(true);
result = getSingleScanResult(ht, scan);
assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
scan = new Scan(ROWS[2], ROWS[1]);
scan.setReversed(true);
result = getSingleScanResult(ht, scan);
assertSingleResult(result, ROWS[2], FAMILY, QUALIFIER, VALUE);
// Try to scan empty rows around it
// Introduced MemStore#shouldSeekForReverseScan to fix the following
scan = new Scan(ROWS[1]);
scan.setReversed(true);
result = getSingleScanResult(ht, scan);
assertNullResult(result);
ht.close();
}
@Test
public void testNullWithReverseScan() throws Exception {
byte[] TABLE = Bytes.toBytes("testNullWithReverseScan");
HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
// Null qualifier (should work)
Put put = new Put(ROW);
put.add(FAMILY, null, VALUE);
ht.put(put);
scanTestNull(ht, ROW, FAMILY, VALUE, true);
Delete delete = new Delete(ROW);
delete.deleteColumns(FAMILY, null);
ht.delete(delete);
// Use a new table
byte[] TABLE2 = Bytes.toBytes("testNull2WithReverseScan");
ht = TEST_UTIL.createTable(TABLE2, FAMILY);
// Empty qualifier, byte[0] instead of null (should work)
put = new Put(ROW);
put.add(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE);
ht.put(put);
scanTestNull(ht, ROW, FAMILY, VALUE, true);
TEST_UTIL.flush();
scanTestNull(ht, ROW, FAMILY, VALUE, true);
delete = new Delete(ROW);
delete.deleteColumns(FAMILY, HConstants.EMPTY_BYTE_ARRAY);
ht.delete(delete);
// Null value
put = new Put(ROW);
put.add(FAMILY, QUALIFIER, null);
ht.put(put);
Scan scan = new Scan();
scan.setReversed(true);
scan.addColumn(FAMILY, QUALIFIER);
Result result = getSingleScanResult(ht, scan);
assertSingleResult(result, ROW, FAMILY, QUALIFIER, null);
ht.close();
}
@Test
public void testDeletesWithReverseScan() throws Exception {
byte[] TABLE = Bytes.toBytes("testDeletesWithReverseScan");
byte[][] ROWS = makeNAscii(ROW, 6);
byte[][] FAMILIES = makeNAscii(FAMILY, 3);
byte[][] VALUES = makeN(VALUE, 5);
long[] ts = { 1000, 2000, 3000, 4000, 5000 };
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES,
TEST_UTIL.getConfiguration(), 3);
Put put = new Put(ROW);
put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
put.add(FAMILIES[0], QUALIFIER, ts[1], VALUES[1]);
ht.put(put);
Delete delete = new Delete(ROW);
delete.deleteFamily(FAMILIES[0], ts[0]);
ht.delete(delete);
Scan scan = new Scan(ROW);
scan.setReversed(true);
scan.addFamily(FAMILIES[0]);
scan.setMaxVersions(Integer.MAX_VALUE);
Result result = getSingleScanResult(ht, scan);
assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1] },
new byte[][] { VALUES[1] }, 0, 0);
// Test delete latest version
put = new Put(ROW);
put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
put.add(FAMILIES[0], QUALIFIER, ts[2], VALUES[2]);
put.add(FAMILIES[0], QUALIFIER, ts[3], VALUES[3]);
put.add(FAMILIES[0], null, ts[4], VALUES[4]);
put.add(FAMILIES[0], null, ts[2], VALUES[2]);
put.add(FAMILIES[0], null, ts[3], VALUES[3]);
ht.put(put);
delete = new Delete(ROW);
delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4]
ht.delete(delete);
scan = new Scan(ROW);
scan.setReversed(true);
scan.addColumn(FAMILIES[0], QUALIFIER);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1],
ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2);
// Test for HBASE-1847
delete = new Delete(ROW);
delete.deleteColumn(FAMILIES[0], null);
ht.delete(delete);
// Cleanup null qualifier
delete = new Delete(ROW);
delete.deleteColumns(FAMILIES[0], null);
ht.delete(delete);
// Expected client behavior might be that you can re-put deleted values
// But alas, this is not to be. We can't put them back in either case.
put = new Put(ROW);
put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000
put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000
ht.put(put);
// The Scanner returns the previous values, the expected-naive-unexpected
// behavior
scan = new Scan(ROW);
scan.setReversed(true);
scan.addFamily(FAMILIES[0]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertNResult(result, ROW, FAMILIES[0], QUALIFIER, new long[] { ts[1],
ts[2], ts[3] }, new byte[][] { VALUES[1], VALUES[2], VALUES[3] }, 0, 2);
// Test deleting an entire family from one row but not the other various
// ways
put = new Put(ROWS[0]);
put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
ht.put(put);
put = new Put(ROWS[1]);
put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
ht.put(put);
put = new Put(ROWS[2]);
put.add(FAMILIES[1], QUALIFIER, ts[0], VALUES[0]);
put.add(FAMILIES[1], QUALIFIER, ts[1], VALUES[1]);
put.add(FAMILIES[2], QUALIFIER, ts[2], VALUES[2]);
put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
ht.put(put);
delete = new Delete(ROWS[0]);
delete.deleteFamily(FAMILIES[2]);
ht.delete(delete);
delete = new Delete(ROWS[1]);
delete.deleteColumns(FAMILIES[1], QUALIFIER);
ht.delete(delete);
delete = new Delete(ROWS[2]);
delete.deleteColumn(FAMILIES[1], QUALIFIER);
delete.deleteColumn(FAMILIES[1], QUALIFIER);
delete.deleteColumn(FAMILIES[2], QUALIFIER);
ht.delete(delete);
scan = new Scan(ROWS[0]);
scan.setReversed(true);
scan.addFamily(FAMILIES[1]);
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertTrue("Expected 2 keys but received " + result.size(),
result.size() == 2);
assertNResult(result, ROWS[0], FAMILIES[1], QUALIFIER, new long[] { ts[0],
ts[1] }, new byte[][] { VALUES[0], VALUES[1] }, 0, 1);
scan = new Scan(ROWS[1]);
scan.setReversed(true);
scan.addFamily(FAMILIES[1]);
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertTrue("Expected 2 keys but received " + result.size(),
result.size() == 2);
scan = new Scan(ROWS[2]);
scan.setReversed(true);
scan.addFamily(FAMILIES[1]);
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
result = getSingleScanResult(ht, scan);
assertEquals(1, result.size());
assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
new long[] { ts[2] }, new byte[][] { VALUES[2] }, 0, 0);
// Test if we delete the family first in one row (HBASE-1541)
delete = new Delete(ROWS[3]);
delete.deleteFamily(FAMILIES[1]);
ht.delete(delete);
put = new Put(ROWS[3]);
put.add(FAMILIES[2], QUALIFIER, VALUES[0]);
ht.put(put);
put = new Put(ROWS[4]);
put.add(FAMILIES[1], QUALIFIER, VALUES[1]);
put.add(FAMILIES[2], QUALIFIER, VALUES[2]);
ht.put(put);
scan = new Scan(ROWS[4]);
scan.setReversed(true);
scan.addFamily(FAMILIES[1]);
scan.addFamily(FAMILIES[2]);
scan.setMaxVersions(Integer.MAX_VALUE);
ResultScanner scanner = ht.getScanner(scan);
result = scanner.next();
assertTrue("Expected 2 keys but received " + result.size(),
result.size() == 2);
assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[4]));
assertTrue(Bytes.equals(result.raw()[1].getRow(), ROWS[4]));
assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[1]));
assertTrue(Bytes.equals(result.raw()[1].getValue(), VALUES[2]));
result = scanner.next();
assertTrue("Expected 1 key but received " + result.size(),
result.size() == 1);
assertTrue(Bytes.equals(result.raw()[0].getRow(), ROWS[3]));
assertTrue(Bytes.equals(result.raw()[0].getValue(), VALUES[0]));
scanner.close();
ht.close();
}
/**
* Tests reversed scan across multiple regions
*/
@Test
public void testReversedScanUnderMultiRegions() throws Exception {
// Test Initialization.
byte[] TABLE = Bytes.toBytes("testReversedScanUnderMultiRegions");
byte[] maxByteArray = ReversedClientScanner.MAX_BYTE_ARRAY;
byte[][] splitRows = new byte[][] { Bytes.toBytes("005"),
Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)),
Bytes.toBytes("006"),
Bytes.add(Bytes.toBytes("006"), Bytes.multiple(maxByteArray, 8)),
Bytes.toBytes("007"),
Bytes.add(Bytes.toBytes("007"), Bytes.multiple(maxByteArray, 4)),
Bytes.toBytes("008"), Bytes.multiple(maxByteArray, 2) };
HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows);
TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
assertEquals(splitRows.length + 1, table.getRegionLocations().size());
// Insert one row each region
int insertNum = splitRows.length;
for (int i = 0; i < insertNum; i++) {
Put put = new Put(splitRows[i]);
put.add(FAMILY, QUALIFIER, VALUE);
table.put(put);
}
// scan forward
ResultScanner scanner = table.getScanner(new Scan());
int count = 0;
for (Result r : scanner) {
assertTrue(!r.isEmpty());
count++;
}
assertEquals(insertNum, count);
// scan backward
Scan scan = new Scan();
scan.setReversed(true);
scanner = table.getScanner(scan);
count = 0;
byte[] lastRow = null;
for (Result r : scanner) {
assertTrue(!r.isEmpty());
count++;
byte[] thisRow = r.getRow();
if (lastRow != null) {
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
+ ",this row=" + Bytes.toString(thisRow),
Bytes.compareTo(thisRow, lastRow) < 0);
}
lastRow = thisRow;
}
assertEquals(insertNum, count);
table.close();
}
} }
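For reference, a minimal client-side usage sketch distilled from the tests above (configuration, table name and row keys are made up; error handling trimmed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ReverseScanExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "my_table");
    try {
      // The start row is the highest row wanted; the stop row bounds the low end (exclusive).
      Scan scan = new Scan(Bytes.toBytes("row-0999"), Bytes.toBytes("row-0100"));
      scan.setReversed(true);
      ResultScanner scanner = table.getScanner(scan);
      try {
        for (Result r : scanner) {
          System.out.println(Bytes.toString(r.getRow())); // rows arrive in descending order
        }
      } finally {
        scanner.close();
      }
    } finally {
      table.close();
    }
  }
}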

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -314,6 +315,16 @@ public class TestFilter {
verifyScan(s, expectedRows, expectedKeys); verifyScan(s, expectedRows, expectedKeys);
} }
public void testPrefixFilterWithReverseScan() throws Exception {
// Grab rows from group one (half of total)
long expectedRows = this.numRows / 2;
long expectedKeys = this.colsPerRow;
Scan s = new Scan();
s.setReversed(true);
s.setFilter(new PrefixFilter(Bytes.toBytes("testRowOne")));
verifyScan(s, expectedRows, expectedKeys);
}
@Test @Test
public void testPageFilter() throws Exception { public void testPageFilter() throws Exception {
@ -401,6 +412,140 @@ public class TestFilter {
} }
public void testPageFilterWithReverseScan() throws Exception {
// KVs in first 6 rows
KeyValue[] expectedKVs = {
// testRowOne-0
new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
new KeyValue(ROWS_ONE[0], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
// testRowOne-2
new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
new KeyValue(ROWS_ONE[2], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
// testRowOne-3
new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[2], VALUES[0]),
new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[3], VALUES[0]),
new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[2], VALUES[0]),
new KeyValue(ROWS_ONE[3], FAMILIES[1], QUALIFIERS_ONE[3], VALUES[0]),
// testRowTwo-0
new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
new KeyValue(ROWS_TWO[0], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
// testRowTwo-2
new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
new KeyValue(ROWS_TWO[2], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]),
// testRowTwo-3
new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[2], VALUES[1]),
new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[3], VALUES[1]),
new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[2], VALUES[1]),
new KeyValue(ROWS_TWO[3], FAMILIES[1], QUALIFIERS_TWO[3], VALUES[1]) };
// Grab all 6 rows
long expectedRows = 6;
long expectedKeys = this.colsPerRow;
Scan s = new Scan();
s.setReversed(true);
s.setFilter(new PageFilter(expectedRows));
verifyScan(s, expectedRows, expectedKeys);
// Grab first 4 rows (6 cols per row)
expectedRows = 4;
expectedKeys = this.colsPerRow;
s = new Scan();
s.setReversed(true);
s.setFilter(new PageFilter(expectedRows));
verifyScan(s, expectedRows, expectedKeys);
// Grab first 2 rows
expectedRows = 2;
expectedKeys = this.colsPerRow;
s = new Scan();
s.setReversed(true);
s.setFilter(new PageFilter(expectedRows));
verifyScan(s, expectedRows, expectedKeys);
// Grab first row
expectedRows = 1;
expectedKeys = this.colsPerRow;
s = new Scan();
s.setReversed(true);
s.setFilter(new PageFilter(expectedRows));
verifyScan(s, expectedRows, expectedKeys);
}
public void testWhileMatchFilterWithFilterRowWithReverseScan()
throws Exception {
final int pageSize = 4;
Scan s = new Scan();
s.setReversed(true);
WhileMatchFilter filter = new WhileMatchFilter(new PageFilter(pageSize));
s.setFilter(filter);
InternalScanner scanner = this.region.getScanner(s);
int scannerCounter = 0;
while (true) {
boolean isMoreResults = scanner.next(new ArrayList<Cell>());
scannerCounter++;
if (scannerCounter >= pageSize) {
Assert.assertTrue(
"The WhileMatchFilter should now filter all remaining",
filter.filterAllRemaining());
}
if (!isMoreResults) {
break;
}
}
scanner.close();
Assert.assertEquals("The page filter returned more rows than expected",
pageSize, scannerCounter);
}
public void testWhileMatchFilterWithFilterRowKeyWithReverseScan()
throws Exception {
Scan s = new Scan();
String prefix = "testRowOne";
WhileMatchFilter filter = new WhileMatchFilter(new PrefixFilter(
Bytes.toBytes(prefix)));
s.setFilter(filter);
s.setReversed(true);
InternalScanner scanner = this.region.getScanner(s);
while (true) {
ArrayList<Cell> values = new ArrayList<Cell>();
boolean isMoreResults = scanner.next(values);
if (!isMoreResults
|| !Bytes.toString(values.get(0).getRow()).startsWith(prefix)) {
Assert.assertTrue(
"The WhileMatchFilter should now filter all remaining",
filter.filterAllRemaining());
}
if (!isMoreResults) {
break;
}
}
scanner.close();
}
/** /**
* Tests the the {@link WhileMatchFilter} works in combination with a * Tests the the {@link WhileMatchFilter} works in combination with a
* {@link Filter} that uses the * {@link Filter} that uses the

View File

@ -75,7 +75,10 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s) Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
throws IOException { throws IOException {
HRegion r = c.getEnvironment().getRegion(); HRegion r = c.getEnvironment().getRegion();
return new StoreScanner(store, store.getScanInfo(), scan, targetCols, return scan.isReversed() ? new ReversedStoreScanner(store,
r.getReadpoint(scan.getIsolationLevel())); store.getScanInfo(), scan, targetCols, r.getReadpoint(scan
.getIsolationLevel())) : new StoreScanner(store,
store.getScanInfo(), scan, targetCols, r.getReadpoint(scan
.getIsolationLevel()));
} }
} }
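Any coprocessor that replaces the store scanner in preStoreScannerOpen needs the same direction check, otherwise a reversed scan would be served by a forward StoreScanner. A sketch of the method body for such an observer, mirroring the change above (surrounding class, imports and parameter declarations omitted):

// Inside preStoreScannerOpen(c, store, scan, targetCols, s):
long readPt = c.getEnvironment().getRegion().getReadpoint(scan.getIsolationLevel());
return scan.isReversed()
    ? new ReversedStoreScanner(store, store.getScanInfo(), scan, targetCols, readPt)
    : new StoreScanner(store, store.getScanInfo(), scan, targetCols, readPt);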

View File

@ -775,6 +775,47 @@ public class TestCompaction {
thread.interruptIfNecessary(); thread.interruptIfNecessary();
} }
/**
* Test that on a major compaction, if all cells are expired or deleted, then we'll end up with no
* product. Make sure a reversed scanner over the region returns the right answer in this case,
* and that it just basically works.
* @throws IOException
*/
public void testMajorCompactingToNoOutputWithReverseScan() throws IOException {
createStoreFile(r);
for (int i = 0; i < compactionThreshold; i++) {
createStoreFile(r);
}
// Now delete everything.
Scan scan = new Scan();
scan.setReversed(true);
InternalScanner s = r.getScanner(scan);
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
assertTrue(!results.isEmpty());
r.delete(new Delete(results.get(0).getRow()));
if (!result) break;
} while (true);
s.close();
// Flush
r.flushcache();
// Major compact.
r.compactStores(true);
scan = new Scan();
scan.setReversed(true);
s = r.getScanner(scan);
int counter = 0;
do {
List<Cell> results = new ArrayList<Cell>();
boolean result = s.next(results);
if (!result) break;
counter++;
} while (true);
s.close();
assertEquals(0, counter);
}
private class StoreMockMaker extends StatefulStoreMockMaker { private class StoreMockMaker extends StatefulStoreMockMaker {
public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>(); public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>(); public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -4011,4 +4012,563 @@ public class TestHRegion {
assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts, assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
Bytes.toString(CellUtil.cloneValue(kv))); Bytes.toString(CellUtil.cloneValue(kv)));
} }
public void testReverseScanner_FromMemStore_SingleCF_Normal()
throws IOException {
byte[] rowC = Bytes.toBytes("rowC");
byte[] rowA = Bytes.toBytes("rowA");
byte[] rowB = Bytes.toBytes("rowB");
byte[] cf = Bytes.toBytes("CF");
byte[][] families = { cf };
byte[] col = Bytes.toBytes("C");
long ts = 1;
String method = this.getName();
this.region = initHRegion(tableName, method, families);
try {
KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
null);
KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
Put put = null;
put = new Put(rowC);
put.add(kv1);
put.add(kv11);
region.put(put);
put = new Put(rowA);
put.add(kv2);
region.put(put);
put = new Put(rowB);
put.add(kv3);
region.put(put);
Scan scan = new Scan(rowC);
scan.setMaxVersions(5);
scan.setReversed(true);
InternalScanner scanner = region.getScanner(scan);
List<Cell> currRow = new ArrayList<Cell>();
boolean hasNext = scanner.next(currRow);
assertEquals(2, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
assertFalse(hasNext);
scanner.close();
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
throws IOException {
byte[] rowC = Bytes.toBytes("rowC");
byte[] rowA = Bytes.toBytes("rowA");
byte[] rowB = Bytes.toBytes("rowB");
byte[] rowD = Bytes.toBytes("rowD");
byte[] cf = Bytes.toBytes("CF");
byte[][] families = { cf };
byte[] col = Bytes.toBytes("C");
long ts = 1;
String method = this.getName();
this.region = initHRegion(tableName, method, families);
try {
KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
null);
KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
Put put = null;
put = new Put(rowC);
put.add(kv1);
put.add(kv11);
region.put(put);
put = new Put(rowA);
put.add(kv2);
region.put(put);
put = new Put(rowB);
put.add(kv3);
region.put(put);
Scan scan = new Scan(rowD);
List<Cell> currRow = new ArrayList<Cell>();
scan.setReversed(true);
scan.setMaxVersions(5);
InternalScanner scanner = region.getScanner(scan);
boolean hasNext = scanner.next(currRow);
assertEquals(2, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
assertFalse(hasNext);
scanner.close();
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
public void testReverseScanner_FromMemStore_SingleCF_FullScan()
throws IOException {
byte[] rowC = Bytes.toBytes("rowC");
byte[] rowA = Bytes.toBytes("rowA");
byte[] rowB = Bytes.toBytes("rowB");
byte[] cf = Bytes.toBytes("CF");
byte[][] families = { cf };
byte[] col = Bytes.toBytes("C");
long ts = 1;
String method = this.getName();
this.region = initHRegion(tableName, method, families);
try {
KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
null);
KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
Put put = null;
put = new Put(rowC);
put.add(kv1);
put.add(kv11);
region.put(put);
put = new Put(rowA);
put.add(kv2);
region.put(put);
put = new Put(rowB);
put.add(kv3);
region.put(put);
Scan scan = new Scan();
List<Cell> currRow = new ArrayList<Cell>();
scan.setReversed(true);
InternalScanner scanner = region.getScanner(scan);
boolean hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
assertFalse(hasNext);
scanner.close();
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
// regression case for the "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
byte[] rowA = Bytes.toBytes("rowA");
byte[] rowB = Bytes.toBytes("rowB");
byte[] rowC = Bytes.toBytes("rowC");
byte[] rowD = Bytes.toBytes("rowD");
byte[] rowE = Bytes.toBytes("rowE");
byte[] cf = Bytes.toBytes("CF");
byte[][] families = { cf };
byte[] col1 = Bytes.toBytes("col1");
byte[] col2 = Bytes.toBytes("col2");
long ts = 1;
String method = this.getName();
this.region = initHRegion(tableName, method, families);
try {
KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
Put put = null;
put = new Put(rowA);
put.add(kv1);
region.put(put);
put = new Put(rowB);
put.add(kv2);
region.put(put);
put = new Put(rowC);
put.add(kv3);
region.put(put);
put = new Put(rowD);
put.add(kv4_1);
region.put(put);
put = new Put(rowD);
put.add(kv4_2);
region.put(put);
put = new Put(rowE);
put.add(kv5);
region.put(put);
region.flushcache();
Scan scan = new Scan(rowD, rowA);
scan.addColumn(families[0], col1);
scan.setReversed(true);
List<Cell> currRow = new ArrayList<Cell>();
InternalScanner scanner = region.getScanner(scan);
boolean hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
assertFalse(hasNext);
scanner.close();
scan = new Scan(rowD, rowA);
scan.addColumn(families[0], col2);
scan.setReversed(true);
currRow.clear();
scanner = region.getScanner(scan);
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
scanner.close();
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
public void testReverseScanner_smaller_blocksize() throws IOException {
// case to ensure no conflict with HFile index optimization
byte[] rowA = Bytes.toBytes("rowA");
byte[] rowB = Bytes.toBytes("rowB");
byte[] rowC = Bytes.toBytes("rowC");
byte[] rowD = Bytes.toBytes("rowD");
byte[] rowE = Bytes.toBytes("rowE");
byte[] cf = Bytes.toBytes("CF");
byte[][] families = { cf };
byte[] col1 = Bytes.toBytes("col1");
byte[] col2 = Bytes.toBytes("col2");
long ts = 1;
String method = this.getName();
HBaseConfiguration config = new HBaseConfiguration();
config.setInt("test.block.size", 1);
this.region = initHRegion(tableName, method, config, families);
try {
KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
Put put = null;
put = new Put(rowA);
put.add(kv1);
region.put(put);
put = new Put(rowB);
put.add(kv2);
region.put(put);
put = new Put(rowC);
put.add(kv3);
region.put(put);
put = new Put(rowD);
put.add(kv4_1);
region.put(put);
put = new Put(rowD);
put.add(kv4_2);
region.put(put);
put = new Put(rowE);
put.add(kv5);
region.put(put);
region.flushcache();
Scan scan = new Scan(rowD, rowA);
scan.addColumn(families[0], col1);
scan.setReversed(true);
List<Cell> currRow = new ArrayList<Cell>();
InternalScanner scanner = region.getScanner(scan);
boolean hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
assertFalse(hasNext);
scanner.close();
scan = new Scan(rowD, rowA);
scan.addColumn(families[0], col2);
scan.setReversed(true);
currRow.clear();
scanner = region.getScanner(scan);
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
scanner.close();
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
throws IOException {
byte[] row0 = Bytes.toBytes("row0"); // 1 kv
byte[] row1 = Bytes.toBytes("row1"); // 2 kv
byte[] row2 = Bytes.toBytes("row2"); // 4 kv
byte[] row3 = Bytes.toBytes("row3"); // 2 kv
byte[] row4 = Bytes.toBytes("row4"); // 5 kv
byte[] row5 = Bytes.toBytes("row5"); // 2 kv
byte[] cf1 = Bytes.toBytes("CF1");
byte[] cf2 = Bytes.toBytes("CF2");
byte[] cf3 = Bytes.toBytes("CF3");
byte[][] families = { cf1, cf2, cf3 };
byte[] col = Bytes.toBytes("C");
long ts = 1;
String method = this.getName();
HBaseConfiguration conf = new HBaseConfiguration();
// disable compactions in this test.
conf.setInt("hbase.hstore.compactionThreshold", 10000);
this.region = initHRegion(tableName, method, conf, families);
try {
// kv naming style: kv<rowNumber>_<totalKvCountInThisRow>_<seqNo>
KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
null);
KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
null);
KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
KeyValue.Type.Put, null);
KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
null);
KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
null);
KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
null);
KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
KeyValue.Type.Put, null);
KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
null);
KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
KeyValue.Type.Put, null);
KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
null);
KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
null);
KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
KeyValue.Type.Put, null);
KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
null);
KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
KeyValue.Type.Put, null);
KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
null);
KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
null);
// hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
Put put = null;
put = new Put(row1);
put.add(kv1_2_1);
region.put(put);
put = new Put(row2);
put.add(kv2_4_1);
region.put(put);
put = new Put(row4);
put.add(kv4_5_4);
put.add(kv4_5_5);
region.put(put);
region.flushcache();
// hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
put = new Put(row4);
put.add(kv4_5_1);
put.add(kv4_5_3);
region.put(put);
put = new Put(row1);
put.add(kv1_2_2);
region.put(put);
put = new Put(row2);
put.add(kv2_4_4);
region.put(put);
region.flushcache();
// hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
put = new Put(row4);
put.add(kv4_5_2);
region.put(put);
put = new Put(row2);
put.add(kv2_4_2);
put.add(kv2_4_3);
region.put(put);
put = new Put(row3);
put.add(kv3_2_2);
region.put(put);
region.flushcache();
// memstore(cf1/cf2/cf3) : "row0" (1 kv) / "row3" (1 kv) / "row5" (2 kv, max row)
put = new Put(row0);
put.add(kv0_1_1);
region.put(put);
put = new Put(row3);
put.add(kv3_2_1);
region.put(put);
put = new Put(row5);
put.add(kv5_2_1);
put.add(kv5_2_2);
region.put(put);
// scan range = ["row4", min), skip the max "row5"
Scan scan = new Scan(row4);
scan.setMaxVersions(5);
scan.setBatch(3);
scan.setReversed(true);
InternalScanner scanner = region.getScanner(scan);
List<Cell> currRow = new ArrayList<Cell>();
boolean hasNext = false;
// 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
// included in scan range
// "row4" takes 2 next() calls since batch=3
hasNext = scanner.next(currRow);
assertEquals(3, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(2, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
assertTrue(hasNext);
// 2. scan out "row3" (2 kv)
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(2, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
assertTrue(hasNext);
// 3. scan out "row2" (4 kvs)
// "row2" takes 2 next() calls since batch=3
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(3, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
assertTrue(hasNext);
// 4. scan out "row1" (2 kv)
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(2, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
assertTrue(hasNext);
// 5. scan out "row0" (1 kv)
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row0));
assertFalse(hasNext);
scanner.close();
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
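/**
* Reverse scan where each of four rows lives in its own store file or in the
* memstore, and each row uses a different column family, so the reversed
* region scanner must merge across all the stores.
*/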
public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
throws IOException {
byte[] row1 = Bytes.toBytes("row1");
byte[] row2 = Bytes.toBytes("row2");
byte[] row3 = Bytes.toBytes("row3");
byte[] row4 = Bytes.toBytes("row4");
byte[] cf1 = Bytes.toBytes("CF1");
byte[] cf2 = Bytes.toBytes("CF2");
byte[] cf3 = Bytes.toBytes("CF3");
byte[] cf4 = Bytes.toBytes("CF4");
byte[][] families = { cf1, cf2, cf3, cf4 };
byte[] col = Bytes.toBytes("C");
long ts = 1;
String method = this.getName();
HBaseConfiguration conf = new HBaseConfiguration();
// disable compactions in this test.
conf.setInt("hbase.hstore.compactionThreshold", 10000);
this.region = initHRegion(tableName, method, conf, families);
try {
KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
// storefile1
Put put = new Put(row1);
put.add(kv1);
region.put(put);
region.flushcache();
// storefile2
put = new Put(row2);
put.add(kv2);
region.put(put);
region.flushcache();
// storefile3
put = new Put(row3);
put.add(kv3);
region.put(put);
region.flushcache();
// memstore
put = new Put(row4);
put.add(kv4);
region.put(put);
// scan range = ["row4", min)
Scan scan = new Scan(row4);
scan.setReversed(true);
scan.setBatch(10);
InternalScanner scanner = region.getScanner(scan);
List<Cell> currRow = new ArrayList<Cell>();
boolean hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
assertTrue(hasNext);
currRow.clear();
hasNext = scanner.next(currRow);
assertEquals(1, currRow.size());
assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
assertFalse(hasNext);
} finally {
HRegion.closeHRegion(this.region);
this.region = null;
}
}
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
families);
}
}

View File

@@ -0,0 +1,703 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
/**
* Test cases against ReversibleKeyValueScanner
*/
@Category(MediumTests.class)
public class TestReversibleScanners {
private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class);
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
private static long TS = System.currentTimeMillis();
private static int MAXMVCC = 7;
private static byte[] ROW = Bytes.toBytes("testRow");
private static final int ROWSIZE = 200;
private static byte[][] ROWS = makeN(ROW, ROWSIZE);
private static byte[] QUAL = Bytes.toBytes("testQual");
private static final int QUALSIZE = 5;
private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
private static byte[] VALUE = Bytes.toBytes("testValue");
private static final int VALUESIZE = 3;
private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
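// Test data layout: ROWSIZE rows x QUALSIZE qualifiers per row; cell values
// cycle through VALUESIZE distinct values and mvcc versions cycle through
// 0..MAXMVCC (see makeKV/makeMVCC at the bottom of this class).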
@Test
public void testReversibleStoreFileScanner() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path hfilePath = new Path(new Path(
TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
"regionname"), "familyname");
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
HFileContextBuilder hcBuilder = new HFileContextBuilder();
hcBuilder.withBlockSize(2 * 1024);
HFileContext hFileContext = hcBuilder.build();
StoreFile.Writer writer = new StoreFile.WriterBuilder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
writeStoreFile(writer);
StoreFile sf = new StoreFile(fs, writer.getPath(),
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
List<StoreFileScanner> scanners = StoreFileScanner
.getScannersForStoreFiles(Collections.singletonList(sf), false, true,
false, Long.MAX_VALUE);
StoreFileScanner scanner = scanners.get(0);
seekTestOfReversibleKeyValueScanner(scanner);
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = StoreFileScanner.getScannersForStoreFiles(
Collections.singletonList(sf), false, true, false, readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
}
}
@Test
public void testReversibleMemstoreScanner() throws IOException {
MemStore memstore = new MemStore();
writeMemstore(memstore);
List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
seekTestOfReversibleKeyValueScanner(scanners.get(0));
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
scanners = memstore.getScanners(readPoint);
seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
}
}
@Test
public void testReversibleKeyValueHeap() throws IOException {
// write data to one memstore and two store files
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path hfilePath = new Path(new Path(
TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
"familyname");
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
HFileContextBuilder hcBuilder = new HFileContextBuilder();
hcBuilder.withBlockSize(2 * 1024);
HFileContext hFileContext = hcBuilder.build();
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
MemStore memstore = new MemStore();
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
writer2 });
StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
/**
* Test without MVCC
*/
int startRowNum = ROWSIZE / 2;
ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
ROWS[startRowNum], MAXMVCC);
internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
startRowNum = ROWSIZE - 1;
kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
HConstants.EMPTY_START_ROW, MAXMVCC);
internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
/**
* Test with MVCC
*/
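// For each read point, walk backward two rows at a time and check that
// seekToPreviousRow, backwardSeek and next each land on the next (row,
// qualifier) position that is readable at that read point.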
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
startRowNum = ROWSIZE - 1;
kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
HConstants.EMPTY_START_ROW, readPoint);
for (int i = startRowNum; i >= 0; i--) {
if (i - 2 < 0) break;
i = i - 2;
kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1]));
Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
i, 0, readPoint);
if (nextReadableNum == null) break;
KeyValue expectedKey = makeKV(nextReadableNum.getFirst(),
nextReadableNum.getSecond());
assertEquals(expectedKey, kvHeap.peek());
i = nextReadableNum.getFirst();
int qualNum = nextReadableNum.getSecond();
if (qualNum + 1 < QUALSIZE) {
kvHeap.backwardSeek(makeKV(i, qualNum + 1));
nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
readPoint);
if (nextReadableNum == null) break;
expectedKey = makeKV(nextReadableNum.getFirst(),
nextReadableNum.getSecond());
assertEquals(expectedKey, kvHeap.peek());
i = nextReadableNum.getFirst();
qualNum = nextReadableNum.getSecond();
}
kvHeap.next();
if (qualNum + 1 >= QUALSIZE) {
nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
readPoint);
} else {
nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
readPoint);
}
if (nextReadableNum == null) break;
expectedKey = makeKV(nextReadableNum.getFirst(),
nextReadableNum.getSecond());
assertEquals(expectedKey, kvHeap.peek());
i = nextReadableNum.getFirst();
}
}
}
@Test
public void testReversibleStoreScanner() throws IOException {
// write data to one memstore and two store files
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path hfilePath = new Path(new Path(
TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
"familyname");
CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
HFileContextBuilder hcBuilder = new HFileContextBuilder();
hcBuilder.withBlockSize(2 * 1024);
HFileContext hFileContext = hcBuilder.build();
StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
hfilePath).withFileContext(hFileContext).build();
MemStore memstore = new MemStore();
writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
writer2 });
StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
ScanType scanType = ScanType.USER_SCAN;
ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE,
Long.MAX_VALUE, false, 0, KeyValue.COMPARATOR);
// Case 1. Test a full reversed scan
Scan scan = new Scan();
scan.setReversed(true);
StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
scan, scanType, scanInfo, MAXMVCC);
verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
// Case 2. Test reversed scan with a specified start row
int startRowNum = ROWSIZE / 2;
byte[] startRow = ROWS[startRowNum];
scan.setStartRow(startRow);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
scanType, scanInfo, MAXMVCC);
verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
startRowNum + 1, false);
// Case 3. Test reversed scan with a specified start row and specified
// qualifiers
assertTrue(QUALSIZE > 2);
scan.addColumn(FAMILYNAME, QUALS[0]);
scan.addColumn(FAMILYNAME, QUALS[2]);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
scanType, scanInfo, MAXMVCC);
verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
false);
// Case 4. Test reversed scan with mvcc based on case 3
for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
LOG.info("Setting read point to " + readPoint);
storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
scanType, scanInfo, readPoint);
int expectedRowCount = 0;
int expectedKVCount = 0;
for (int i = startRowNum; i >= 0; i--) {
int kvCount = 0;
if (makeMVCC(i, 0) <= readPoint) {
kvCount++;
}
if (makeMVCC(i, 2) <= readPoint) {
kvCount++;
}
if (kvCount > 0) {
expectedRowCount++;
expectedKVCount += kvCount;
}
}
verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
false);
}
}
@Test
public void testReversibleRegionScanner() throws IOException {
byte[] tableName = Bytes.toBytes("testtable");
byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
Configuration conf = HBaseConfiguration.create();
HRegion region = TEST_UTIL.createLocalHRegion(tableName, null, null,
"testReversibleRegionScanner", conf, false, Durability.SYNC_WAL, null,
FAMILYNAME, FAMILYNAME2);
loadDataToRegion(region, FAMILYNAME2);
// verify row count with forward scan
Scan scan = new Scan();
InternalScanner scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
// Case1: Full reversed scan
scan.setReversed(true);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
// Case2: Full reversed scan with one family
scan = new Scan();
scan.setReversed(true);
scan.addFamily(FAMILYNAME);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
// Case3: Specify qualifiers + One family
byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
for (byte[] specifiedQualifier : specifiedQualifiers)
scan.addColumn(FAMILYNAME, specifiedQualifier);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
// Case4: Specify qualifiers + Two families
for (byte[] specifiedQualifier : specifiedQualifiers)
scan.addColumn(FAMILYNAME2, specifiedQualifier);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
// Case5: Case4 + specify start row
int startRowNum = ROWSIZE * 3 / 4;
scan.setStartRow(ROWS[startRowNum]);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
false);
// Case6: Case4 + specify stop row
int stopRowNum = ROWSIZE / 4;
scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY);
scan.setStopRow(ROWS[stopRowNum]);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
- stopRowNum - 1), false);
// Case7: Case4 + specify start row + specify stop row
scan.setStartRow(ROWS[startRowNum]);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
(startRowNum - stopRowNum), false);
// Case8: Case7 + SingleColumnValueFilter
int valueNum = startRowNum % VALUESIZE;
Filter filter = new SingleColumnValueFilter(FAMILYNAME,
specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
scan.setFilter(filter);
scanner = region.getScanner(scan);
int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
+ (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
false);
// Case9: Case7 + PageFilter
int pageSize = 10;
filter = new PageFilter(pageSize);
scan.setFilter(filter);
scanner = region.getScanner(scan);
int expectedRowNum = pageSize;
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
// Case10: Case7 + FilterList+MUST_PASS_ONE
SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
expectedRowNum = 0;
for (int i = startRowNum; i > stopRowNum; i--) {
if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
expectedRowNum++;
}
}
filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
scan.setFilter(filter);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
// Case11: Case7 + FilterList+MUST_PASS_ALL
filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
expectedRowNum = 0;
scan.setFilter(filter);
scanner = region.getScanner(scan);
verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
}
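// Builds a ReversedStoreScanner over the memstore and the two store files,
// using the (single) column set carried by the scan.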
private StoreScanner getReversibleStoreScanner(MemStore memstore,
StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
ScanInfo scanInfo, int readPoint) throws IOException {
List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
false, readPoint);
NavigableSet<byte[]> columns = null;
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
.entrySet()) {
// The scan should contain only one family
columns = entry.getValue();
}
StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
scanType, columns, scanners);
return storeScanner;
}
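// Drains the scanner, counting rows and kvs, and asserts that row keys come
// back in the expected order (ascending when forward is true, descending
// otherwise) before checking the expected totals.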
private void verifyCountAndOrder(InternalScanner scanner,
int expectedKVCount, int expectedRowCount, boolean forward)
throws IOException {
List<Cell> kvList = new ArrayList<Cell>();
Result lastResult = null;
int rowCount = 0;
int kvCount = 0;
try {
while (scanner.next(kvList)) {
if (kvList.isEmpty()) continue;
rowCount++;
kvCount += kvList.size();
if (lastResult != null) {
Result curResult = Result.create(kvList);
assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
forward,
Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
}
lastResult = Result.create(kvList);
kvList.clear();
}
} finally {
scanner.close();
}
if (!kvList.isEmpty()) {
rowCount++;
kvCount += kvList.size();
kvList.clear();
}
assertEquals(expectedKVCount, kvCount);
assertEquals(expectedRowCount, rowCount);
}
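// Walks from startRowNum down to row 0, mixing seekToPreviousRow and
// backwardSeek with plain next() calls, and checks that the heap always
// surfaces the expected KeyValue; the heap must be exhausted at the end.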
private void internalTestSeekAndNextForReversibleKeyValueHeap(
ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
// Test next and seek
for (int i = startRowNum; i >= 0; i--) {
if (i % 2 == 1 && i - 2 >= 0) {
i = i - 2;
kvHeap.seekToPreviousRow(KeyValue.createFirstOnRow(ROWS[i + 1]));
}
for (int j = 0; j < QUALSIZE; j++) {
if (j % 2 == 1 && (j + 1) < QUALSIZE) {
j = j + 1;
kvHeap.backwardSeek(makeKV(i, j));
}
assertEquals(makeKV(i, j), kvHeap.peek());
kvHeap.next();
}
}
assertEquals(null, kvHeap.peek());
}
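// Builds a ReversedKeyValueHeap from the memstore and store-file scanners
// returned by getScanners, which have already been positioned for a backward
// scan starting at startRow.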
private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
throws IOException {
List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow,
true, readPoint);
ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners,
KeyValue.COMPARATOR);
return kvHeap;
}
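// Collects store-file and memstore scanners at the given read point and, when
// doSeek is set, positions each one at its last row (empty start row) or
// backward-seeks it to startRow.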
private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1,
StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
throws IOException {
List<StoreFileScanner> fileScanners = StoreFileScanner
.getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
false, readPoint);
List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
fileScanners.size() + 1);
scanners.addAll(fileScanners);
scanners.addAll(memScanners);
if (doSeek) {
if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
for (KeyValueScanner scanner : scanners) {
scanner.seekToLastRow();
}
} else {
KeyValue startKey = KeyValue.createFirstOnRow(startRow);
for (KeyValueScanner scanner : scanners) {
scanner.backwardSeek(startKey);
}
}
}
return scanners;
}
private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
throws IOException {
/**
* Test without MVCC
*/
// Test seek to last row
assertTrue(scanner.seekToLastRow());
assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
// Test backward seek in three cases
// Case1: seek in the same row in backwardSeek
KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
assertTrue(scanner.backwardSeek(seekKey));
assertEquals(seekKey, scanner.peek());
// Case2: seek to the previous row in backwardSeek
int seekRowNum = ROWSIZE - 2;
assertTrue(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[seekRowNum])));
KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
assertEquals(expectedKey, scanner.peek());
// Case3: unable to backward seek
assertFalse(scanner.backwardSeek(KeyValue.createLastOnRow(ROWS[0])));
assertEquals(null, scanner.peek());
// Test seek to previous row
seekRowNum = ROWSIZE - 4;
assertTrue(scanner.seekToPreviousRow(KeyValue
.createFirstOnRow(ROWS[seekRowNum])));
expectedKey = makeKV(seekRowNum - 1, 0);
assertEquals(expectedKey, scanner.peek());
// Test seek to previous row for the first row
assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
assertEquals(null, scanner.peek());
}
private void seekTestOfReversibleKeyValueScannerWithMVCC(
KeyValueScanner scanner, int readPoint) throws IOException {
/**
* Test with MVCC
*/
// Test seek to last row
KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
ROWSIZE - 1, 0, readPoint);
assertEquals(expectedKey != null, scanner.seekToLastRow());
assertEquals(expectedKey, scanner.peek());
// Test backward seek in two cases
// Case1: seek in the same row in backwardSeek
expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
QUALSIZE - 2, readPoint);
assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
assertEquals(expectedKey, scanner.peek());
// Case2: seek to the previous row in backwardSeek
int seekRowNum = ROWSIZE - 3;
KeyValue seekKey = KeyValue.createLastOnRow(ROWS[seekRowNum]);
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
readPoint);
assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
assertEquals(expectedKey, scanner.peek());
// Test seek to previous row
seekRowNum = ROWSIZE - 4;
expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
readPoint);
assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValue
.createFirstOnRow(ROWS[seekRowNum])));
assertEquals(expectedKey, scanner.peek());
}
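// The two helpers below compute, for a backward scan starting at
// (startRowNum, startQualNum), the first cell whose mvcc is visible at
// readPoint: the first returns it as a KeyValue, the second as a
// (row, qualifier) pair; both return null when nothing is readable.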
private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
int startQualNum, int readPoint) {
Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
startRowNum, startQualNum, readPoint);
if (nextReadableNum == null)
return null;
return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
}
private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
int startRowNum, int startQualNum, int readPoint) {
Pair<Integer, Integer> nextReadableNum = null;
boolean findExpected = false;
for (int i = startRowNum; i >= 0; i--) {
for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
if (makeMVCC(i, j) <= readPoint) {
nextReadableNum = new Pair<Integer, Integer>(i, j);
findExpected = true;
break;
}
}
if (findExpected)
break;
}
return nextReadableNum;
}
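// Writes every (row, qualifier) into both families and flushes at 1/3 and 2/3
// of the rows, so the data ends up spread across two store files plus the
// memstore of each store.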
private static void loadDataToRegion(HRegion region, byte[] additionalFamily)
throws IOException {
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
for (int j = 0; j < QUALSIZE; j++) {
put.add(makeKV(i, j));
// put additional family
put.add(makeKV(i, j, additionalFamily));
}
region.put(put);
if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
region.flushcache();
}
}
}
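// Even-numbered rows go to the memstore; the kvs of odd-numbered rows are
// distributed across the given store-file writers, which are closed at the end.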
private static void writeMemstoreAndStoreFiles(MemStore memstore,
final StoreFile.Writer[] writers) throws IOException {
Random rand = new Random();
try {
for (int i = 0; i < ROWSIZE; i++) {
for (int j = 0; j < QUALSIZE; j++) {
if (i % 2 == 0) {
memstore.add(makeKV(i, j));
} else {
writers[(i + j) % writers.length].append(makeKV(i, j));
}
}
}
} finally {
for (int i = 0; i < writers.length; i++) {
writers[i].close();
}
}
}
private static void writeStoreFile(final StoreFile.Writer writer)
throws IOException {
try {
for (int i = 0; i < ROWSIZE; i++) {
for (int j = 0; j < QUALSIZE; j++) {
writer.append(makeKV(i, j));
}
}
} finally {
writer.close();
}
}
private static void writeMemstore(MemStore memstore) throws IOException {
// Add half of the keyvalues to memstore
for (int i = 0; i < ROWSIZE; i++) {
for (int j = 0; j < QUALSIZE; j++) {
if ((i + j) % 2 == 0) {
memstore.add(makeKV(i, j));
}
}
}
memstore.snapshot();
// The first half is now in the snapshot; add the other half to the active memstore
for (int i = 0; i < ROWSIZE; i++) {
for (int j = 0; j < QUALSIZE; j++) {
if ((i + j) % 2 == 1) {
memstore.add(makeKV(i, j));
}
}
}
}
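// Cell factory: the value is VALUES[rowNum % VALUESIZE] and the mvcc version
// is (rowNum + cqNum) % (MAXMVCC + 1). For example, with MAXMVCC = 7,
// makeMVCC(3, 4) is 7 while makeMVCC(7, 1) wraps around to 0, so some cells
// are invisible at low read points.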
private static KeyValue makeKV(int rowNum, int cqNum) {
return makeKV(rowNum, cqNum, FAMILYNAME);
}
private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
VALUES[rowNum % VALUESIZE]);
kv.setMvccVersion(makeMVCC(rowNum, cqNum));
return kv;
}
private static long makeMVCC(int rowNum, int cqNum) {
return (rowNum + cqNum) % (MAXMVCC + 1);
}
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
}
return ret;
}
}