HBASE-5520 Support reseek() at RegionScanner (Ram)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303005 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ramkrishna 2012-03-20 17:10:50 +00:00
parent 45a15792f0
commit 2f58da0489
4 changed files with 141 additions and 4 deletions

View File

@ -3500,6 +3500,23 @@ public class HRegion implements HeapSize { // , Writable{
KeyValueHeap getStoreHeapForTesting() {
return storeHeap;
}
@Override
public synchronized boolean reseek(byte[] row) throws IOException {
if (row == null) {
throw new IllegalArgumentException("Row cannot be null.");
}
startRegionOperation();
try {
// This could be a new thread from the last time we called next().
MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
KeyValue kv = KeyValue.createFirstOnRow(row);
// use request seek to make use of the lazy seek option. See HBASE-5520
return this.storeHeap.requestSeek(kv, true, true);
} finally {
closeRegionOperation();
}
}
}
// Utility methods

View File

@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
@ -37,4 +39,17 @@ public interface RegionScanner extends InternalScanner {
* further rows.
*/
public boolean isFilterDone();
/**
* Do a reseek to the required row. Should not be used to seek to a key which
* may come before the current position. Always seeks to the beginning of a
* row boundary.
*
* @throws IOException
* @throws IllegalArgumentException
* if row is null
*
*/
public boolean reseek(byte[] row) throws IOException;
}

View File

@ -96,6 +96,11 @@ public class TestCoprocessorInterface extends HBaseTestCase {
return delegate.isFilterDone();
}
@Override
public boolean reseek(byte[] row) throws IOException {
return false;
}
}
public static class CoprocessorImpl extends BaseRegionObserver {

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@ -37,6 +36,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
@ -63,10 +63,24 @@ public class TestFilter extends HBaseTestCase {
Bytes.toBytes("testRowTwo-2"), Bytes.toBytes("testRowTwo-3")
};
private static final byte [][] ROWS_THREE = {
Bytes.toBytes("testRowThree-0"), Bytes.toBytes("testRowThree-1"),
Bytes.toBytes("testRowThree-2"), Bytes.toBytes("testRowThree-3")
};
private static final byte [][] ROWS_FOUR = {
Bytes.toBytes("testRowFour-0"), Bytes.toBytes("testRowFour-1"),
Bytes.toBytes("testRowFour-2"), Bytes.toBytes("testRowFour-3")
};
private static final byte [][] FAMILIES = {
Bytes.toBytes("testFamilyOne"), Bytes.toBytes("testFamilyTwo")
};
private static final byte [][] FAMILIES_1 = {
Bytes.toBytes("testFamilyThree"), Bytes.toBytes("testFamilyFour")
};
private static final byte [][] QUALIFIERS_ONE = {
Bytes.toBytes("testQualifierOne-0"), Bytes.toBytes("testQualifierOne-1"),
Bytes.toBytes("testQualifierOne-2"), Bytes.toBytes("testQualifierOne-3")
@ -77,10 +91,24 @@ public class TestFilter extends HBaseTestCase {
Bytes.toBytes("testQualifierTwo-2"), Bytes.toBytes("testQualifierTwo-3")
};
private static final byte [][] QUALIFIERS_THREE = {
Bytes.toBytes("testQualifierThree-0"), Bytes.toBytes("testQualifierThree-1"),
Bytes.toBytes("testQualifierThree-2"), Bytes.toBytes("testQualifierThree-3")
};
private static final byte [][] QUALIFIERS_FOUR = {
Bytes.toBytes("testQualifierFour-0"), Bytes.toBytes("testQualifierFour-1"),
Bytes.toBytes("testQualifierFour-2"), Bytes.toBytes("testQualifierFour-3")
};
private static final byte [][] VALUES = {
Bytes.toBytes("testValueOne"), Bytes.toBytes("testValueTwo")
};
byte [][] NEW_FAMILIES = {
Bytes.toBytes("f1"), Bytes.toBytes("f2")
};
private long numRows = ROWS_ONE.length + ROWS_TWO.length;
private long colsPerRow = FAMILIES.length * QUALIFIERS_ONE.length;
@ -90,6 +118,11 @@ public class TestFilter extends HBaseTestCase {
HTableDescriptor htd = new HTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(FAMILIES[0]));
htd.addFamily(new HColumnDescriptor(FAMILIES[1]));
htd.addFamily(new HColumnDescriptor(FAMILIES_1[0]));
htd.addFamily(new HColumnDescriptor(FAMILIES_1[1]));
htd.addFamily(new HColumnDescriptor(NEW_FAMILIES[0]));
htd.addFamily(new HColumnDescriptor(NEW_FAMILIES[1]));
htd.addFamily(new HColumnDescriptor(FAMILIES_1[1]));
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
this.region = HRegion.createHRegion(info, this.testDir, this.conf, htd);
@ -170,6 +203,73 @@ public class TestFilter extends HBaseTestCase {
super.tearDown();
}
public void testRegionScannerReseek() throws Exception {
// create new rows and column family to show how reseek works..
for (byte[] ROW : ROWS_THREE) {
Put p = new Put(ROW);
p.setWriteToWAL(true);
for (byte[] QUALIFIER : QUALIFIERS_THREE) {
p.add(FAMILIES[0], QUALIFIER, VALUES[0]);
}
this.region.put(p);
}
for (byte[] ROW : ROWS_FOUR) {
Put p = new Put(ROW);
p.setWriteToWAL(false);
for (byte[] QUALIFIER : QUALIFIERS_FOUR) {
p.add(FAMILIES[1], QUALIFIER, VALUES[1]);
}
this.region.put(p);
}
// Flush
this.region.flushcache();
// Insert second half (reverse families)
for (byte[] ROW : ROWS_THREE) {
Put p = new Put(ROW);
p.setWriteToWAL(false);
for (byte[] QUALIFIER : QUALIFIERS_THREE) {
p.add(FAMILIES[1], QUALIFIER, VALUES[0]);
}
this.region.put(p);
}
for (byte[] ROW : ROWS_FOUR) {
Put p = new Put(ROW);
p.setWriteToWAL(false);
for (byte[] QUALIFIER : QUALIFIERS_FOUR) {
p.add(FAMILIES[0], QUALIFIER, VALUES[1]);
}
this.region.put(p);
}
Scan s = new Scan();
// set a start row
s.setStartRow(ROWS_FOUR[1]);
RegionScanner scanner = region.getScanner(s);
// reseek to row three.
scanner.reseek(ROWS_THREE[1]);
List<KeyValue> results = new ArrayList<KeyValue>();
// the results should belong to ROWS_THREE[1]
scanner.next(results);
for (KeyValue keyValue : results) {
assertEquals("The rows with ROWS_TWO as row key should be appearing.",
Bytes.toString(keyValue.getRow()), Bytes.toString(ROWS_THREE[1]));
}
// again try to reseek to a value before ROWS_THREE[1]
scanner.reseek(ROWS_ONE[1]);
results = new ArrayList<KeyValue>();
// This time no seek would have been done to ROWS_ONE[1]
scanner.next(results);
for (KeyValue keyValue : results) {
assertFalse("Cannot rewind back to a value less than previous reseek.",
Bytes.toString(keyValue.getRow()).contains("testRowOne"));
}
}
public void testNoFilter() throws Exception {
// No filter
long expectedRows = this.numRows;
@ -608,7 +708,7 @@ public class TestFilter extends HBaseTestCase {
verifyScanNoEarlyOut(s, expectedRows, expectedKeys);
// Match all columns in second family
// look only in second group of rows
// look only in second group of rows
expectedRows = this.numRows / 2;
expectedKeys = this.colsPerRow / 2;
f = new FamilyFilter(CompareOp.GREATER,
@ -1345,7 +1445,7 @@ public class TestFilter extends HBaseTestCase {
assertFalse("Should not have returned whole value",
Bytes.equals(kv.getValue(), kvs[idx].getValue()));
if (useLen) {
assertEquals("Value in result is not SIZEOF_INT",
assertEquals("Value in result is not SIZEOF_INT",
kv.getValue().length, Bytes.SIZEOF_INT);
LOG.info("idx = " + idx + ", len=" + kvs[idx].getValueLength()
+ ", actual=" + Bytes.toInt(kv.getValue()));
@ -1353,7 +1453,7 @@ public class TestFilter extends HBaseTestCase {
kvs[idx].getValueLength(), Bytes.toInt(kv.getValue()) );
LOG.info("good");
} else {
assertEquals("Value in result is not empty",
assertEquals("Value in result is not empty",
kv.getValue().length, 0);
}
idx++;