HBASE-9999 Add support for small reverse scan
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1573949 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5ece5d8271
commit
78930691ca
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the scanner interface for the HBase client.
|
* Implements the scanner interface for the HBase client.
|
||||||
|
|
|
@ -153,21 +153,23 @@ public class ClientSmallScanner extends ClientScanner {
|
||||||
LOG.trace("Advancing internal small scanner to startKey at '"
|
LOG.trace("Advancing internal small scanner to startKey at '"
|
||||||
+ Bytes.toStringBinary(localStartKey) + "'");
|
+ Bytes.toStringBinary(localStartKey) + "'");
|
||||||
}
|
}
|
||||||
smallScanCallable = getSmallScanCallable(localStartKey, cacheNum);
|
smallScanCallable = getSmallScanCallable(
|
||||||
|
scan, getConnection(), getTable(), localStartKey, cacheNum);
|
||||||
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
|
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
|
||||||
this.scanMetrics.countOfRegions.incrementAndGet();
|
this.scanMetrics.countOfRegions.incrementAndGet();
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RegionServerCallable<Result[]> getSmallScanCallable(
|
static RegionServerCallable<Result[]> getSmallScanCallable(
|
||||||
|
final Scan sc, HConnection connection, TableName table,
|
||||||
byte[] localStartKey, final int cacheNum) {
|
byte[] localStartKey, final int cacheNum) {
|
||||||
this.scan.setStartRow(localStartKey);
|
sc.setStartRow(localStartKey);
|
||||||
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
|
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
|
||||||
getConnection(), getTable(), scan.getStartRow()) {
|
connection, table, sc.getStartRow()) {
|
||||||
public Result[] call(int callTimeout) throws IOException {
|
public Result[] call(int callTimeout) throws IOException {
|
||||||
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
|
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
|
||||||
.getRegionInfo().getRegionName(), scan, cacheNum, true);
|
.getRegionInfo().getRegionName(), sc, cacheNum, true);
|
||||||
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
|
||||||
controller.setPriority(getTableName());
|
controller.setPriority(getTableName());
|
||||||
controller.setCallTimeout(callTimeout);
|
controller.setCallTimeout(callTimeout);
|
||||||
|
|
|
@ -742,15 +742,24 @@ public class HTable implements HTableInterface {
|
||||||
if (scan.getCaching() <= 0) {
|
if (scan.getCaching() <= 0) {
|
||||||
scan.setCaching(getScannerCaching());
|
scan.setCaching(getScannerCaching());
|
||||||
}
|
}
|
||||||
if (scan.isSmall() && !scan.isReversed()) {
|
|
||||||
return new ClientSmallScanner(getConfiguration(), scan, getName(),
|
if (scan.isReversed()) {
|
||||||
this.connection);
|
if (scan.isSmall()) {
|
||||||
} else if (scan.isReversed()) {
|
return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
|
||||||
return new ReversedClientScanner(getConfiguration(), scan, getName(),
|
this.connection);
|
||||||
this.connection);
|
} else {
|
||||||
|
return new ReversedClientScanner(getConfiguration(), scan, getName(),
|
||||||
|
this.connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (scan.isSmall()) {
|
||||||
|
return new ClientSmallScanner(getConfiguration(), scan, getName(),
|
||||||
|
this.connection, this.rpcCallerFactory);
|
||||||
|
} else {
|
||||||
|
return new ClientScanner(getConfiguration(), scan,
|
||||||
|
getName(), this.connection);
|
||||||
}
|
}
|
||||||
return new ClientScanner(getConfiguration(), scan,
|
|
||||||
getName(), this.connection);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A reversed client scanner which support backward scanning
|
* A reversed client scanner which support backward scanning
|
||||||
|
@ -114,6 +115,7 @@ public class ReversedClientScanner extends ClientScanner {
|
||||||
this.scanMetrics.countOfRegions.incrementAndGet();
|
this.scanMetrics.countOfRegions.incrementAndGet();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
ExceptionUtil.rethrowIfInterrupt(e);
|
||||||
close();
|
close();
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -151,7 +153,7 @@ public class ReversedClientScanner extends ClientScanner {
|
||||||
* @param row
|
* @param row
|
||||||
* @return a new byte array which is the closest front row of the specified one
|
* @return a new byte array which is the closest front row of the specified one
|
||||||
*/
|
*/
|
||||||
private byte[] createClosestRowBefore(byte[] row) {
|
protected byte[] createClosestRowBefore(byte[] row) {
|
||||||
if (row == null) {
|
if (row == null) {
|
||||||
throw new IllegalArgumentException("The passed row is empty");
|
throw new IllegalArgumentException("The passed row is empty");
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class ReversedScannerCallable extends ScannerCallable {
|
||||||
this.location = connection.getRegionLocation(tableName, row, reload);
|
this.location = connection.getRegionLocation(tableName, row, reload);
|
||||||
if (this.location == null) {
|
if (this.location == null) {
|
||||||
throw new IOException("Failed to find location, tableName="
|
throw new IOException("Failed to find location, tableName="
|
||||||
+ tableName + ", row=" + Bytes.toString(row) + ", reload="
|
+ tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
|
||||||
+ reload);
|
+ reload);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -6061,4 +6061,170 @@ public class TestFromClientSide {
|
||||||
assertEquals(insertNum, count);
|
assertEquals(insertNum, count);
|
||||||
table.close();
|
table.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests reversed scan under multi regions
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSmallReversedScanUnderMultiRegions() throws Exception {
|
||||||
|
// Test Initialization.
|
||||||
|
byte[] TABLE = Bytes.toBytes("testSmallReversedScanUnderMultiRegions");
|
||||||
|
byte[][] splitRows = new byte[][]{
|
||||||
|
Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"),
|
||||||
|
Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")};
|
||||||
|
HTable table = TEST_UTIL.createTable(TABLE, FAMILY, splitRows);
|
||||||
|
TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
|
||||||
|
|
||||||
|
assertEquals(splitRows.length + 1, table.getRegionLocations().size());
|
||||||
|
for (byte[] splitRow : splitRows) {
|
||||||
|
Put put = new Put(splitRow);
|
||||||
|
put.add(FAMILY, QUALIFIER, VALUE);
|
||||||
|
table.put(put);
|
||||||
|
|
||||||
|
byte[] nextRow = Bytes.copy(splitRow);
|
||||||
|
nextRow[nextRow.length - 1]++;
|
||||||
|
|
||||||
|
put = new Put(nextRow);
|
||||||
|
put.add(FAMILY, QUALIFIER, VALUE);
|
||||||
|
table.put(put);
|
||||||
|
}
|
||||||
|
|
||||||
|
// scan forward
|
||||||
|
ResultScanner scanner = table.getScanner(new Scan());
|
||||||
|
int count = 0;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
assertEquals(12, count);
|
||||||
|
|
||||||
|
reverseScanTest(table, false);
|
||||||
|
reverseScanTest(table, true);
|
||||||
|
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reverseScanTest(HTable table, boolean small) throws IOException {
|
||||||
|
// scan backward
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setReversed(true);
|
||||||
|
ResultScanner scanner = table.getScanner(scan);
|
||||||
|
int count = 0;
|
||||||
|
byte[] lastRow = null;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
byte[] thisRow = r.getRow();
|
||||||
|
if (lastRow != null) {
|
||||||
|
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
|
||||||
|
+ ",this row=" + Bytes.toString(thisRow),
|
||||||
|
Bytes.compareTo(thisRow, lastRow) < 0);
|
||||||
|
}
|
||||||
|
lastRow = thisRow;
|
||||||
|
}
|
||||||
|
assertEquals(12, count);
|
||||||
|
|
||||||
|
scan = new Scan();
|
||||||
|
scan.setSmall(small);
|
||||||
|
scan.setReversed(true);
|
||||||
|
scan.setStartRow(Bytes.toBytes("002"));
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
count = 0;
|
||||||
|
lastRow = null;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
byte[] thisRow = r.getRow();
|
||||||
|
if (lastRow != null) {
|
||||||
|
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
|
||||||
|
+ ",this row=" + Bytes.toString(thisRow),
|
||||||
|
Bytes.compareTo(thisRow, lastRow) < 0);
|
||||||
|
}
|
||||||
|
lastRow = thisRow;
|
||||||
|
}
|
||||||
|
assertEquals(3, count); // 000 001 002
|
||||||
|
|
||||||
|
scan = new Scan();
|
||||||
|
scan.setSmall(small);
|
||||||
|
scan.setReversed(true);
|
||||||
|
scan.setStartRow(Bytes.toBytes("002"));
|
||||||
|
scan.setStopRow(Bytes.toBytes("000"));
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
count = 0;
|
||||||
|
lastRow = null;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
byte[] thisRow = r.getRow();
|
||||||
|
if (lastRow != null) {
|
||||||
|
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
|
||||||
|
+ ",this row=" + Bytes.toString(thisRow),
|
||||||
|
Bytes.compareTo(thisRow, lastRow) < 0);
|
||||||
|
}
|
||||||
|
lastRow = thisRow;
|
||||||
|
}
|
||||||
|
assertEquals(2, count); // 001 002
|
||||||
|
|
||||||
|
scan = new Scan();
|
||||||
|
scan.setSmall(small);
|
||||||
|
scan.setReversed(true);
|
||||||
|
scan.setStartRow(Bytes.toBytes("001"));
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
count = 0;
|
||||||
|
lastRow = null;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
byte[] thisRow = r.getRow();
|
||||||
|
if (lastRow != null) {
|
||||||
|
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
|
||||||
|
+ ",this row=" + Bytes.toString(thisRow),
|
||||||
|
Bytes.compareTo(thisRow, lastRow) < 0);
|
||||||
|
}
|
||||||
|
lastRow = thisRow;
|
||||||
|
}
|
||||||
|
assertEquals(2, count); // 000 001
|
||||||
|
|
||||||
|
scan = new Scan();
|
||||||
|
scan.setSmall(small);
|
||||||
|
scan.setReversed(true);
|
||||||
|
scan.setStartRow(Bytes.toBytes("000"));
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
count = 0;
|
||||||
|
lastRow = null;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
byte[] thisRow = r.getRow();
|
||||||
|
if (lastRow != null) {
|
||||||
|
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
|
||||||
|
+ ",this row=" + Bytes.toString(thisRow),
|
||||||
|
Bytes.compareTo(thisRow, lastRow) < 0);
|
||||||
|
}
|
||||||
|
lastRow = thisRow;
|
||||||
|
}
|
||||||
|
assertEquals(1, count); // 000
|
||||||
|
|
||||||
|
scan = new Scan();
|
||||||
|
scan.setSmall(small);
|
||||||
|
scan.setReversed(true);
|
||||||
|
scan.setStartRow(Bytes.toBytes("006"));
|
||||||
|
scan.setStopRow(Bytes.toBytes("002"));
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
count = 0;
|
||||||
|
lastRow = null;
|
||||||
|
for (Result r : scanner) {
|
||||||
|
assertTrue(!r.isEmpty());
|
||||||
|
count++;
|
||||||
|
byte[] thisRow = r.getRow();
|
||||||
|
if (lastRow != null) {
|
||||||
|
assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
|
||||||
|
+ ",this row=" + Bytes.toString(thisRow),
|
||||||
|
Bytes.compareTo(thisRow, lastRow) < 0);
|
||||||
|
}
|
||||||
|
lastRow = thisRow;
|
||||||
|
}
|
||||||
|
assertEquals(4, count); // 003 004 005 006
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue