HBASE-1537 Intra-row scanning

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@826464 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2009-10-18 15:32:45 +00:00
parent 6baaba0a2c
commit 9375e3c8b0
9 changed files with 230 additions and 20 deletions
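In short: the patch adds a batch limit to Scan and threads an overloaded next(List<KeyValue>, int limit) through the whole scanner stack, so a single next() call returns at most limit values even when a row holds many more. A minimal client-side sketch of the new surface, mirroring the TestWideScanner test added at the bottom of this commit (region setup omitted):

Scan scan = new Scan();
scan.addFamily(HConstants.CATALOG_FAMILY);
scan.setBatch(1000);              // at most 1000 KeyValues per next() call

InternalScanner s = region.getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
boolean more;
do {
  more = s.next(results);         // may stop mid-row once the batch fills up
  // ... consume results; a wide row arrives across several calls ...
  results.clear();
} while (more);
s.close();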

View File

@@ -121,8 +121,9 @@ Release 0.21.0 - Unreleased
(Kevin Patterson via Stack)
HBASE-1903 Enable DEBUG by default
HBASE-1907 Version all client writables
-HBASE-1914 hlog should be able to set replication level for the log independently
-from any other files
+HBASE-1914 hlog should be able to set replication level for the log
+independently from any other files
+HBASE-1537 Intra-row scanning
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

View File

@@ -436,13 +436,16 @@ class TransactionState {
return next;
}
-public boolean next(List<KeyValue> results) throws IOException {
+public boolean next(List<KeyValue> results, int limit) throws IOException {
KeyValue peek = this.peek();
if (peek == null) {
return false;
}
byte [] row = peek.getRow();
results.add(peek);
+if (limit > 0 && (results.size() == limit)) {
+return true;
+}
while (true){
if (this.peek() == null) {
break;
@@ -451,10 +454,16 @@ class TransactionState {
break;
}
results.add(this.next());
+if (limit > 0 && (results.size() == limit)) {
+break;
+}
}
return true;
}
+public boolean next(List<KeyValue> results) throws IOException {
+return next(results, -1);
+}
}
}
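A note on the convention this hunk establishes and every scanner below follows: a non-positive limit means no limit. Each call site guards with limit > 0, and the old single-argument next() becomes a delegate passing -1. The pattern in isolation, as a sketch (hasMoreInRow, nextKeyValue, and hasMoreRows are hypothetical helpers, not names from this patch):

public boolean next(List<KeyValue> results, int limit) throws IOException {
  while (hasMoreInRow()) {                      // hypothetical helper
    results.add(nextKeyValue());                // hypothetical helper
    if (limit > 0 && results.size() == limit) {
      return true;                              // stop early, still inside the row
    }
  }
  return hasMoreRows();                         // hypothetical helper
}

public boolean next(List<KeyValue> results) throws IOException {
  return next(results, -1);                     // -1 disables the limit
}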

View File

@@ -66,6 +66,9 @@ import org.apache.hadoop.io.WritableFactories;
* To limit the number of versions of each column to be returned, execute
* {@link #setMaxVersions(int) setMaxVersions}.
* <p>
+* To limit the maximum number of values returned for each call to next(),
+* execute {@link #setBatch(int) setBatch}.
+* <p>
* To add a filter, execute {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
* <p>
* Expert: To explicitly disable server-side block caching for this scan,
@@ -77,6 +80,7 @@ public class Scan implements Writable {
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
+private int batch = -1;
private int caching = -1;
private boolean cacheBlocks = true;
private Filter filter = null;
@@ -125,6 +129,7 @@ public class Scan implements Writable {
startRow = scan.getStartRow();
stopRow = scan.getStopRow();
maxVersions = scan.getMaxVersions();
+batch = scan.getBatch();
caching = scan.getCaching();
cacheBlocks = scan.getCacheBlocks();
filter = scan.getFilter(); // clone?
@@ -234,6 +239,14 @@ public class Scan implements Writable {
return this;
}
+/**
+* Set the maximum number of values to return for each call to next()
+* @param batch the maximum number of values
+*/
+public void setBatch(int batch) {
+this.batch = batch;
+}
/**
* Set the number of rows for caching that will be passed to scanners.
* If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
@@ -318,6 +331,13 @@ public class Scan implements Writable {
return this.maxVersions;
}
+/**
+* @return maximum number of values to return for a single call to next()
+*/
+public int getBatch() {
+return this.batch;
+}
/**
* @return caching the number of rows fetched when calling next on a scanner
*/
@@ -381,6 +401,8 @@ public class Scan implements Writable {
sb.append(Bytes.toString(this.stopRow));
sb.append(", maxVersions=");
sb.append("" + this.maxVersions);
sb.append(", batch=");
sb.append("" + this.batch);
sb.append(", caching=");
sb.append("" + this.caching);
sb.append(", cacheBlocks=");
@@ -444,6 +466,7 @@ public class Scan implements Writable {
this.startRow = Bytes.readByteArray(in);
this.stopRow = Bytes.readByteArray(in);
this.maxVersions = in.readInt();
+this.batch = in.readInt();
this.caching = in.readInt();
this.cacheBlocks = in.readBoolean();
if(in.readBoolean()) {
@@ -473,6 +496,7 @@ public class Scan implements Writable {
Bytes.writeByteArray(out, this.startRow);
Bytes.writeByteArray(out, this.stopRow);
out.writeInt(this.maxVersions);
+out.writeInt(this.batch);
out.writeInt(this.caching);
out.writeBoolean(this.cacheBlocks);
if(this.filter == null) {
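Note that batch is written and read in the middle of the existing Writable layout, so the Scan wire format changes and both sides of the RPC must carry this patch. A quick round-trip check, sketched with Hadoop's DataOutputBuffer/DataInputBuffer (the check itself is not part of this commit):

Scan out = new Scan();
out.setBatch(1000);
DataOutputBuffer dob = new DataOutputBuffer();
out.write(dob);                                // batch is serialized right after maxVersions
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), dob.getLength());
Scan in = new Scan();
in.readFields(dib);
assert in.getBatch() == 1000;                  // survives the round trip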

View File

@@ -1679,9 +1679,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
private final byte [] stopRow;
private Filter filter;
private List<KeyValue> results = new ArrayList<KeyValue>();
+private int batch;
RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
this.filter = scan.getFilter();
+this.batch = scan.getBatch();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
this.stopRow = null;
} else {
@@ -1711,14 +1714,14 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
-public boolean next(List<KeyValue> outResults) throws IOException {
+public boolean next(List<KeyValue> outResults, int limit) throws IOException {
if (closing.get() || closed.get()) {
close();
throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
" is closing=" + closing.get() + " or closed=" + closed.get());
}
results.clear();
-boolean returnResult = nextInternal();
+boolean returnResult = nextInternal(limit);
if (!returnResult && filter != null && filter.filterRow()) {
results.clear();
}
@@ -1730,6 +1733,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
return returnResult;
}
+public boolean next(List<KeyValue> outResults) throws IOException {
+// apply the batching limit by default
+return next(outResults, batch);
+}
/*
* @return True if a filter rules the scanner is over, done.
*/
@@ -1741,7 +1749,7 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
* @return true if there are more rows, false if scanner is done
* @throws IOException
*/
-private boolean nextInternal() throws IOException {
+private boolean nextInternal(int limit) throws IOException {
byte [] currentRow = null;
boolean filterCurrentRow = false;
while (true) {
@@ -1774,7 +1782,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
currentRow = row;
continue;
}
-this.storeHeap.next(results);
+this.storeHeap.next(results, limit);
+if (limit > 0 && results.size() == limit) {
+return true;
+}
}
}
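The behavioral crux is in the last hunk: when the limit fills mid-row, nextInternal returns true without advancing past the current row, so the following next() call resumes inside the same row; that is the intra-row part. A caller that needs whole rows therefore has to stitch batches back together, roughly like this (a sketch; processRow is a hypothetical consumer, everything else uses only methods visible in this commit):

List<KeyValue> batch = new ArrayList<KeyValue>();
List<KeyValue> row = new ArrayList<KeyValue>();
byte[] currentRow = null;
boolean more;
do {
  more = s.next(batch);                        // bounded by scan.getBatch()
  for (KeyValue kv : batch) {
    if (currentRow != null && !Bytes.equals(currentRow, kv.getRow())) {
      processRow(row);                         // hypothetical consumer
      row.clear();
    }
    currentRow = kv.getRow();
    row.add(kv);
  }
  batch.clear();
} while (more);
if (!row.isEmpty()) processRow(row);           // flush the final row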

View File

@@ -46,12 +46,21 @@ public interface InternalScanner extends Closeable {
* @return true if more rows exist after this one, false if scanner is done
* @throws IOException
*/
-public boolean next(List<KeyValue> results)
-throws IOException;
+public boolean next(List<KeyValue> results) throws IOException;
+/**
+* Grab the next row's worth of values with a limit on the number of values
+* to return.
+* @param results
+* @param limit
+* @return true if more rows exist after this one, false if scanner is done
+* @throws IOException
+*/
+public boolean next(List<KeyValue> result, int limit) throws IOException;
/**
* Closes the scanner and releases any resources it has allocated
* @throws IOException
*/
public void close() throws IOException;
}

View File

@@ -96,11 +96,13 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
* <p>
* This can ONLY be called when you are using Scanners that implement
* InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+* @param result
+* @param limit
* @return true if there are more keys, false if all scanners are done
*/
-public boolean next(List<KeyValue> result) throws IOException {
+public boolean next(List<KeyValue> result, int limit) throws IOException {
InternalScanner currentAsInternal = (InternalScanner)this.current;
-currentAsInternal.next(result);
+currentAsInternal.next(result, limit);
KeyValue pee = this.current.peek();
if (pee == null) {
this.current.close();
@@ -110,7 +112,21 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
this.current = this.heap.poll();
return (this.current != null);
}
+/**
+* Gets the next row of keys from the top-most scanner.
+* <p>
+* This method takes care of updating the heap.
+* <p>
+* This can ONLY be called when you are using Scanners that implement
+* InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+* @param result
+* @return true if there are more keys, false if all scanners are done
+*/
+public boolean next(List<KeyValue> result) throws IOException {
+return next(result, -1);
+}
private class KVScannerComparator implements Comparator<KeyValueScanner> {
private KVComparator kvComparator;
/**

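For context on the delegation above: KeyValueHeap keeps its backing scanners in a priority queue ordered by each scanner's current peek() key, with current held out of the queue. After every row, or partial row, it re-enters current into the heap or closes it, then polls the new front, roughly like this (simplified sketch; scanners and comparator stand in for the constructor arguments, this is not the real class body):

PriorityQueue<KeyValueScanner> heap =
  new PriorityQueue<KeyValueScanner>(scanners.size(), comparator);
heap.addAll(scanners);
KeyValueScanner current = heap.poll();         // scanner with the smallest head key

// ... consume one row, or one batch, from current, then:
if (current.peek() == null) {
  current.close();                             // exhausted scanner leaves the heap
} else {
  heap.add(current);                           // re-enter under its new head key
}
current = heap.poll();                         // may well be the same scanner again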
View File

@@ -170,6 +170,12 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
return false;
}
+@Override
+public boolean next(List<KeyValue> results, int limit) throws IOException {
+// should not use limits with minor compacting store scanner
+return next(results);
+}
public void close() {
heap.close();
}

View File

@@ -146,9 +146,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
/**
* Get the next row of values from this Store.
* @param result
+* @param limit
* @return true if there are more rows, false if scanner is done
*/
-public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
@@ -157,14 +158,17 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
matcher.setRow(peeked.getRow());
KeyValue kv;
List<KeyValue> results = new ArrayList<KeyValue>();
-while((kv = this.heap.peek()) != null) {
+LOOP: while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode qcode = matcher.match(kv);
switch(qcode) {
case INCLUDE:
KeyValue next = this.heap.next();
results.add(next);
+if (limit > 0 && (results.size() == limit)) {
+break LOOP;
+}
continue;
case DONE:
// copy jazz
outResult.addAll(results);
@@ -209,6 +213,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
return false;
}
+public synchronized boolean next(List<KeyValue> outResult) throws IOException {
+return next(outResult, -1);
+}
private List<KeyValueScanner> getStoreFileScanners() {
List<HFileScanner> s =
new ArrayList<HFileScanner>(this.store.getStorefilesCount());
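One Java subtlety in the INCLUDE case above: inside a switch, a bare break only exits the switch, so the new limit check needs the LOOP label to leave the enclosing while. A generic illustration of the pattern (not HBase code; classify and the peeking iterator are hypothetical):

LOOP: while (it.hasNext()) {
  switch (classify(it.peek())) {               // hypothetical helper
    case INCLUDE:
      results.add(it.next());
      if (limit > 0 && results.size() == limit) {
        break LOOP;                            // a plain break would only end the switch
      }
      continue;                                // next iteration of the while
    default:
      break LOOP;
  }
}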

View File

@@ -0,0 +1,126 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
public class TestWideScanner extends HBaseTestCase {
private final Log LOG = LogFactory.getLog(this.getClass());
final int BATCH = 1000;
private MiniDFSCluster cluster = null;
private HRegion r;
static final HTableDescriptor TESTTABLEDESC =
new HTableDescriptor("testwidescan");
static {
TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is an arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false));
}
/** HRegionInfo for the test table region */
public static final HRegionInfo REGION_INFO =
new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY);
@Override
public void setUp() throws Exception {
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR,
this.cluster.getFileSystem().getHomeDirectory().toString());
super.setUp();
}
private int addWideContent(HRegion region, byte[] family)
throws IOException {
int count = 0;
// add a few rows of 2500 columns (we'll use batch of 1000) to make things
// interesting
for (char c = 'a'; c <= 'c'; c++) {
byte[] row = Bytes.toBytes("ab" + c);
int i;
for (i = 0; i < 2500; i++) {
byte[] b = Bytes.toBytes(String.format("%10d", i));
Put put = new Put(row);
put.add(family, b, b);
region.put(put);
count++;
}
}
// add one row of 100,000 columns
{
byte[] row = Bytes.toBytes("abf");
int i;
for (i = 0; i < 100000; i++) {
byte[] b = Bytes.toBytes(String.format("%10d", i));
Put put = new Put(row);
put.add(family, b, b);
region.put(put);
count++;
}
}
return count;
}
public void testWideScanBatching() throws IOException {
try {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
int inserted = addWideContent(this.r, HConstants.CATALOG_FAMILY);
List<KeyValue> results = new ArrayList<KeyValue>();
Scan scan = new Scan();
scan.addFamily(HConstants.CATALOG_FAMILY);
scan.setBatch(BATCH);
InternalScanner s = r.getScanner(scan);
int total = 0;
int i = 0;
boolean more;
do {
more = s.next(results);
i++;
LOG.info("iteration #" + i + ", results.size=" + results.size());
// assert that the result set is no larger than BATCH
assertTrue(results.size() <= BATCH);
total += results.size();
if (results.size() > 0) {
// assert that all results are from the same row
byte[] row = results.get(0).getRow();
for (KeyValue kv: results) {
assertTrue(Bytes.equals(row, kv.getRow()));
}
}
results.clear();
} while (more);
// assert that the scanner returned all values
LOG.info("inserted " + inserted + ", scanned " + total);
assertTrue(total == inserted);
s.close();
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(this.cluster);
}
}
}