HBASE-9129 Scanner prefetching leaks scanners.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1511110 13f79535-47bb-0310-9956-ffa450edef68
commit d20edb96dc
parent 2581ebb0b2
@@ -149,7 +149,7 @@ public class ClientScanner extends AbstractClientScanner {
     this.caller = rpcFactory.<Result[]> newCaller();

     // initialize the scanner
-    nextScanner(false);
+    nextScanner(this.caching, false);
   }

   protected HConnection getConnection() {
@@ -190,9 +190,10 @@ public class ClientScanner extends AbstractClientScanner {
    * scanner at the scan.getStartRow(). We will go no further, just tidy
    * up outstanding scanners, if <code>currentRegion != null</code> and
    * <code>done</code> is true.
+   * @param nbRows
    * @param done Server-side says we're done scanning.
    */
-  private boolean nextScanner(final boolean done)
+  private boolean nextScanner(int nbRows, final boolean done)
   throws IOException {
     // Close the previous scanner if it's open
     if (this.callable != null) {
@@ -231,7 +232,7 @@ public class ClientScanner extends AbstractClientScanner {
           Bytes.toStringBinary(localStartKey) + "'");
     }
     try {
-      callable = getScannerCallable(localStartKey);
+      callable = getScannerCallable(localStartKey, nbRows);
       // Open a scanner on the region server starting at the
       // beginning of the region
       this.caller.callWithRetries(callable);
@@ -246,11 +247,12 @@ public class ClientScanner extends AbstractClientScanner {
     return true;
   }

-  protected ScannerCallable getScannerCallable(byte [] localStartKey) {
+  protected ScannerCallable getScannerCallable(byte [] localStartKey,
+      int nbRows) {
     scan.setStartRow(localStartKey);
     ScannerCallable s = new ScannerCallable(getConnection(),
       getTableName(), scan, this.scanMetrics);
-    s.setCaching(this.caching);
+    s.setCaching(nbRows);
     return s;
   }

@@ -284,13 +286,23 @@ public class ClientScanner extends AbstractClientScanner {
     Result [] values = null;
     long remainingResultSize = maxScannerResultSize;
     int countdown = this.caching;
-
+    // We need to reset it if it's a new callable that was created
+    // with a countdown in nextScanner
+    callable.setCaching(this.caching);
     // This flag is set when we want to skip the result returned. We do
     // this when we reset scanner because it split under us.
     boolean skipFirst = false;
     boolean retryAfterOutOfOrderException = true;
     do {
       try {
+        if (skipFirst) {
+          // Skip only the first row (which was the last row of the last
+          // already-processed batch).
+          callable.setCaching(1);
+          values = this.caller.callWithRetries(callable);
+          callable.setCaching(this.caching);
+          skipFirst = false;
+        }
         // Server returns a null values if scanning is to stop. Else,
         // returns an empty array if scanning is to go on and we've just
         // exhausted current region.
@@ -360,15 +372,7 @@ public class ClientScanner extends AbstractClientScanner {
         }
         lastNext = currentTime;
         if (values != null && values.length > 0) {
-          int i = 0;
-          if (skipFirst) {
-            skipFirst = false;
-            // We will cache one row less, which is fine
-            countdown--;
-            i = 1;
-          }
-          for (; i < values.length; i++) {
-            Result rs = values[i];
+          for (Result rs : values) {
             cache.add(rs);
             for (KeyValue kv : rs.raw()) {
               remainingResultSize -= kv.heapSize();
@@ -378,7 +382,7 @@ public class ClientScanner extends AbstractClientScanner {
         }
       }
       // Values == null means server-side filter has determined we must STOP
-    } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null));
+    } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
   }

   if (cache.size() > 0) {
@@ -431,12 +435,4 @@ public class ClientScanner extends AbstractClientScanner {
     }
     closed = true;
   }
-
-  long currentScannerId() {
-    return (callable == null) ? -1L : callable.scannerId;
-  }
-
-  HRegionInfo currentRegionInfo() {
-    return currentRegion;
-  }
 }
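For context on the flow this restores, here is a minimal, self-contained usage sketch (the table name, row key, and caching value are made up for illustration, not taken from the patch): with prefetching reverted, every batch of rows reaches the client through an explicit scanner RPC sized by Scan.setCaching(), and closing the scanner is what releases the server-side scanner and its lease — the resource the prefetcher was leaking.

```java
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical client code; "t1" and "row0" are illustrative only.
public class ScanCachingExample {
  public static void main(String[] args) throws Exception {
    HTable table = new HTable(HBaseConfiguration.create(), "t1");
    Scan scan = new Scan(Bytes.toBytes("row0"));
    scan.setCaching(100);          // rows pulled per scanner.next() RPC
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // process r
      }
    } finally {
      scanner.close();             // frees the server-side scanner and its lease
      table.close();
    }
  }
}
```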
@@ -116,8 +116,6 @@ public class Scan extends OperationWithAttributes {
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
   private Boolean loadColumnFamiliesOnDemand = null;

-  private boolean prefetching = true;
-
   /**
    * Create a Scan operation across all rows.
    */
@@ -170,7 +168,6 @@ public class Scan extends OperationWithAttributes {
     getScan = scan.isGetScan();
     filter = scan.getFilter(); // clone?
     loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
-    prefetching = scan.getPrefetching();
     TimeRange ctr = scan.getTimeRange();
     tr = new TimeRange(ctr.getMin(), ctr.getMax());
     Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -204,7 +201,6 @@ public class Scan extends OperationWithAttributes {
     this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
-    this.prefetching = false;
     this.getScan = true;
   }

@@ -368,21 +364,6 @@ public class Scan extends OperationWithAttributes {
   }

-  /**
-   * Set if pre-fetching is enabled. If enabled, the region
-   * server will try to read the next scan result ahead of time. This
-   * improves scan performance if we are doing large scans.
-   *
-   * @param enablePrefetching if pre-fetching is enabled or not
-   */
-  public void setPrefetching(boolean enablePrefetching) {
-    this.prefetching = enablePrefetching;
-  }
-
-  public boolean getPrefetching() {
-    return prefetching;
-  }
-
   /**
    * @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
    */
   public long getMaxResultSize() {
@@ -632,7 +613,6 @@ public class Scan extends OperationWithAttributes {
     map.put("maxResultSize", this.maxResultSize);
     map.put("cacheBlocks", this.cacheBlocks);
     map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
-    map.put("prefetching", this.prefetching);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());
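With setPrefetching()/getPrefetching() gone, any caller that compiled against them must drop those calls; read-ahead is tuned purely on the client with the Scan knobs that survive this revert. A short sketch (the values are illustrative, not recommendations from the patch):

```java
import org.apache.hadoop.hbase.client.Scan;

public class ScanTuning {
  public static Scan makeScan() {
    Scan scan = new Scan();
    scan.setCaching(500);                    // rows shipped per scanner.next() RPC
    scan.setBatch(10);                       // cap cells per Result for very wide rows
    scan.setMaxResultSize(2 * 1024 * 1024);  // upper bound, in bytes, per fetch
    return scan;
  }
}
```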
@@ -61,7 +61,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";

   public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
-  long scannerId = -1L;
+  private long scannerId = -1L;
   private boolean instantiated = false;
   private boolean closed = false;
   private Scan scan;
@@ -137,7 +137,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   /**
    * @see java.util.concurrent.Callable#call()
    */
-  @SuppressWarnings("deprecation")
   public Result [] call() throws IOException {
     if (closed) {
       if (scannerId != -1) {
@@ -713,9 +713,6 @@ public final class ProtobufUtil {
     if (scan.getBatch() > 0) {
       scanBuilder.setBatchSize(scan.getBatch());
     }
-    if (scan.getCaching() > 0) {
-      scanBuilder.setCachingCount(scan.getCaching());
-    }
     if (scan.getMaxResultSize() > 0) {
       scanBuilder.setMaxResultSize(scan.getMaxResultSize());
     }
@@ -723,7 +720,6 @@ public final class ProtobufUtil {
     if (loadColumnFamiliesOnDemand != null) {
       scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
     }
-    scanBuilder.setPrefetching(scan.getPrefetching());
     scanBuilder.setMaxVersions(scan.getMaxVersions());
     TimeRange timeRange = scan.getTimeRange();
     if (!timeRange.isAllTime()) {
@@ -801,9 +797,6 @@ public final class ProtobufUtil {
     if (proto.hasMaxVersions()) {
       scan.setMaxVersions(proto.getMaxVersions());
     }
-    if (proto.hasPrefetching()) {
-      scan.setPrefetching(proto.getPrefetching());
-    }
     if (proto.hasStoreLimit()) {
       scan.setMaxResultsPerColumnFamily(proto.getStoreLimit());
     }
@@ -832,9 +825,6 @@ public final class ProtobufUtil {
     if (proto.hasBatchSize()) {
       scan.setBatch(proto.getBatchSize());
     }
-    if (proto.hasCachingCount()) {
-      scan.setCaching(proto.getCachingCount());
-    }
     if (proto.hasMaxResultSize()) {
       scan.setMaxResultSize(proto.getMaxResultSize());
     }
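A consequence of these ProtobufUtil changes: a Scan round-tripped through its protobuf form no longer carries caching at all in the Scan message (the per-call row count travels in the ScanRequest's number_of_rows instead). A hedged sketch using the same public toScan() overloads this diff touches:

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

public class ScanRoundTrip {
  public static Scan roundTrip(Scan scan) throws java.io.IOException {
    // No setPrefetching()/setCachingCount() calls remain in the conversion.
    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    return ProtobufUtil.toScan(proto);
  }
}
```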
@@ -22,9 +22,6 @@ import java.io.InterruptedIOException;
 import java.io.PrintWriter;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -186,42 +183,8 @@ public class Threads {
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }

-  /**
-   * Creates a ThreadPoolExecutor which has a bound on the number of tasks that can be
-   * submitted to it, determined by the blockingLimit parameter. Excess tasks
-   * submitted will block on the calling thread till space frees up.
-   *
-   * @param blockingLimit max number of tasks that can be submitted
-   * @param timeout time value after which unused threads are killed
-   * @param unit time unit for killing unused threads
-   * @param threadFactory thread factory to use to spawn threads
-   * @return the ThreadPoolExecutor
-   */
-  public static ThreadPoolExecutor getBlockingThreadPool(
-      int blockingLimit, long timeout, TimeUnit unit,
-      ThreadFactory threadFactory) {
-    ThreadPoolExecutor blockingThreadPool =
-      new ThreadPoolExecutor(
-        1, blockingLimit, timeout, TimeUnit.SECONDS,
-        new SynchronousQueue<Runnable>(),
-        threadFactory,
-        new RejectedExecutionHandler() {
-          @Override
-          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-            try {
-              // The submitting thread will block until the thread pool frees up.
-              executor.getQueue().put(r);
-            } catch (InterruptedException e) {
-              throw new RejectedExecutionException(
-                "Failed to requeue the rejected request because of ", e);
-            }
-          }
-        });
-    blockingThreadPool.allowCoreThreadTimeOut(true);
-    return blockingThreadPool;
-  }
-
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.
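The removed getBlockingThreadPool() implemented a general caller-blocking pattern worth noting: a SynchronousQueue has zero capacity, so once all workers are busy, the RejectedExecutionHandler parks the submitting thread in queue.put() until a worker polls. A standalone sketch of the same pattern (pool sizes and task bodies are illustrative only):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingPoolSketch {
  public static void main(String[] args) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 4, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),   // no buffering: direct hand-off to a worker
        Executors.defaultThreadFactory(),
        new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
              executor.getQueue().put(r);   // block the caller instead of throwing
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RejectedExecutionException("Interrupted while waiting", e);
            }
          }
        });
    for (int i = 0; i < 16; i++) {
      final int n = i;
      pool.execute(new Runnable() {
        @Override
        public void run() { System.out.println("task " + n); }
      });
    }
    pool.shutdown();
  }
}
```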
@@ -10625,14 +10625,6 @@ public final class ClientProtos {
     // optional bool load_column_families_on_demand = 13;
     boolean hasLoadColumnFamiliesOnDemand();
     boolean getLoadColumnFamiliesOnDemand();
-
-    // optional uint32 caching_count = 14;
-    boolean hasCachingCount();
-    int getCachingCount();
-
-    // optional bool prefetching = 15;
-    boolean hasPrefetching();
-    boolean getPrefetching();
   }
   public static final class Scan extends
       com.google.protobuf.GeneratedMessage
@@ -10821,26 +10813,6 @@ public final class ClientProtos {
       return loadColumnFamiliesOnDemand_;
     }

-    // optional uint32 caching_count = 14;
-    public static final int CACHING_COUNT_FIELD_NUMBER = 14;
-    private int cachingCount_;
-    public boolean hasCachingCount() {
-      return ((bitField0_ & 0x00000800) == 0x00000800);
-    }
-    public int getCachingCount() {
-      return cachingCount_;
-    }
-
-    // optional bool prefetching = 15;
-    public static final int PREFETCHING_FIELD_NUMBER = 15;
-    private boolean prefetching_;
-    public boolean hasPrefetching() {
-      return ((bitField0_ & 0x00001000) == 0x00001000);
-    }
-    public boolean getPrefetching() {
-      return prefetching_;
-    }
-
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -10855,8 +10827,6 @@ public final class ClientProtos {
       storeLimit_ = 0;
       storeOffset_ = 0;
       loadColumnFamiliesOnDemand_ = false;
-      cachingCount_ = 0;
-      prefetching_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10927,12 +10897,6 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
        output.writeBool(13, loadColumnFamiliesOnDemand_);
       }
-      if (((bitField0_ & 0x00000800) == 0x00000800)) {
-        output.writeUInt32(14, cachingCount_);
-      }
-      if (((bitField0_ & 0x00001000) == 0x00001000)) {
-        output.writeBool(15, prefetching_);
-      }
       getUnknownFields().writeTo(output);
     }

@@ -10994,14 +10958,6 @@ public final class ClientProtos {
        size += com.google.protobuf.CodedOutputStream
          .computeBoolSize(13, loadColumnFamiliesOnDemand_);
       }
-      if (((bitField0_ & 0x00000800) == 0x00000800)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeUInt32Size(14, cachingCount_);
-      }
-      if (((bitField0_ & 0x00001000) == 0x00001000)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBoolSize(15, prefetching_);
-      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -11084,16 +11040,6 @@ public final class ClientProtos {
         result = result && (getLoadColumnFamiliesOnDemand()
             == other.getLoadColumnFamiliesOnDemand());
       }
-      result = result && (hasCachingCount() == other.hasCachingCount());
-      if (hasCachingCount()) {
-        result = result && (getCachingCount()
-            == other.getCachingCount());
-      }
-      result = result && (hasPrefetching() == other.hasPrefetching());
-      if (hasPrefetching()) {
-        result = result && (getPrefetching()
-            == other.getPrefetching());
-      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -11155,14 +11101,6 @@ public final class ClientProtos {
         hash = (37 * hash) + LOAD_COLUMN_FAMILIES_ON_DEMAND_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
       }
-      if (hasCachingCount()) {
-        hash = (37 * hash) + CACHING_COUNT_FIELD_NUMBER;
-        hash = (53 * hash) + getCachingCount();
-      }
-      if (hasPrefetching()) {
-        hash = (37 * hash) + PREFETCHING_FIELD_NUMBER;
-        hash = (53 * hash) + hashBoolean(getPrefetching());
-      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -11325,10 +11263,6 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000800);
         loadColumnFamiliesOnDemand_ = false;
         bitField0_ = (bitField0_ & ~0x00001000);
-        cachingCount_ = 0;
-        bitField0_ = (bitField0_ & ~0x00002000);
-        prefetching_ = false;
-        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }

@@ -11437,14 +11371,6 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000400;
         }
         result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
-        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
-          to_bitField0_ |= 0x00000800;
-        }
-        result.cachingCount_ = cachingCount_;
-        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
-          to_bitField0_ |= 0x00001000;
-        }
-        result.prefetching_ = prefetching_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -11546,12 +11472,6 @@ public final class ClientProtos {
         if (other.hasLoadColumnFamiliesOnDemand()) {
           setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
         }
-        if (other.hasCachingCount()) {
-          setCachingCount(other.getCachingCount());
-        }
-        if (other.hasPrefetching()) {
-          setPrefetching(other.getPrefetching());
-        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11676,16 +11596,6 @@ public final class ClientProtos {
               loadColumnFamiliesOnDemand_ = input.readBool();
               break;
             }
-            case 112: {
-              bitField0_ |= 0x00002000;
-              cachingCount_ = input.readUInt32();
-              break;
-            }
-            case 120: {
-              bitField0_ |= 0x00004000;
-              prefetching_ = input.readBool();
-              break;
-            }
           }
         }
       }
@@ -12439,48 +12349,6 @@ public final class ClientProtos {
         return this;
       }

-      // optional uint32 caching_count = 14;
-      private int cachingCount_ ;
-      public boolean hasCachingCount() {
-        return ((bitField0_ & 0x00002000) == 0x00002000);
-      }
-      public int getCachingCount() {
-        return cachingCount_;
-      }
-      public Builder setCachingCount(int value) {
-        bitField0_ |= 0x00002000;
-        cachingCount_ = value;
-        onChanged();
-        return this;
-      }
-      public Builder clearCachingCount() {
-        bitField0_ = (bitField0_ & ~0x00002000);
-        cachingCount_ = 0;
-        onChanged();
-        return this;
-      }
-
-      // optional bool prefetching = 15;
-      private boolean prefetching_ ;
-      public boolean hasPrefetching() {
-        return ((bitField0_ & 0x00004000) == 0x00004000);
-      }
-      public boolean getPrefetching() {
-        return prefetching_;
-      }
-      public Builder setPrefetching(boolean value) {
-        bitField0_ |= 0x00004000;
-        prefetching_ = value;
-        onChanged();
-        return this;
-      }
-      public Builder clearPrefetching() {
-        bitField0_ = (bitField0_ & ~0x00004000);
-        prefetching_ = false;
-        onChanged();
-        return this;
-      }
-
       // @@protoc_insertion_point(builder_scope:Scan)
     }

@@ -21603,7 +21471,7 @@ public final class ClientProtos {
       "gion\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation" +
       "\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(" +
       "\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006resul" +
-      "t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\201\003\n\004" +
+      "t\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\325\002\n\004" +
       "Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattribu" +
       "te\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003 " +
       "\001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007.",
@@ -21612,47 +21480,46 @@ public final class ClientProtos {
       "\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_r" +
       "esult_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024\n" +
       "\014store_offset\030\014 \001(\r\022&\n\036load_column_famil" +
-      "ies_on_demand\030\r \001(\010\022\025\n\rcaching_count\030\016 \001" +
-      "(\r\022\023\n\013prefetching\030\017 \001(\010\"\236\001\n\013ScanRequest\022" +
-      " \n\006region\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004sca" +
-      "n\030\002 \001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016nu" +
-      "mber_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(",
-      "\010\022\025\n\rnext_call_seq\030\006 \001(\004\"p\n\014ScanResponse" +
-      "\022)\n\020result_cell_meta\030\001 \001(\0132\017.ResultCellM" +
-      "eta\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030" +
-      "\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"&\n\016ResultCellMeta\022\024\n\014" +
-      "cells_length\030\001 \003(\r\"\263\001\n\024BulkLoadHFileRequ" +
-      "est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225\n" +
-      "\013family_path\030\002 \003(\0132 .BulkLoadHFileReques" +
-      "t.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\n" +
-      "FamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t" +
-      "\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(",
-      "\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014" +
-      "\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003 " +
-      "\002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServi" +
-      "ceRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" +
-      "ier\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCa" +
-      "ll\"]\n\032CoprocessorServiceResponse\022 \n\006regi" +
-      "on\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(" +
-      "\0132\016.NameBytesPair\"B\n\013MultiAction\022 \n\010muta" +
-      "tion\030\001 \001(\0132\016.MutationProto\022\021\n\003get\030\002 \001(\0132" +
-      "\004.Get\"I\n\014ActionResult\022\026\n\005value\030\001 \001(\0132\007.R",
-      "esult\022!\n\texception\030\002 \001(\0132\016.NameBytesPair" +
-      "\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.Regio" +
-      "nSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction" +
-      "\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006res" +
-      "ult\030\001 \003(\0132\r.ActionResult2\342\002\n\rClientServi" +
-      "ce\022 \n\003Get\022\013.GetRequest\032\014.GetResponse\022/\n\010" +
-      "MultiGet\022\020.MultiGetRequest\032\021.MultiGetRes" +
-      "ponse\022)\n\006Mutate\022\016.MutateRequest\032\017.Mutate" +
-      "Response\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRes" +
-      "ponse\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileRe",
-      "quest\032\026.BulkLoadHFileResponse\022F\n\013ExecSer" +
-      "vice\022\032.CoprocessorServiceRequest\032\033.Copro" +
-      "cessorServiceResponse\022&\n\005Multi\022\r.MultiRe" +
-      "quest\032\016.MultiResponseBB\n*org.apache.hado" +
-      "op.hbase.protobuf.generatedB\014ClientProto" +
-      "sH\001\210\001\001\240\001\001"
+      "ies_on_demand\030\r \001(\010\"\236\001\n\013ScanRequest\022 \n\006r" +
+      "egion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 " +
+      "\001(\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number" +
+      "_of_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n" +
+      "\rnext_call_seq\030\006 \001(\004\"p\n\014ScanResponse\022)\n\020",
+      "result_cell_meta\030\001 \001(\0132\017.ResultCellMeta\022" +
+      "\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(" +
+      "\010\022\013\n\003ttl\030\004 \001(\r\"&\n\016ResultCellMeta\022\024\n\014cell" +
+      "s_length\030\001 \003(\r\"\263\001\n\024BulkLoadHFileRequest\022" +
+      " \n\006region\030\001 \002(\0132\020.RegionSpecifier\0225\n\013fam" +
+      "ily_path\030\002 \003(\0132 .BulkLoadHFileRequest.Fa" +
+      "milyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFami" +
+      "lyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025" +
+      "BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n" +
+      "\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014",
+      "service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022" +
+      "\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServiceRe" +
+      "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
+      "%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCall\"]" +
+      "\n\032CoprocessorServiceResponse\022 \n\006region\030\001" +
+      " \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016." +
+      "NameBytesPair\"B\n\013MultiAction\022 \n\010mutation" +
+      "\030\001 \001(\0132\016.MutationProto\022\021\n\003get\030\002 \001(\0132\004.Ge" +
+      "t\"I\n\014ActionResult\022\026\n\005value\030\001 \001(\0132\007.Resul" +
+      "t\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"^\n\014",
+      "MultiRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
+      "cifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction\022\016\n\006" +
+      "atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006result\030" +
+      "\001 \003(\0132\r.ActionResult2\342\002\n\rClientService\022 " +
+      "\n\003Get\022\013.GetRequest\032\014.GetResponse\022/\n\010Mult" +
+      "iGet\022\020.MultiGetRequest\032\021.MultiGetRespons" +
+      "e\022)\n\006Mutate\022\016.MutateRequest\032\017.MutateResp" +
+      "onse\022#\n\004Scan\022\014.ScanRequest\032\r.ScanRespons" +
+      "e\022>\n\rBulkLoadHFile\022\025.BulkLoadHFileReques" +
+      "t\032\026.BulkLoadHFileResponse\022F\n\013ExecService",
+      "\022\032.CoprocessorServiceRequest\032\033.Coprocess" +
+      "orServiceResponse\022&\n\005Multi\022\r.MultiReques" +
+      "t\032\016.MultiResponseBB\n*org.apache.hadoop.h" +
+      "base.protobuf.generatedB\014ClientProtosH\001\210" +
+      "\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -21768,7 +21635,7 @@ public final class ClientProtos {
     internal_static_Scan_fieldAccessorTable = new
       com.google.protobuf.GeneratedMessage.FieldAccessorTable(
         internal_static_Scan_descriptor,
-        new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "CachingCount", "Prefetching", },
+        new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
         org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
         org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
     internal_static_ScanRequest_descriptor =
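Why the generated code above changes in so many places: protobuf 2.x codegen tracks field presence with one bit per field in bitField0_, assigned in declaration order, and the message class and the Builder number their bits independently (the Builder also reserves bits for repeated fields). That is why caching_count appears as 0x00000800 in the Scan class but 0x00002000 in Scan.Builder, and why dropping fields 14 and 15 deletes every check that used those bits. An illustrative hand-written sketch of the mechanism (not generated code):

```java
public final class HasBitsSketch {
  private int bitField0_;
  private static final int FIELD_A_BIT = 0x00000001; // first optional field
  private static final int FIELD_B_BIT = 0x00000002; // second optional field

  public boolean hasFieldA() {
    return (bitField0_ & FIELD_A_BIT) == FIELD_A_BIT;
  }

  public void setFieldA() {
    bitField0_ |= FIELD_A_BIT;
  }

  public boolean hasFieldB() {
    return (bitField0_ & FIELD_B_BIT) == FIELD_B_BIT;
  }
}
```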
@@ -236,8 +236,6 @@ message Scan {
   optional uint32 store_limit = 11;
   optional uint32 store_offset = 12;
   optional bool load_column_families_on_demand = 13; /* DO NOT add defaults to load_column_families_on_demand. */
-  optional uint32 caching_count = 14;
-  optional bool prefetching = 15;
 }

 /**
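Because caching_count and prefetching were optional fields, dropping them from the .proto is wire-compatible under standard protobuf semantics: a Scan serialized by a client that still sets tags 14 or 15 parses fine, with the unrecognized tags landing in the unknown-field set. A hedged sketch of that behavior:

```java
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

public class ScanProtoCompat {
  public static ClientProtos.Scan parse(byte[] wire) throws InvalidProtocolBufferException {
    ClientProtos.Scan scan = ClientProtos.Scan.parseFrom(wire);
    // Tags 14/15 from an older client are retained here and otherwise ignored:
    int unknownBytes = scan.getUnknownFields().getSerializedSize();
    return scan;
  }
}
```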
@@ -183,7 +183,6 @@ import com.google.protobuf.Service;
  * defines the keyspace for this HRegion.
  */
 @InterfaceAudience.Private
-@SuppressWarnings("deprecation")
 public class HRegion implements HeapSize { // , Writable{
   public static final Log LOG = LogFactory.getLog(HRegion.class);

@@ -3543,6 +3542,7 @@ public class HRegion implements HeapSize { // , Writable{
     return returnResult;
   }

+
   private void populateFromJoinedHeap(List<KeyValue> results, int limit)
       throws IOException {
     assert joinedContinuationRow != null;
@@ -45,8 +45,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 import javax.management.ObjectName;
@@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
@@ -71,6 +68,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
@@ -162,8 +160,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
@@ -477,11 +475,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   // Table level lock manager for locking for region operations
   private TableLockManager tableLockManager;

-  /**
-   * Threadpool for doing scanner prefetches
-   */
-  protected ThreadPoolExecutor scanPrefetchThreadPool;
-
   /**
    * Starts a HRegionServer at the default location
    *
@@ -632,18 +625,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   }

   RegionScanner getScanner(long scannerId) {
-    RegionScannerHolder scannerHolder = getScannerHolder(scannerId);
+    String scannerIdString = Long.toString(scannerId);
+    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
     if (scannerHolder != null) {
-      return scannerHolder.scanner;
+      return scannerHolder.s;
     }
     return null;
   }

-  public RegionScannerHolder getScannerHolder(long scannerId) {
-    String scannerIdString = Long.toString(scannerId);
-    return scanners.get(scannerIdString);
-  }
-
   /**
    * All initialization needed before we go register with Master.
    *
@@ -860,11 +849,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     if (this.thriftServer != null) this.thriftServer.shutdown();
     this.leases.closeAfterLeasesExpire();
     this.rpcServer.stop();
-
-    if (scanPrefetchThreadPool != null) {
-      // shutdown the prefetch threads
-      scanPrefetchThreadPool.shutdownNow();
-    }
     if (this.splitLogWorker != null) {
       splitLogWorker.stop();
     }
@@ -1134,7 +1118,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     // exception next time they come in.
     for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
       try {
-        e.getValue().closeScanner();
+        e.getValue().s.close();
       } catch (IOException ioe) {
         LOG.warn("Closing scanner " + e.getKey(), ioe);
       }
@@ -1559,14 +1543,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       this.replicationSinkHandler.startReplicationService();
     }

-    // start the scanner prefetch threadpool
-    int numHandlers = conf.getInt("hbase.regionserver.prefetcher.threads.max",
-      conf.getInt("hbase.regionserver.handler.count", 10)
-        + conf.getInt("hbase.regionserver.metahandler.count", 10));
-    scanPrefetchThreadPool =
-      Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
-        new DaemonThreadFactory(RegionScannerHolder.PREFETCHER_THREAD_PREFIX));
-
     // Start Server. This service is like leases in that it internally runs
     // a thread.
     this.rpcServer.start();
@@ -2372,7 +2348,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     public void leaseExpired() {
       RegionScannerHolder rsh = scanners.remove(this.scannerName);
       if (rsh != null) {
-        RegionScanner s = rsh.scanner;
+        RegionScanner s = rsh.s;
         LOG.info("Scanner " + this.scannerName + " lease expired on region "
           + s.getRegionInfo().getRegionNameAsString());
         try {
@@ -2381,7 +2357,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
           region.getCoprocessorHost().preScannerClose(s);
         }

-        rsh.closeScanner();
+        s.close();
         if (region != null && region.getCoprocessorHost() != null) {
           region.getCoprocessorHost().postScannerClose(s);
         }
@@ -2686,22 +2662,20 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     return this.fsOk;
   }

-  protected RegionScannerHolder addScanner(
-      RegionScanner s, HRegion r) throws LeaseStillHeldException {
-    RegionScannerHolder holder = new RegionScannerHolder(this, s, r);
-    String scannerName = null;
+  protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+    long scannerId = -1;
     while (true) {
-      scannerId = nextLong();
-      scannerName = String.valueOf(scannerId);
-      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, holder);
+      scannerId = rand.nextLong();
+      if (scannerId == -1) continue;
+      String scannerName = String.valueOf(scannerId);
+      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
       if (existing == null) {
-        holder.scannerName = scannerName;
         this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
-          new ScannerListener(scannerName));
-        return holder;
+            new ScannerListener(scannerName));
+        break;
       }
     }
+    return scannerId;
   }

  /**
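The restored addScanner() is a classic lock-free registration scheme: pick a random id, reject the -1 sentinel, claim the id with putIfAbsent, and retry on collision. A generic, self-contained sketch of the same idea (ScannerRegistrySketch and its type parameter are stand-ins, not HBase classes):

```java
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ScannerRegistrySketch<S> {
  private final ConcurrentMap<String, S> scanners = new ConcurrentHashMap<String, S>();
  private final Random rand = new Random();

  public long register(S scanner) {
    while (true) {
      long scannerId = rand.nextLong();
      if (scannerId == -1) continue;                  // -1 means "no scanner" to clients
      String name = String.valueOf(scannerId);
      if (scanners.putIfAbsent(name, scanner) == null) {
        return scannerId;                             // id uniquely claimed
      }
      // collision with a live scanner: try another id
    }
  }
}
```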
@@ -2963,6 +2937,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   @Override
   public ScanResponse scan(final RpcController controller,
       final ScanRequest request) throws ServiceException {
+    Leases.Lease lease = null;
     String scannerName = null;
     try {
       if (!request.hasScannerId() && !request.hasScan()) {
@@ -3013,10 +2988,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
           throw new UnknownScannerException(
             "Name: " + scannerName + ", already closed?");
         }
-        scanner = rsh.scanner;
-        // Use the region found in the online region list,
-        // not that one in the RegionScannerHolder. So that we can
-        // make sure the region is still open in this region server.
+        scanner = rsh.s;
         region = getRegion(scanner.getRegionInfo().getRegionName());
       } else {
         region = getRegion(request.getRegion());
@@ -3027,6 +2999,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         if (!isLoadingCfsOnDemandSet) {
           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
         }
+        byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
         region.prepareScanner(scan);
         if (region.getCoprocessorHost() != null) {
           scanner = region.getCoprocessorHost().preScannerOpen(scan);
@@ -3037,14 +3010,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         if (region.getCoprocessorHost() != null) {
           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
         }
-        rsh = addScanner(scanner, region);
-        scannerName = rsh.scannerName;
-        scannerId = Long.parseLong(scannerName);
-
+        scannerId = addScanner(scanner);
+        scannerName = String.valueOf(scannerId);
         ttl = this.scannerLeaseTimeoutPeriod;
-        if (scan.getPrefetching()) {
-          rsh.enablePrefetching(scan.getCaching());
-        }
       }

       if (rows > 0) {
@@ -3052,34 +3020,110 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         // performed even before checking of Lease.
         // See HBASE-5974
         if (request.hasNextCallSeq()) {
-          if (request.getNextCallSeq() != rsh.nextCallSeq) {
-            throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
-              + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
-              "; request=" + TextFormat.shortDebugString(request));
+          if (rsh == null) {
+            rsh = scanners.get(scannerName);
           }
-          // Increment the nextCallSeq value which is the next expected from client.
-          rsh.nextCallSeq++;
-        }
-
-        ttl = this.scannerLeaseTimeoutPeriod;
-        ScanResult result = rsh.getScanResult(rows);
-        if (result.isException) {
-          throw result.ioException;
-        }
-
-        moreResults = result.moreResults;
-        if (result.results != null) {
-          List<CellScannable> cellScannables =
-            new ArrayList<CellScannable>(result.results.size());
-          ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
-          for (Result res : result.results) {
-            cellScannables.add(res);
-            rcmBuilder.addCellsLength(res.size());
+          if (rsh != null) {
+            if (request.getNextCallSeq() != rsh.nextCallSeq) {
+              throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+                "; request=" + TextFormat.shortDebugString(request));
+            }
+            // Increment the nextCallSeq value which is the next expected from client.
+            rsh.nextCallSeq++;
           }
+        }
+        try {
+          // Remove lease while its being processed in server; protects against case
+          // where processing of request takes > lease expiration time.
+          lease = leases.removeLease(scannerName);
+          List<Result> results = new ArrayList<Result>(rows);
+          long currentScanResultSize = 0;
+
+          boolean done = false;
+          // Call coprocessor. Get region info from scanner.
+          if (region != null && region.getCoprocessorHost() != null) {
+            Boolean bypass = region.getCoprocessorHost().preScannerNext(
+              scanner, results, rows);
+            if (!results.isEmpty()) {
+              for (Result r : results) {
+                if (maxScannerResultSize < Long.MAX_VALUE){
+                  for (KeyValue kv : r.raw()) {
+                    currentScanResultSize += kv.heapSize();
+                  }
+                }
+              }
+            }
+            if (bypass != null && bypass.booleanValue()) {
+              done = true;
+            }
+          }
+
+          if (!done) {
+            long maxResultSize = scanner.getMaxResultSize();
+            if (maxResultSize <= 0) {
+              maxResultSize = maxScannerResultSize;
+            }
+            List<KeyValue> values = new ArrayList<KeyValue>();
+            MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+            region.startRegionOperation(Operation.SCAN);
+            try {
+              int i = 0;
+              synchronized(scanner) {
+                for (; i < rows
+                    && currentScanResultSize < maxResultSize; i++) {
+                  // Collect values to be returned here
+                  boolean moreRows = scanner.nextRaw(values);
+                  if (!values.isEmpty()) {
+                    if (maxScannerResultSize < Long.MAX_VALUE){
+                      for (KeyValue kv : values) {
+                        currentScanResultSize += kv.heapSize();
+                      }
+                    }
+                    results.add(new Result(values));
+                  }
+                  if (!moreRows) {
+                    break;
+                  }
+                  values.clear();
+                }
+              }
+              region.readRequestsCount.add(i);
+            } finally {
+              region.closeRegionOperation();
+            }
+
+            // coprocessor postNext hook
+            if (region != null && region.getCoprocessorHost() != null) {
+              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+            }
+          }
+
+          // If the scanner's filter - if any - is done with the scan
+          // and wants to tell the client to stop the scan. This is done by passing
+          // a null result, and setting moreResults to false.
+          if (scanner.isFilterDone() && results.isEmpty()) {
+            moreResults = false;
+            results = null;
+          } else {
+            ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
+            List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
+            for (Result res : results) {
+              cellScannables.add(res);
+              rcmBuilder.addCellsLength(res.size());
+            }
+            builder.setResultCellMeta(rcmBuilder.build());
+            // TODO is this okey to assume the type and cast
+            ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
+              .createCellScanner(cellScannables));
+          }
+        } finally {
+          // We're done. On way out re-add the above removed lease.
+          // Adding resets expiration time on lease.
+          if (scanners.containsKey(scannerName)) {
+            if (lease != null) leases.addLease(lease);
+            ttl = this.scannerLeaseTimeoutPeriod;
+          }
-          builder.setResultCellMeta(rcmBuilder.build());
-          // TODO is this okey to assume the type and cast
-          ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
-            .createCellScanner(cellScannables));
         }
       }
@@ -3093,13 +3137,9 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       }
       rsh = scanners.remove(scannerName);
       if (rsh != null) {
-        rsh.closeScanner();
-        try {
-          leases.cancelLease(scannerName);
-        } catch (LeaseException le) {
-          // That's ok, since the lease may be gone with
-          // the prefetcher when cancelled.
-        }
+        scanner = rsh.s;
+        scanner.close();
+        leases.cancelLease(scannerName);
         if (region != null && region.getCoprocessorHost() != null) {
           region.getCoprocessorHost().postScannerClose(scanner);
         }
@@ -4182,6 +4222,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }

+  /**
+   * Holder class which holds the RegionScanner and nextCallSeq together.
+   */
+  private static class RegionScannerHolder {
+    private RegionScanner s;
+    private long nextCallSeq = 0L;
+
+    public RegionScannerHolder(RegionScanner s) {
+      this.s = s;
+    }
+  }
+
   private boolean isHealthCheckerConfigured() {
     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
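The restored scan() also brings back the lease "dance": the lease is removed for the duration of the server-side work so a slow next() cannot expire mid-call, then re-added (which resets its clock) only if the scanner is still registered. A generic sketch of that structure (LeaseManager is a stand-in interface, not the HBase Leases API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LeaseDanceSketch<L> {
  public interface LeaseManager<T> {
    T remove(String name);
    void add(T lease);
  }

  private final ConcurrentMap<String, Object> scanners =
      new ConcurrentHashMap<String, Object>();

  public void nextWithLeasePulled(String scannerName, LeaseManager<L> leases, Runnable work) {
    L lease = leases.remove(scannerName);   // nobody can expire the scanner while we work
    try {
      work.run();                           // the scanner.nextRaw(...) loop in the real code
    } finally {
      if (scanners.containsKey(scannerName)) {
        leases.add(lease);                  // re-adding resets the expiration clock
      }
    }
  }
}
```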
@@ -17,41 +17,31 @@
 package org.apache.hadoop.hbase.client;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.regionserver.RegionScannerHolder;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;

 /**
  * A client-side test, mostly testing scanners with various parameters.
  */
 @Category(MediumTests.class)
-@RunWith(Parameterized.class)
 public class TestScannersFromClientSide {
   private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class);

@@ -61,37 +51,6 @@ public class TestScannersFromClientSide {
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");

-  private final boolean prefetching;
-  private long maxSize;
-
-  @Parameters
-  public static final Collection<Object[]> parameters() {
-    List<Object[]> prefetchings = new ArrayList<Object[]>();
-    prefetchings.add(new Object[] {Long.valueOf(-1)});
-    prefetchings.add(new Object[] {Long.valueOf(0)});
-    prefetchings.add(new Object[] {Long.valueOf(1)});
-    prefetchings.add(new Object[] {Long.valueOf(1024)});
-    return prefetchings;
-  }
-
-  public TestScannersFromClientSide(Long maxPrefetchedResultSize) {
-    this.maxSize = maxPrefetchedResultSize.longValue();
-    if (this.maxSize < 0) {
-      this.prefetching = false;
-    } else {
-      this.prefetching = true;
-      if (this.maxSize == 0) {
-        this.maxSize = RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_DEFAULT;
-      } else {
-        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-        for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
-          Configuration conf = rst.getRegionServer().getConfiguration();
-          conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, maxSize);
-        }
-      }
-    }
-  }
-
   /**
    * @throws java.lang.Exception
    */
@@ -106,9 +65,22 @@ public class TestScannersFromClientSide {
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
-    long remainingPrefetchedSize = RegionScannerHolder.getPrefetchedResultSize();
-    assertEquals("All prefetched results should be gone",
-      0, remainingPrefetchedSize);
   }

+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
   /**
@@ -117,23 +89,8 @@ public class TestScannersFromClientSide {
    * @throws Exception
    */
   @Test
-  public void testScanBatchWithDefaultCaching() throws Exception {
-    batchedScanWithCachingSpecified(-1); // Using default caching which is 100
-  }
-
-  /**
-   * Test from client side for batch of scan
-   *
-   * @throws Exception
-   */
-  @Test
   public void testScanBatch() throws Exception {
-    batchedScanWithCachingSpecified(1);
-  }
-
-  private void batchedScanWithCachingSpecified(int caching) throws Exception {
-    byte [] TABLE = Bytes.toBytes(
-      "testScanBatch-" + prefetching + "_" + maxSize + "_" + caching);
+    byte [] TABLE = Bytes.toBytes("testScanBatch");
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);

     HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
@@ -142,7 +99,7 @@ public class TestScannersFromClientSide {
     Scan scan;
     Delete delete;
     Result result;
-    ClientScanner scanner;
+    ResultScanner scanner;
     boolean toLog = true;
     List<KeyValue> kvListExp;

@@ -167,11 +124,8 @@ public class TestScannersFromClientSide {

     // without batch
     scan = new Scan(ROW);
-    scan.setCaching(caching);
     scan.setMaxVersions();
-    scan.setPrefetching(prefetching);
-    scanner = (ClientScanner)ht.getScanner(scan);
-    verifyPrefetching(scanner);
+    scanner = ht.getScanner(scan);

     // c4:4, c5:5, c6:6, c7:7
     kvListExp = new ArrayList<KeyValue>();
@@ -181,16 +135,12 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
-    verifyPrefetching(scanner);

     // with batch
     scan = new Scan(ROW);
-    scan.setCaching(caching);
     scan.setMaxVersions();
     scan.setBatch(2);
-    scan.setPrefetching(prefetching);
-    scanner = (ClientScanner)ht.getScanner(scan);
-    verifyPrefetching(scanner);
+    scanner = ht.getScanner(scan);

     // First batch: c4:4, c5:5
     kvListExp = new ArrayList<KeyValue>();
@@ -198,7 +148,6 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
-    verifyPrefetching(scanner);

     // Second batch: c6:6, c7:7
     kvListExp = new ArrayList<KeyValue>();
@@ -206,7 +155,7 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
-    verifyPrefetching(scanner);
+
   }

   /**
@@ -216,7 +165,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testGetMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetMaxResults-" + prefetching + "_" + maxSize);
+    byte [] TABLE = Bytes.toBytes("testGetMaxResults");
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);

@@ -336,7 +285,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testScanMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testScanLimit-" + prefetching + "_" + maxSize);
+    byte [] TABLE = Bytes.toBytes("testScanLimit");
     byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -366,19 +315,17 @@ public class TestScannersFromClientSide {
     }

     scan = new Scan();
     scan.setCaching(1);
-    scan.setPrefetching(prefetching);
     scan.setMaxResultsPerColumnFamily(4);
-    ClientScanner scanner = (ClientScanner)ht.getScanner(scan);
+    ResultScanner scanner = ht.getScanner(scan);
     kvListScan = new ArrayList<KeyValue>();
     while ((result = scanner.next()) != null) {
-      verifyPrefetching(scanner);
       for (KeyValue kv : result.list()) {
         kvListScan.add(kv);
       }
     }
     result = new Result(kvListScan);
     verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");

   }

   /**
@@ -388,7 +335,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testGetRowOffset() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetRowOffset-" + prefetching + "_" + maxSize);
+    byte [] TABLE = Bytes.toBytes("testGetRowOffset");
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);

@@ -476,48 +423,6 @@ public class TestScannersFromClientSide {
         "Testing offset + multiple CFs + maxResults");
   }

-  /**
-   * For testing only, find a region scanner holder for a scan.
-   */
-  RegionScannerHolder findRegionScannerHolder(ClientScanner scanner) {
-    long scannerId = scanner.currentScannerId();
-    if (scannerId == -1L) return null;
-
-    HRegionInfo expectedRegion = scanner.currentRegionInfo();
-    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
-    for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
-      RegionScannerHolder rsh = rst.getRegionServer().getScannerHolder(scannerId);
-      if (rsh != null && rsh.getRegionInfo().equals(expectedRegion)) {
-        return rsh;
-      }
-    }
-    return null;
-  }
-
-  void verifyPrefetching(ClientScanner scanner) throws IOException {
-    long scannerId = scanner.currentScannerId();
-    if (scannerId == -1L) return; // scanner is already closed
-    RegionScannerHolder rsh = findRegionScannerHolder(scanner);
-    assertNotNull("We should be able to find the scanner", rsh);
-    boolean isPrefetchSubmitted = rsh.isPrefetchSubmitted();
-    if (prefetching && (RegionScannerHolder.getPrefetchedResultSize() < this.maxSize)) {
-      assertTrue("Prefetching should be submitted or no more result",
-        isPrefetchSubmitted || scanner.next() == null);
-    } else if (isPrefetchSubmitted) {
-      // Prefetch submitted, it must be because prefetching is enabled,
-      // and there was still room before it's scheduled
-      long sizeBefore = RegionScannerHolder.getPrefetchedResultSize()
-        - rsh.currentPrefetchedResultSize();
-      assertTrue("There should have room before prefetching is submitted, maxSize=" +
-        this.maxSize + ", prefetching=" + prefetching + ", sizeBefore=" + sizeBefore,
-        prefetching && sizeBefore < this.maxSize);
-    }
-    if (isPrefetchSubmitted && rsh.waitForPrefetchingDone()) {
-      assertTrue("Prefetched result size should not be 0",
-        rsh.currentPrefetchedResultSize() > 0);
-    }
-  }
-
   static void verifyResult(Result result, List<KeyValue> expKvList, boolean toLog,
       String msg) {

@@ -543,4 +448,6 @@ public class TestScannersFromClientSide {

     assertEquals(expKvList.size(), result.size());
   }
+
+
 }
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcesso
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;

@@ -130,7 +131,6 @@ public class TestRowProcessorEndpoint {
       // ignore table not found
     }
     table = util.createTable(TABLE, FAM);
-    table.setAutoFlush(false);
     {
       Put put = new Put(ROW);
       put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
@@ -144,8 +144,6 @@ public class TestRowProcessorEndpoint {
       put.add(FAM, F, G);
       table.put(put);
       row2Size = put.size();
-      table.clearRegionCache();
-      table.flushCommits();
     }

   @Test
@@ -299,7 +299,6 @@ public class TestProtobufUtil {
     scanBuilder = ClientProtos.Scan.newBuilder(proto);
     scanBuilder.setMaxVersions(1);
     scanBuilder.setCacheBlocks(true);
-    scanBuilder.setPrefetching(true);

     Scan scan = ProtobufUtil.toScan(proto);
     assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan));
@@ -335,7 +335,6 @@ public class TestRegionServerMetrics {
     Scan s = new Scan();
     s.setBatch(1);
     s.setCaching(1);
-    s.setPrefetching(false);
     ResultScanner resultScanners = t.getScanner(s);

     for (int nextCount = 0; nextCount < 30; nextCount++) {