HBASE-9488 Improve performance for small scan

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1524272 13f79535-47bb-0310-9956-ffa450edef68
zjushch 2013-09-18 01:40:20 +00:00
parent e7c5a38416
commit de3d34bd4a
12 changed files with 537 additions and 79 deletions

ClientScanner.java

@@ -51,24 +51,24 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceStability.Stable
public class ClientScanner extends AbstractClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
private Scan scan;
private boolean closed = false;
protected Scan scan;
protected boolean closed = false;
// Current region the scanner is against. Gets cleared if the current region
// goes wonky: e.g. if it splits on us.
private HRegionInfo currentRegion = null;
protected HRegionInfo currentRegion = null;
private ScannerCallable callable = null;
private final LinkedList<Result> cache = new LinkedList<Result>();
private final int caching;
private long lastNext;
protected final LinkedList<Result> cache = new LinkedList<Result>();
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
private Result lastResult = null;
private ScanMetrics scanMetrics = null;
private final long maxScannerResultSize;
protected Result lastResult = null;
protected ScanMetrics scanMetrics = null;
protected final long maxScannerResultSize;
private final HConnection connection;
private final TableName tableName;
private final int scannerTimeout;
private boolean scanMetricsPublished = false;
private RpcRetryingCaller<Result []> caller;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result []> caller;
/**
* Create a new ClientScanner for the specified table. An HConnection will be
@@ -168,6 +168,10 @@ public class ClientScanner extends AbstractClientScanner {
this.caller = rpcFactory.<Result[]> newCaller();
initializeScannerInConstruction();
}
protected void initializeScannerInConstruction() throws IOException{
// initialize the scanner
nextScanner(this.caching, false);
}
@@ -198,7 +202,7 @@ public class ClientScanner extends AbstractClientScanner {
}
// returns true if the passed region endKey is past the scan's stop row
private boolean checkScanStopRow(final byte [] endKey) {
protected boolean checkScanStopRow(final byte [] endKey) {
if (this.scan.getStopRow().length > 0) {
// there is a stop row, check to see if we are past it.
byte [] stopRow = scan.getStopRow();
@@ -297,7 +301,7 @@ public class ClientScanner extends AbstractClientScanner {
*
* scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
*/
private void writeScanMetrics() {
protected void writeScanMetrics() {
if (this.scanMetrics == null || scanMetricsPublished) {
return;
}

ClientSmallScanner.java (new file)

@@ -0,0 +1,245 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ServiceException;
/**
* Client scanner for small scans. Generally, only one RPC is issued to fetch the
* scan results, unless the results cross multiple regions or the row count of the
* results exceeds the caching.
*
* For small scans, it gets better performance than {@link ClientScanner}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ClientSmallScanner extends ClientScanner {
private final Log LOG = LogFactory.getLog(this.getClass());
private RegionServerCallable<Result[]> smallScanCallable = null;
// When fetching results from the server, skip the first result if it has the
// same row as this one
private byte[] skipRowOfFirstResult = null;
/**
* Create a new ClientSmallScanner for the specified table. An HConnection
* will be retrieved using the passed Configuration. Note that the passed
* {@link Scan}'s start row may be changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName) throws IOException {
this(conf, scan, tableName, HConnectionManager.getConnection(conf));
}
/**
* Create a new ClientSmallScanner for the specified table. An HConnection
* will be retrieved using the passed Configuration. Note that the passed
* {@link Scan}'s start row may be changed.
* @param conf
* @param scan
* @param tableName
* @param connection
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, HConnection connection) throws IOException {
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
}
/**
* Create a new ClientSmallScanner for the specified table. Note that the
* passed {@link Scan}'s start row may be changed.
*
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to rangeGet
* @param connection Connection identifying the cluster
* @param rpcFactory
* @throws IOException
*/
public ClientSmallScanner(final Configuration conf, final Scan scan,
final TableName tableName, HConnection connection,
RpcRetryingCallerFactory rpcFactory) throws IOException {
super(conf, scan, tableName, connection, rpcFactory);
}
@Override
protected void initializeScannerInConstruction() throws IOException {
// No need to initialize the scanner when constructing the instance; it is
// done lazily when next() is called. Do nothing here.
}
/**
* Gets a scanner for the following scan. Moves to the next region, continues
* from the last result, or starts from the start row.
* @param nbRows
* @param done true if server-side says we're done scanning.
* @param currentRegionDone true if the scan is over on the current region
* @return true if there is a next scanner
* @throws IOException
*/
private boolean nextScanner(int nbRows, final boolean done,
boolean currentRegionDone) throws IOException {
// Where to start the next getter
byte[] localStartKey;
int cacheNum = nbRows;
skipRowOfFirstResult = null;
// if we're at end of table, close and return false to stop iterating
if (this.currentRegion != null && currentRegionDone) {
byte[] endKey = this.currentRegion.getEndKey();
if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY)
|| checkScanStopRow(endKey) || done) {
close();
if (LOG.isDebugEnabled()) {
LOG.debug("Finished with small scan at " + this.currentRegion);
}
return false;
}
localStartKey = endKey;
if (LOG.isDebugEnabled()) {
LOG.debug("Finished with region " + this.currentRegion);
}
} else if (this.lastResult != null) {
localStartKey = this.lastResult.getRow();
skipRowOfFirstResult = this.lastResult.getRow();
cacheNum++;
} else {
localStartKey = this.scan.getStartRow();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Advancing internal small scanner to startKey at '"
+ Bytes.toStringBinary(localStartKey) + "'");
}
smallScanCallable = getSmallScanCallable(localStartKey, cacheNum);
if (this.scanMetrics != null && skipRowOfFirstResult == null) {
this.scanMetrics.countOfRegions.incrementAndGet();
}
return true;
}
private RegionServerCallable<Result[]> getSmallScanCallable(
byte[] localStartKey, final int cacheNum) {
this.scan.setStartRow(localStartKey);
RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
getConnection(), getTable(), scan.getStartRow()) {
public Result[] call() throws IOException {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), scan, cacheNum, true);
ScanResponse response = null;
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
try {
response = getStub().scan(controller, request);
return ResponseConverter.getResults(controller.cellScanner(),
response);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
};
return callable;
}
@Override
public Result next() throws IOException {
// If the scanner is closed and there's nothing left in the cache, next is a
// no-op.
if (cache.size() == 0 && this.closed) {
return null;
}
if (cache.size() == 0) {
Result[] values = null;
long remainingResultSize = maxScannerResultSize;
int countdown = this.caching;
boolean currentRegionDone = false;
// Values == null means server-side filter has determined we must STOP
while (remainingResultSize > 0 && countdown > 0
&& nextScanner(countdown, values == null, currentRegionDone)) {
// Server returns null values if scanning is to stop. Else, it
// returns an empty array if scanning is to go on and we've just
// exhausted the current region.
values = this.caller.callWithRetries(smallScanCallable);
this.currentRegion = smallScanCallable.getHRegionInfo();
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
- lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
for (int i = 0; i < values.length; i++) {
Result rs = values[i];
if (i == 0 && this.skipRowOfFirstResult != null
&& Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
// Skip the first result
continue;
}
cache.add(rs);
for (Cell kv : rs.rawCells()) {
remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
}
countdown--;
this.lastResult = rs;
}
}
currentRegionDone = countdown > 0;
}
}
if (cache.size() > 0) {
return cache.poll();
}
// if we exhausted this scanner before calling close, write out the scan
// metrics
writeScanMetrics();
return null;
}
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
closed = true;
}
}
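
A note on the resume-and-skip protocol implemented above: when a batch ends mid-region, the next RPC restarts inclusively at lastResult's row (a scan cannot start exclusively after a row), asks for one extra row (cacheNum++), and drops the duplicate first result via skipRowOfFirstResult. A minimal standalone sketch of that dedup step, with hypothetical row data and assuming only Bytes from hbase-common:

import org.apache.hadoop.hbase.util.Bytes;

public class ResumeSkipSketch {
  public static void main(String[] args) {
    // "row005" was the last row already handed to the caller before the retry.
    byte[] skipRowOfFirstResult = Bytes.toBytes("row005");
    // The follow-up RPC restarts at row005 inclusively, so it comes back again.
    byte[][] nextBatch = { Bytes.toBytes("row005"), Bytes.toBytes("row006"),
        Bytes.toBytes("row007") };
    for (int i = 0; i < nextBatch.length; i++) {
      // Mirrors the i == 0 check in ClientSmallScanner.next(): drop the duplicate.
      if (i == 0 && Bytes.equals(skipRowOfFirstResult, nextBatch[i])) {
        continue;
      }
      System.out.println(Bytes.toString(nextBatch[i])); // prints row006, row007
    }
  }
}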

HTable.java

@@ -18,15 +18,31 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -35,6 +51,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -54,24 +71,8 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
/**
* <p>Used to communicate with a single HBase table.
@@ -693,6 +694,10 @@ public class HTable implements HTableInterface {
if (scan.getCaching() <= 0) {
scan.setCaching(getScannerCaching());
}
if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection);
}
return new ClientScanner(getConfiguration(), scan,
getName(), this.connection);
}
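
A minimal usage sketch of the dispatch above (table and row names are hypothetical, and a running cluster is assumed): once small is set on the Scan, getScanner() hands back a ClientSmallScanner rather than a ClientScanner, so a short bounded range is served in a single RPC.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class SmallScanUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, TableName.valueOf("testtable"));
    Scan scan = new Scan(Bytes.toBytes("row000"), Bytes.toBytes("row010"));
    scan.setSmall(true);
    ResultScanner scanner = table.getScanner(scan); // a ClientSmallScanner under the hood
    try {
      for (Result r : scanner) {
        System.out.println(Bytes.toStringBinary(r.getRow()));
      }
    } finally {
      scanner.close();
      table.close();
    }
  }
}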

RegionServerCallable.java

@@ -26,6 +26,7 @@ import java.net.SocketTimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
@@ -141,4 +142,14 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
}
return sleep;
}
/**
* @return the HRegionInfo for the current region
*/
public HRegionInfo getHRegionInfo() {
if (this.location == null) {
return null;
}
return this.location.getRegionInfo();
}
}

Scan.java

@@ -116,6 +116,26 @@ public class Scan extends OperationWithAttributes {
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean loadColumnFamiliesOnDemand = null;
/**
* Set it true for a small scan to get better performance.
*
* Small scans should use pread, while big scans can use seek + read.
*
* seek + read is fast but can cause two problems: (1) resource contention and
* (2) too much network I/O.
*
* [89-fb] Using pread for non-compaction read request
* https://issues.apache.org/jira/browse/HBASE-7266
*
* On the other hand, if it is set to true, we do openScanner, next, and
* closeScanner in one RPC call, which means better performance for a small
* scan. [HBASE-9488]
*
* Generally, if the scan range is within one data block (64KB), it can be
* considered a small scan.
*/
private boolean small = false;
/**
* Create a Scan operation across all rows.
*/
@@ -698,4 +718,36 @@
return attr == null ? IsolationLevel.READ_COMMITTED :
IsolationLevel.fromBytes(attr);
}
/**
* Set whether this scan is a small scan.
* <p>
* Small scans should use pread, while big scans can use seek + read.
*
* seek + read is fast but can cause two problems: (1) resource contention and
* (2) too much network I/O.
*
* [89-fb] Using pread for non-compaction read request
* https://issues.apache.org/jira/browse/HBASE-7266
*
* On the other hand, if it is set to true, we do openScanner, next, and
* closeScanner in one RPC call, which means better performance for a small
* scan. [HBASE-9488]
*
* Generally, if the scan range is within one data block (64KB), it can be
* considered a small scan.
*
* @param small
*/
public void setSmall(boolean small) {
this.small = small;
}
/**
* Get whether this scan is a small scan
* @return true if small scan
*/
public boolean isSmall() {
return small;
}
}
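
As a rule of thumb from the javadoc above, a range expected to fit in one 64KB data block is a good small-scan candidate. A hypothetical helper sketch (the method name and threshold test are illustrative, not part of this patch):

/** Marks a scan small when its expected result size fits one default 64KB HFile block. */
static Scan markSmallIfBounded(Scan scan, long expectedBytes) {
  if (expectedBytes <= 64 * 1024) {
    scan.setSmall(true);
  }
  return scan;
}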

ProtobufUtil.java

@@ -719,6 +719,9 @@ public final class ProtobufUtil {
if (scan.getMaxResultSize() > 0) {
scanBuilder.setMaxResultSize(scan.getMaxResultSize());
}
if (scan.isSmall()) {
scanBuilder.setSmall(scan.isSmall());
}
Boolean loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
if (loadColumnFamiliesOnDemand != null) {
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
@@ -831,6 +834,9 @@
if (proto.hasMaxResultSize()) {
scan.setMaxResultSize(proto.getMaxResultSize());
}
if (proto.hasSmall()) {
scan.setSmall(proto.getSmall());
}
for (NameBytesPair attribute: proto.getAttributeList()) {
scan.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
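
A round-trip sketch of the flag through the two conversions patched above, assuming (as elsewhere in this class) they are ProtobufUtil.toScan(Scan) and ProtobufUtil.toScan(ClientProtos.Scan):

// A hypothetical check, not part of this patch.
static void roundTripSmallFlag() throws IOException {
  Scan scan = new Scan();
  scan.setSmall(true);
  ClientProtos.Scan proto = ProtobufUtil.toScan(scan); // serializes small = true
  assert proto.hasSmall() && proto.getSmall();
  Scan roundTripped = ProtobufUtil.toScan(proto);      // restores the flag
  assert roundTripped.isSmall();
}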

ClientProtos.java (generated)

@@ -14112,6 +14112,16 @@
* </pre>
*/
boolean getLoadColumnFamiliesOnDemand();
// optional bool small = 14;
/**
* <code>optional bool small = 14;</code>
*/
boolean hasSmall();
/**
* <code>optional bool small = 14;</code>
*/
boolean getSmall();
}
/**
* Protobuf type {@code Scan}
@@ -14262,6 +14272,11 @@
loadColumnFamiliesOnDemand_ = input.readBool();
break;
}
case 112: {
bitField0_ |= 0x00000800;
small_ = input.readBool();
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -14576,6 +14591,22 @@
return loadColumnFamiliesOnDemand_;
}
// optional bool small = 14;
public static final int SMALL_FIELD_NUMBER = 14;
private boolean small_;
/**
* <code>optional bool small = 14;</code>
*/
public boolean hasSmall() {
return ((bitField0_ & 0x00000800) == 0x00000800);
}
/**
* <code>optional bool small = 14;</code>
*/
public boolean getSmall() {
return small_;
}
private void initFields() {
column_ = java.util.Collections.emptyList();
attribute_ = java.util.Collections.emptyList();
@@ -14590,6 +14621,7 @@
storeLimit_ = 0;
storeOffset_ = 0;
loadColumnFamiliesOnDemand_ = false;
small_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -14660,6 +14692,9 @@
if (((bitField0_ & 0x00000400) == 0x00000400)) {
output.writeBool(13, loadColumnFamiliesOnDemand_);
}
if (((bitField0_ & 0x00000800) == 0x00000800)) {
output.writeBool(14, small_);
}
getUnknownFields().writeTo(output);
}
@@ -14721,6 +14756,10 @@
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(13, loadColumnFamiliesOnDemand_);
}
if (((bitField0_ & 0x00000800) == 0x00000800)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(14, small_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -14803,6 +14842,11 @@
result = result && (getLoadColumnFamiliesOnDemand()
== other.getLoadColumnFamiliesOnDemand());
}
result = result && (hasSmall() == other.hasSmall());
if (hasSmall()) {
result = result && (getSmall()
== other.getSmall());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -14868,6 +14912,10 @@
hash = (37 * hash) + LOAD_COLUMN_FAMILIES_ON_DEMAND_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
}
if (hasSmall()) {
hash = (37 * hash) + SMALL_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getSmall());
}
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -15034,6 +15082,8 @@
bitField0_ = (bitField0_ & ~0x00000800);
loadColumnFamiliesOnDemand_ = false;
bitField0_ = (bitField0_ & ~0x00001000);
small_ = false;
bitField0_ = (bitField0_ & ~0x00002000);
return this;
}
@@ -15132,6 +15182,10 @@
to_bitField0_ |= 0x00000400;
}
result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
to_bitField0_ |= 0x00000800;
}
result.small_ = small_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -15233,6 +15287,9 @@
if (other.hasLoadColumnFamiliesOnDemand()) {
setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
}
if (other.hasSmall()) {
setSmall(other.getSmall());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -16311,6 +16368,39 @@
return this;
}
// optional bool small = 14;
private boolean small_ ;
/**
* <code>optional bool small = 14;</code>
*/
public boolean hasSmall() {
return ((bitField0_ & 0x00002000) == 0x00002000);
}
/**
* <code>optional bool small = 14;</code>
*/
public boolean getSmall() {
return small_;
}
/**
* <code>optional bool small = 14;</code>
*/
public Builder setSmall(boolean value) {
bitField0_ |= 0x00002000;
small_ = value;
onChanged();
return this;
}
/**
* <code>optional bool small = 14;</code>
*/
public Builder clearSmall() {
bitField0_ = (bitField0_ & ~0x00002000);
small_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:Scan)
}
@@ -27657,7 +27747,7 @@
"gion\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation" +
"\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(" +
"\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006resul" +
"t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\325\002\n\004" +
"t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\344\002\n\004" +
"Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribu" +
"te\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003 " +
"\001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.",
@@ -27666,45 +27756,45 @@
"\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_r" +
"esult_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024\n" +
"\014store_offset\030\014 \001(\r\022&\n\036load_column_famil" +
"ies_on_demand\030\r \001(\010\"\236\001\n\013ScanRequest\022 \n\006r" +
"egion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 " +
"\001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number" +
"_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n" +
"\rnext_call_seq\030\006 \001(\004\"y\n\014ScanResponse\022\030\n\020",
"cells_per_result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001" +
"(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022\030\n" +
"\007results\030\005 \003(\0132\007.Result\"\263\001\n\024BulkLoadHFil" +
"eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi" +
"er\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFileR" +
"equest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(" +
"\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030" +
"\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded" +
"\030\001 \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030" +
"\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_na",
"me\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Coprocessor" +
"ServiceRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
"pecifier\022%\n\004call\030\002 \002(\0132\027.CoprocessorServ" +
"iceCall\"]\n\032CoprocessorServiceResponse\022 \n" +
"\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value" +
"\030\002 \002(\0132\016.NameBytesPair\"B\n\013MultiAction\022 \n" +
"\010mutation\030\001 \001(\0132\016.MutationProto\022\021\n\003get\030\002" +
" \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005value\030\001 \001(" +
"\0132\007.Result\022!\n\texception\030\002 \001(\0132\016.NameByte" +
"sPair\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.",
"RegionSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiA" +
"ction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035" +
"\n\006result\030\001 \003(\0132\r.ActionResult2\342\002\n\rClient" +
"Service\022 \n\003Get\022\013.GetRequest\032\014.GetRespons" +
"e\022/\n\010MultiGet\022\020.MultiGetRequest\032\021.MultiG" +
"etResponse\022)\n\006Mutate\022\016.MutateRequest\032\017.M" +
"utateResponse\022#\n\004Scan\022\014.ScanRequest\032\r.Sc" +
"anResponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHF" +
"ileRequest\032\026.BulkLoadHFileResponse\022F\n\013Ex" +
"ecService\022\032.CoprocessorServiceRequest\032\033.",
"CoprocessorServiceResponse\022&\n\005Multi\022\r.Mu" +
"ltiRequest\032\016.MultiResponseBB\n*org.apache" +
".hadoop.hbase.protobuf.generatedB\014Client" +
"ProtosH\001\210\001\001\240\001\001"
"ies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\"\236\001\n\013S" +
"canRequest\022 \n\006region\030\001 \001(\0132\020.RegionSpeci" +
"fier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscanner_id\030" +
"\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_s" +
"canner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\"y\n\014S",
"canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
"\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
"\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\"\263" +
"\001\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132" +
"\020.RegionSpecifier\0225\n\013family_path\030\002 \003(\0132 " +
".BulkLoadHFileRequest.FamilyPath\022\026\n\016assi" +
"gn_seq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family" +
"\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRes" +
"ponse\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServ" +
"iceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002",
"(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
"\"d\n\031CoprocessorServiceRequest\022 \n\006region\030" +
"\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027." +
"CoprocessorServiceCall\"]\n\032CoprocessorSer" +
"viceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
"cifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"B\n" +
"\013MultiAction\022 \n\010mutation\030\001 \001(\0132\016.Mutatio" +
"nProto\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResul" +
"t\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texception\030\002" +
" \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006",
"region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action" +
"\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\r" +
"MultiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionRe" +
"sult2\342\002\n\rClientService\022 \n\003Get\022\013.GetReque" +
"st\032\014.GetResponse\022/\n\010MultiGet\022\020.MultiGetR" +
"equest\032\021.MultiGetResponse\022)\n\006Mutate\022\016.Mu" +
"tateRequest\032\017.MutateResponse\022#\n\004Scan\022\014.S" +
"canRequest\032\r.ScanResponse\022>\n\rBulkLoadHFi" +
"le\022\025.BulkLoadHFileRequest\032\026.BulkLoadHFil" +
"eResponse\022F\n\013ExecService\022\032.CoprocessorSe",
"rviceRequest\032\033.CoprocessorServiceRespons" +
"e\022&\n\005Multi\022\r.MultiRequest\032\016.MultiRespons" +
"eBB\n*org.apache.hadoop.hbase.protobuf.ge" +
"neratedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -27794,7 +27884,7 @@
internal_static_Scan_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Scan_descriptor,
new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", });
new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "Small", });
internal_static_ScanRequest_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_ScanRequest_fieldAccessorTable = new

Client.proto

@@ -236,6 +236,7 @@ message Scan {
optional uint32 store_limit = 11;
optional uint32 store_offset = 12;
optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
optional bool small = 14;
}
/**

HStore.java

@@ -855,8 +855,8 @@ public class HStore implements Store {
* @return all scanners for this store
*/
@Override
public List<KeyValueScanner> getScanners(boolean cacheBlocks,
boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
byte[] stopRow) throws IOException {
Collection<StoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
@@ -875,7 +875,7 @@
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, isGet, isCompaction, matcher);
.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);

Store.java

@@ -83,6 +83,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
* the line).
* @param cacheBlocks
* @param isGet
* @param usePread
* @param isCompaction
* @param matcher
* @param startRow
@@ -92,6 +93,7 @@
List<KeyValueScanner> getScanners(
boolean cacheBlocks,
boolean isGet,
boolean usePread,
boolean isCompaction,
ScanQueryMatcher matcher,
byte[] startRow,

StoreScanner.java

@@ -93,6 +93,9 @@ public class StoreScanner extends NonLazyKeyValueScanner
// if heap == null and lastTop != null, you need to reseek given the key below
protected KeyValue lastTop = null;
// A flag for whether to use pread for the scan
private boolean scanUsePread = false;
/** An internal constructor. */
protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
final NavigableSet<byte[]> columns, long ttl, int minVersions) {
@@ -111,6 +114,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
// for multi-row (non-"get") scans because this is not done in
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
this.scanUsePread = scan.isSmall();
// The parallel-seeking is on :
// 1) the config value is *true*
// 2) store has more than one store file
@@ -276,7 +280,8 @@
*/
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
boolean usePread = isGet || scanUsePread;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow()));
}

TestFromClientSide.java

@@ -5235,4 +5235,41 @@ public class TestFromClientSide {
table.close();
TEST_UTIL.deleteTable(TABLE);
}
@Test
public void testSmallScan() throws Exception {
// Test Initialization.
byte[] TABLE = Bytes.toBytes("testSmallScan");
HTable table = TEST_UTIL.createTable(TABLE, FAMILY);
// Insert one row per region
int insertNum = 10;
for (int i = 0; i < insertNum; i++) {
Put put = new Put(Bytes.toBytes("row" + String.format("%03d", i)));
put.add(FAMILY, QUALIFIER, VALUE);
table.put(put);
}
// normal scan
ResultScanner scanner = table.getScanner(new Scan());
int count = 0;
for (Result r : scanner) {
assertTrue(!r.isEmpty());
count++;
}
assertEquals(insertNum, count);
// small scan
Scan scan = new Scan(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
scan.setSmall(true);
scan.setCaching(2);
scanner = table.getScanner(scan);
count = 0;
for (Result r : scanner) {
assertTrue(!r.isEmpty());
count++;
}
assertEquals(insertNum, count);
}
}