HBASE-8420 Port HBASE-6874 Implement prefetching for scanners from 0.89-fb

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1486246 13f79535-47bb-0310-9956-ffa450edef68

commit bf4a3af4a4
parent cea01f4192
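Summary of the change, as reflected in the hunks below: Scan gains a prefetching flag (on by default for scans, forced off for Gets), the Scan protobuf message gains cachingCount and prefetching fields, ClientScanner fixes the caching size once per ScannerCallable instead of threading an nbRows parameter through every call, and HRegionServer wraps each open RegionScanner in a new RegionScannerHolder that can fetch the next batch ahead of time on a shared blocking thread pool built by Threads.getBlockingThreadPool.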
ClientScanner.java

@@ -128,7 +128,7 @@ public class ClientScanner extends AbstractClientScanner {
     }
 
     // initialize the scanner
-    nextScanner(this.caching, false);
+    nextScanner(false);
   }
 
   protected HConnection getConnection() {
@@ -169,10 +169,9 @@ public class ClientScanner extends AbstractClientScanner {
    * scanner at the scan.getStartRow(). We will go no further, just tidy
    * up outstanding scanners, if <code>currentRegion != null</code> and
    * <code>done</code> is true.
-   * @param nbRows
    * @param done Server-side says we're done scanning.
    */
-  private boolean nextScanner(int nbRows, final boolean done)
+  private boolean nextScanner(final boolean done)
   throws IOException {
     // Close the previous scanner if it's open
     if (this.callable != null) {
@@ -210,7 +209,7 @@ public class ClientScanner extends AbstractClientScanner {
         Bytes.toStringBinary(localStartKey) + "'");
     }
     try {
-      callable = getScannerCallable(localStartKey, nbRows);
+      callable = getScannerCallable(localStartKey);
       // Open a scanner on the region server starting at the
       // beginning of the region
       callable.withRetries();
@@ -225,12 +224,11 @@ public class ClientScanner extends AbstractClientScanner {
     return true;
   }
 
-  protected ScannerCallable getScannerCallable(byte [] localStartKey,
-      int nbRows) {
+  protected ScannerCallable getScannerCallable(byte [] localStartKey) {
     scan.setStartRow(localStartKey);
     ScannerCallable s = new ScannerCallable(getConnection(),
       getTableName(), scan, this.scanMetrics);
-    s.setCaching(nbRows);
+    s.setCaching(this.caching);
     return s;
   }
 
@@ -262,27 +260,21 @@ public class ClientScanner extends AbstractClientScanner {
     Result [] values = null;
     long remainingResultSize = maxScannerResultSize;
     int countdown = this.caching;
-    // We need to reset it if it's a new callable that was created
-    // with a countdown in nextScanner
-    callable.setCaching(this.caching);
     // This flag is set when we want to skip the result returned. We do
     // this when we reset scanner because it split under us.
     boolean skipFirst = false;
     boolean retryAfterOutOfOrderException = true;
     do {
       try {
-        if (skipFirst) {
-          // Skip only the first row (which was the last row of the last
-          // already-processed batch).
-          callable.setCaching(1);
-          values = callable.withRetries();
-          callable.setCaching(this.caching);
-          skipFirst = false;
-        }
         // Server returns a null values if scanning is to stop. Else,
         // returns an empty array if scanning is to go on and we've just
         // exhausted current region.
         values = callable.withRetries();
+        if (skipFirst && values != null && values.length == 1) {
+          skipFirst = false; // Already skipped, unset it before scanning again
+          values = callable.withRetries();
+        }
         retryAfterOutOfOrderException = true;
       } catch (DoNotRetryIOException e) {
         // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
@@ -344,7 +336,15 @@ public class ClientScanner extends AbstractClientScanner {
       }
       lastNext = currentTime;
       if (values != null && values.length > 0) {
-        for (Result rs : values) {
+        int i = 0;
+        if (skipFirst) {
+          skipFirst = false;
+          // We will cache one row less, which is fine
+          countdown--;
+          i = 1;
+        }
+        for (; i < values.length; i++) {
+          Result rs = values[i];
           cache.add(rs);
           for (KeyValue kv : rs.raw()) {
             remainingResultSize -= kv.heapSize();
@@ -354,7 +354,7 @@ public class ClientScanner extends AbstractClientScanner {
          }
        }
       // Values == null means server-side filter has determined we must STOP
-    } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+    } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null));
     }
 
     if (cache.size() > 0) {
@@ -411,4 +411,12 @@ public class ClientScanner extends AbstractClientScanner {
       }
     closed = true;
   }
+
+  long currentScannerId() {
+    return (callable == null) ? -1L : callable.scannerId;
+  }
+
+  HRegionInfo currentRegionInfo() {
+    return currentRegion;
+  }
 }
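Note: ClientScanner.next() no longer issues a separate one-row RPC just to discard the duplicate first row after a scanner restart; it skips values[0] by index instead. A standalone sketch of the pattern (plain Java with hypothetical names, not the HBase types):

    import java.util.ArrayList;
    import java.util.List;

    class SkipFirstDemo {
      // Copy a fetched batch into the client-side cache, dropping values[0]
      // when it repeats the last row of the previous batch.
      static <T> void addBatch(T[] values, List<T> cache, boolean skipFirst) {
        int i = skipFirst ? 1 : 0; // skipFirst => values[0] was already returned
        for (; i < values.length; i++) {
          cache.add(values[i]);
        }
      }

      public static void main(String[] args) {
        List<String> cache = new ArrayList<String>();
        addBatch(new String[] {"row1", "row2", "row3"}, cache, true);
        System.out.println(cache); // prints [row2, row3]
      }
    }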
Scan.java

@@ -116,6 +116,8 @@ public class Scan extends OperationWithAttributes {
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
   private Boolean loadColumnFamiliesOnDemand = null;
 
+  private boolean prefetching = true;
+
   /**
    * Create a Scan operation across all rows.
    */
@@ -168,6 +170,7 @@ public class Scan extends OperationWithAttributes {
     getScan = scan.isGetScan();
     filter = scan.getFilter(); // clone?
     loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue();
+    prefetching = scan.getPrefetching();
     TimeRange ctr = scan.getTimeRange();
     tr = new TimeRange(ctr.getMin(), ctr.getMax());
     Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
@@ -201,6 +204,7 @@ public class Scan extends OperationWithAttributes {
     this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
+    this.prefetching = false;
     this.getScan = true;
   }
 
@@ -364,6 +368,21 @@ public class Scan extends OperationWithAttributes {
   }
 
   /**
+   * Set if pre-fetching is enabled. If enabled, the region
+   * server will try to read the next scan result ahead of time. This
+   * improves scan performance if we are doing large scans.
+   *
+   * @param enablePrefetching if pre-fetching is enabled or not
+   */
+  public void setPrefetching(boolean enablePrefetching) {
+    this.prefetching = enablePrefetching;
+  }
+
+  public boolean getPrefetching() {
+    return prefetching;
+  }
+
+  /**
    * @return the maximum result size in bytes. See {@link #setMaxResultSize(long)}
    */
   public long getMaxResultSize() {
@@ -613,6 +632,7 @@ public class Scan extends OperationWithAttributes {
     map.put("maxResultSize", this.maxResultSize);
     map.put("cacheBlocks", this.cacheBlocks);
     map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand);
+    map.put("prefetching", this.prefetching);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());
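Note: prefetching now defaults to true for scans and is switched off in the Get-based constructor, since a single-row lookup has no next batch to read ahead. Client-side usage, as a small sketch (the Get import and row key are illustrative):

    Scan scan = new Scan();
    scan.setCaching(1000);      // rows fetched per RPC
    scan.setPrefetching(true);  // region server reads the next batch ahead of time
    assert scan.getPrefetching();

    Scan fromGet = new Scan(new Get(Bytes.toBytes("row1")));
    assert !fromGet.getPrefetching(); // disabled for get-scans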
ScannerCallable.java

@@ -60,7 +60,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
   public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
 
   public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
-  private long scannerId = -1L;
+  long scannerId = -1L;
   private boolean instantiated = false;
   private boolean closed = false;
   private Scan scan;
@@ -130,6 +130,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
   /**
    * @see java.util.concurrent.Callable#call()
    */
+  @SuppressWarnings("deprecation")
   public Result [] call() throws IOException {
     if (closed) {
       if (scannerId != -1) {
ProtobufUtil.java

@@ -709,6 +709,9 @@ public final class ProtobufUtil {
     if (scan.getBatch() > 0) {
       scanBuilder.setBatchSize(scan.getBatch());
     }
+    if (scan.getCaching() > 0) {
+      scanBuilder.setCachingCount(scan.getCaching());
+    }
     if (scan.getMaxResultSize() > 0) {
       scanBuilder.setMaxResultSize(scan.getMaxResultSize());
     }
@@ -716,6 +719,7 @@ public final class ProtobufUtil {
     if (loadColumnFamiliesOnDemand != null) {
       scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue());
     }
+    scanBuilder.setPrefetching(scan.getPrefetching());
     scanBuilder.setMaxVersions(scan.getMaxVersions());
     TimeRange timeRange = scan.getTimeRange();
     if (!timeRange.isAllTime()) {
@@ -793,6 +797,9 @@ public final class ProtobufUtil {
     if (proto.hasMaxVersions()) {
       scan.setMaxVersions(proto.getMaxVersions());
     }
+    if (proto.hasPrefetching()) {
+      scan.setPrefetching(proto.getPrefetching());
+    }
     if (proto.hasStoreLimit()) {
       scan.setMaxResultsPerColumnFamily(proto.getStoreLimit());
     }
@@ -821,6 +828,9 @@ public final class ProtobufUtil {
     if (proto.hasBatchSize()) {
       scan.setBatch(proto.getBatchSize());
     }
+    if (proto.hasCachingCount()) {
+      scan.setCaching(proto.getCachingCount());
+    }
     if (proto.hasMaxResultSize()) {
       scan.setMaxResultSize(proto.getMaxResultSize());
     }
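Note: the two settings round-trip through the converter pair shown above; a quick sketch (assuming the usual toScan(Scan) / toScan(ClientProtos.Scan) signatures this class provides):

    Scan scan = new Scan();
    scan.setCaching(500);        // serialized as cachingCount, only when > 0
    scan.setPrefetching(false);

    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    Scan roundTripped = ProtobufUtil.toScan(proto);
    assert roundTripped.getCaching() == 500;
    assert !roundTripped.getPrefetching();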
Threads.java

@@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.util;
 import java.io.PrintWriter;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -181,6 +184,40 @@ public class Threads {
     return boundedCachedThreadPool;
   }
 
+  /**
+   * Creates a ThreadPoolExecutor which has a bound on the number of tasks that can be
+   * submitted to it, determined by the blockingLimit parameter. Excess tasks
+   * submitted will block on the calling thread till space frees up.
+   *
+   * @param blockingLimit max number of tasks that can be submitted
+   * @param timeout time value after which unused threads are killed
+   * @param unit time unit for killing unused threads
+   * @param threadFactory thread factory to use to spawn threads
+   * @return the ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getBlockingThreadPool(
+      int blockingLimit, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor blockingThreadPool =
+      new ThreadPoolExecutor(
+        1, blockingLimit, timeout, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(),
+        threadFactory,
+        new RejectedExecutionHandler() {
+          @Override
+          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+            try {
+              // The submitting thread will block until the thread pool frees up.
+              executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+              throw new RejectedExecutionException(
+                "Failed to requeue the rejected request because of ", e);
+            }
+          }
+        });
+    blockingThreadPool.allowCoreThreadTimeOut(true);
+    return blockingThreadPool;
+  }
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
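Note: because the pool is built on a SynchronousQueue (capacity zero), the RejectedExecutionHandler fires as soon as all blockingLimit workers are busy, and its put() makes the submitting thread wait for a free worker instead of queueing unboundedly. Also note the keep-alive is passed as TimeUnit.SECONDS regardless of the unit argument. A usage sketch (the daemon thread-factory helper named here is an assumption about this class's existing API):

    ThreadPoolExecutor pool = Threads.getBlockingThreadPool(8, 60, TimeUnit.SECONDS,
        Threads.newDaemonThreadFactory("demo"));
    for (int i = 0; i < 100; i++) {
      pool.submit(new Runnable() {
        @Override
        public void run() {
          // bounded work; a 9th concurrent submit blocks in the caller
        }
      });
    }
    pool.shutdown();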
ClientProtos.java (generated)

@@ -10622,6 +10622,14 @@ public final class ClientProtos {
     // optional bool loadColumnFamiliesOnDemand = 13;
     boolean hasLoadColumnFamiliesOnDemand();
     boolean getLoadColumnFamiliesOnDemand();
+
+    // optional uint32 cachingCount = 14;
+    boolean hasCachingCount();
+    int getCachingCount();
+
+    // optional bool prefetching = 15;
+    boolean hasPrefetching();
+    boolean getPrefetching();
   }
   public static final class Scan extends
       com.google.protobuf.GeneratedMessage
@@ -10810,6 +10818,26 @@ public final class ClientProtos {
       return loadColumnFamiliesOnDemand_;
     }
+
+    // optional uint32 cachingCount = 14;
+    public static final int CACHINGCOUNT_FIELD_NUMBER = 14;
+    private int cachingCount_;
+    public boolean hasCachingCount() {
+      return ((bitField0_ & 0x00000800) == 0x00000800);
+    }
+    public int getCachingCount() {
+      return cachingCount_;
+    }
+
+    // optional bool prefetching = 15;
+    public static final int PREFETCHING_FIELD_NUMBER = 15;
+    private boolean prefetching_;
+    public boolean hasPrefetching() {
+      return ((bitField0_ & 0x00001000) == 0x00001000);
+    }
+    public boolean getPrefetching() {
+      return prefetching_;
+    }
 
     private void initFields() {
       column_ = java.util.Collections.emptyList();
       attribute_ = java.util.Collections.emptyList();
@@ -10824,6 +10852,8 @@ public final class ClientProtos {
       storeLimit_ = 0;
       storeOffset_ = 0;
       loadColumnFamiliesOnDemand_ = false;
+      cachingCount_ = 0;
+      prefetching_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10894,6 +10924,12 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeBool(13, loadColumnFamiliesOnDemand_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeUInt32(14, cachingCount_);
+      }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        output.writeBool(15, prefetching_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -10955,6 +10991,14 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(13, loadColumnFamiliesOnDemand_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(14, cachingCount_);
+      }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(15, prefetching_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -11037,6 +11081,16 @@ public final class ClientProtos {
       result = result && (getLoadColumnFamiliesOnDemand()
           == other.getLoadColumnFamiliesOnDemand());
       }
+      result = result && (hasCachingCount() == other.hasCachingCount());
+      if (hasCachingCount()) {
+        result = result && (getCachingCount()
+            == other.getCachingCount());
+      }
+      result = result && (hasPrefetching() == other.hasPrefetching());
+      if (hasPrefetching()) {
+        result = result && (getPrefetching()
+            == other.getPrefetching());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -11098,6 +11152,14 @@ public final class ClientProtos {
         hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand());
       }
+      if (hasCachingCount()) {
+        hash = (37 * hash) + CACHINGCOUNT_FIELD_NUMBER;
+        hash = (53 * hash) + getCachingCount();
+      }
+      if (hasPrefetching()) {
+        hash = (37 * hash) + PREFETCHING_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getPrefetching());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -11260,6 +11322,10 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000800);
         loadColumnFamiliesOnDemand_ = false;
         bitField0_ = (bitField0_ & ~0x00001000);
+        cachingCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x00002000);
+        prefetching_ = false;
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
 
@@ -11368,6 +11434,14 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000400;
         }
         result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_;
+        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        result.cachingCount_ = cachingCount_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00001000;
+        }
+        result.prefetching_ = prefetching_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -11469,6 +11543,12 @@ public final class ClientProtos {
         if (other.hasLoadColumnFamiliesOnDemand()) {
           setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand());
         }
+        if (other.hasCachingCount()) {
+          setCachingCount(other.getCachingCount());
+        }
+        if (other.hasPrefetching()) {
+          setPrefetching(other.getPrefetching());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -11593,6 +11673,16 @@ public final class ClientProtos {
               loadColumnFamiliesOnDemand_ = input.readBool();
               break;
             }
+            case 112: {
+              bitField0_ |= 0x00002000;
+              cachingCount_ = input.readUInt32();
+              break;
+            }
+            case 120: {
+              bitField0_ |= 0x00004000;
+              prefetching_ = input.readBool();
+              break;
+            }
           }
         }
       }
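Note: the new case labels follow from the protobuf wire format: tag = (field number << 3) | wire type, and both fields use varint encoding (wire type 0), so cachingCount (field 14) parses under 14 << 3 = 112 and prefetching (field 15) under 15 << 3 = 120. Quick check:

    int varint = 0;                      // protobuf wire type 0
    assert ((14 << 3) | varint) == 112;  // optional uint32 cachingCount = 14
    assert ((15 << 3) | varint) == 120;  // optional bool prefetching = 15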
@@ -12346,6 +12436,48 @@ public final class ClientProtos {
         return this;
       }
+
+      // optional uint32 cachingCount = 14;
+      private int cachingCount_ ;
+      public boolean hasCachingCount() {
+        return ((bitField0_ & 0x00002000) == 0x00002000);
+      }
+      public int getCachingCount() {
+        return cachingCount_;
+      }
+      public Builder setCachingCount(int value) {
+        bitField0_ |= 0x00002000;
+        cachingCount_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearCachingCount() {
+        bitField0_ = (bitField0_ & ~0x00002000);
+        cachingCount_ = 0;
+        onChanged();
+        return this;
+      }
+
+      // optional bool prefetching = 15;
+      private boolean prefetching_ ;
+      public boolean hasPrefetching() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      public boolean getPrefetching() {
+        return prefetching_;
+      }
+      public Builder setPrefetching(boolean value) {
+        bitField0_ |= 0x00004000;
+        prefetching_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearPrefetching() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        prefetching_ = false;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:Scan)
     }
 
@ -21467,7 +21599,7 @@ public final class ClientProtos {
|
||||||
"ation\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition" +
|
"ation\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition" +
|
||||||
"\030\003 \001(\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006" +
|
"\030\003 \001(\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006" +
|
||||||
"result\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010" +
|
"result\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010" +
|
||||||
"\"\307\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" +
|
"\"\362\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" +
|
||||||
"tribute\030\002 \003(\0132\016.NameBytesPair\022\020\n\010startRo" +
|
"tribute\030\002 \003(\0132\016.NameBytesPair\022\020\n\010startRo" +
|
||||||
"w\030\003 \001(\014\022\017\n\007stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\013" +
|
"w\030\003 \001(\014\022\017\n\007stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\013" +
|
||||||
"2\007.Filter\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange" +
|
"2\007.Filter\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange" +
|
||||||
|
@ -21475,45 +21607,46 @@ public final class ClientProtos {
|
||||||
"\010 \001(\010:\004true\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxRes" +
|
"\010 \001(\010:\004true\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxRes" +
|
||||||
"ultSize\030\n \001(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013sto" +
|
"ultSize\030\n \001(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013sto" +
|
||||||
"reOffset\030\014 \001(\r\022\"\n\032loadColumnFamiliesOnDe" +
|
"reOffset\030\014 \001(\r\022\"\n\032loadColumnFamiliesOnDe" +
|
||||||
"mand\030\r \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001" +
|
"mand\030\r \001(\010\022\024\n\014cachingCount\030\016 \001(\r\022\023\n\013pref" +
|
||||||
"(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Sca" +
|
"etching\030\017 \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030" +
|
||||||
"n\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001" +
|
"\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005." +
|
||||||
"(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030" +
|
"Scan\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030" +
|
||||||
"\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMeta\030" +
|
"\004 \001(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallS" +
|
||||||
"\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002 \001(" +
|
"eq\030\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMe" +
|
||||||
"\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%\n\016R",
|
"ta\030\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002",
|
||||||
"esultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001\n\024B" +
|
" \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%" +
|
||||||
"ulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Re" +
|
"\n\016ResultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001" +
|
||||||
"gionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .Bulk" +
|
"\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020" +
|
||||||
"LoadHFileRequest.FamilyPath\022\024\n\014assignSeq" +
|
".RegionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .B" +
|
||||||
"Num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022" +
|
"ulkLoadHFileRequest.FamilyPath\022\024\n\014assign" +
|
||||||
"\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016" +
|
"SeqNum\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" +
|
||||||
"\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceCall" +
|
"(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" +
|
||||||
"\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nme" +
|
"e\022\016\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceC" +
|
||||||
"thodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Copro" +
|
"all\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n" +
|
||||||
"cessorServiceRequest\022 \n\006region\030\001 \002(\0132\020.R",
|
"\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Co",
|
||||||
"egionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coprocess" +
|
"processorServiceRequest\022 \n\006region\030\001 \002(\0132" +
|
||||||
"orServiceCall\"]\n\032CoprocessorServiceRespo" +
|
"\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coproc" +
|
||||||
"nse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n" +
|
"essorServiceCall\"]\n\032CoprocessorServiceRe" +
|
||||||
"\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013MultiAct" +
|
"sponse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
|
||||||
"ion\022 \n\010mutation\030\001 \001(\0132\016.MutationProto\022\021\n" +
|
"\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013Multi" +
|
||||||
"\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005valu" +
|
"Action\022 \n\010mutation\030\001 \001(\0132\016.MutationProto" +
|
||||||
"e\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016.Na" +
|
"\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005v" +
|
||||||
"meBytesPair\"^\n\014MultiRequest\022 \n\006region\030\001 " +
|
"alue\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016" +
|
||||||
"\002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\0132\014." +
|
".NameBytesPair\"^\n\014MultiRequest\022 \n\006region" +
|
||||||
"MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResp",
|
"\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\013",
|
||||||
"onse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342\002\n\r" +
|
"2\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiR" +
|
||||||
"ClientService\022 \n\003get\022\013.GetRequest\032\014.GetR" +
|
"esponse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342" +
|
||||||
"esponse\022/\n\010multiGet\022\020.MultiGetRequest\032\021." +
|
"\002\n\rClientService\022 \n\003get\022\013.GetRequest\032\014.G" +
|
||||||
"MultiGetResponse\022)\n\006mutate\022\016.MutateReque" +
|
"etResponse\022/\n\010multiGet\022\020.MultiGetRequest" +
|
||||||
"st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
|
"\032\021.MultiGetResponse\022)\n\006mutate\022\016.MutateRe" +
|
||||||
"t\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.Bulk" +
|
"quest\032\017.MutateResponse\022#\n\004scan\022\014.ScanReq" +
|
||||||
"LoadHFileRequest\032\026.BulkLoadHFileResponse" +
|
"uest\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.B" +
|
||||||
"\022F\n\013execService\022\032.CoprocessorServiceRequ" +
|
"ulkLoadHFileRequest\032\026.BulkLoadHFileRespo" +
|
||||||
"est\032\033.CoprocessorServiceResponse\022&\n\005mult" +
|
"nse\022F\n\013execService\022\032.CoprocessorServiceR" +
|
||||||
"i\022\r.MultiRequest\032\016.MultiResponseBB\n*org.",
|
"equest\032\033.CoprocessorServiceResponse\022&\n\005m",
|
||||||
"apache.hadoop.hbase.protobuf.generatedB\014" +
|
"ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
|
||||||
"ClientProtosH\001\210\001\001\240\001\001"
|
"rg.apache.hadoop.hbase.protobuf.generate" +
|
||||||
|
"dB\014ClientProtosH\001\210\001\001\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -21629,7 +21762,7 @@ public final class ClientProtos {
|
||||||
internal_static_Scan_fieldAccessorTable = new
|
internal_static_Scan_fieldAccessorTable = new
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
internal_static_Scan_descriptor,
|
internal_static_Scan_descriptor,
|
||||||
new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", },
|
new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "CachingCount", "Prefetching", },
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class,
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class);
|
||||||
internal_static_ScanRequest_descriptor =
|
internal_static_ScanRequest_descriptor =
|
||||||
|
|
|
Client.proto

@@ -233,6 +233,8 @@ message Scan {
   optional uint32 storeLimit = 11;
   optional uint32 storeOffset = 12;
   optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */
+  optional uint32 cachingCount = 14;
+  optional bool prefetching = 15;
 }
 
 /**
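Note: both additions are optional scalars, so old clients and servers simply ignore them. Building the message directly through the generated API, as a quick sketch:

    ClientProtos.Scan proto = ClientProtos.Scan.newBuilder()
        .setCachingCount(1000)
        .setPrefetching(true)
        .build();
    assert proto.hasCachingCount() && proto.getCachingCount() == 1000;
    assert proto.hasPrefetching() && proto.getPrefetching();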
HRegion.java

@@ -185,6 +185,7 @@ import com.google.protobuf.Service;
  * defines the keyspace for this HRegion.
  */
 @InterfaceAudience.Private
+@SuppressWarnings("deprecation")
 public class HRegion implements HeapSize { // , Writable{
   public static final Log LOG = LogFactory.getLog(HRegion.class);
 
@@ -3603,7 +3604,6 @@ public class HRegion implements HeapSize { // , Writable{
     return returnResult;
   }
 
-
   private void populateFromJoinedHeap(List<KeyValue> results, int limit)
       throws IOException {
     assert joinedContinuationRow != null;
HRegionServer.java

@@ -45,6 +45,8 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.ObjectName;
@@ -59,12 +61,12 @@ import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HealthCheckChore;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
@@ -164,8 +166,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
@@ -479,6 +481,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   // Table level lock manager for locking for region operations
   private TableLockManager tableLockManager;
 
+  /**
+   * Threadpool for doing scanner prefetches
+   */
+  protected ThreadPoolExecutor scanPrefetchThreadPool;
+
   /**
    * Starts a HRegionServer at the default location
    *
@@ -616,14 +623,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   }
 
   RegionScanner getScanner(long scannerId) {
-    String scannerIdString = Long.toString(scannerId);
-    RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
+    RegionScannerHolder scannerHolder = getScannerHolder(scannerId);
     if (scannerHolder != null) {
-      return scannerHolder.s;
+      return scannerHolder.scanner;
     }
     return null;
   }
 
+  public RegionScannerHolder getScannerHolder(long scannerId) {
+    String scannerIdString = Long.toString(scannerId);
+    return scanners.get(scannerIdString);
+  }
+
   /**
    * All initialization needed before we go register with Master.
    *
@@ -837,6 +848,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     if (this.thriftServer != null) this.thriftServer.shutdown();
     this.leases.closeAfterLeasesExpire();
     this.rpcServer.stop();
+
+    if (scanPrefetchThreadPool != null) {
+      // shutdown the prefetch threads
+      scanPrefetchThreadPool.shutdownNow();
+    }
     if (this.splitLogWorker != null) {
       splitLogWorker.stop();
     }
@@ -1107,7 +1123,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     // exception next time they come in.
     for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
       try {
-        e.getValue().s.close();
+        e.getValue().closeScanner();
       } catch (IOException ioe) {
         LOG.warn("Closing scanner " + e.getKey(), ioe);
       }
@@ -1537,6 +1553,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       this.replicationSinkHandler.startReplicationService();
     }
 
+    // start the scanner prefetch threadpool
+    int numHandlers = conf.getInt("hbase.regionserver.prefetcher.threads.max",
+      conf.getInt("hbase.regionserver.handler.count", 10)
+        + conf.getInt("hbase.regionserver.metahandler.count", 10));
+    scanPrefetchThreadPool =
+      Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS,
+        new DaemonThreadFactory(RegionScannerHolder.PREFETCHER_THREAD_PREFIX));
+
     // Start Server. This service is like leases in that it internally runs
     // a thread.
     this.rpcServer.start();
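Note: the pool is sized so that, by default, every RPC handler (regular plus meta) can have one prefetch in flight. Operators can cap it explicitly; in code the equivalent would be (value illustrative):

    Configuration conf = HBaseConfiguration.create();
    // Defaults to handler.count (10) + metahandler.count (10) = 20 threads.
    conf.setInt("hbase.regionserver.prefetcher.threads.max", 64);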
@@ -1831,8 +1855,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         continue;
       }
 
-      InetSocketAddress isa =
-        new InetSocketAddress(sn.getHostname(), sn.getPort());
+      new InetSocketAddress(sn.getHostname(), sn.getPort());
 
       LOG.info("Attempting connect to Master server at " +
         this.masterAddressManager.getMasterAddress());
@@ -2325,7 +2348,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     public void leaseExpired() {
       RegionScannerHolder rsh = scanners.remove(this.scannerName);
       if (rsh != null) {
-        RegionScanner s = rsh.s;
+        RegionScanner s = rsh.scanner;
         LOG.info("Scanner " + this.scannerName + " lease expired on region "
             + s.getRegionInfo().getRegionNameAsString());
         try {
@@ -2334,7 +2357,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
             region.getCoprocessorHost().preScannerClose(s);
           }
 
-          s.close();
+          rsh.closeScanner();
           if (region != null && region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postScannerClose(s);
           }
@@ -2638,20 +2661,22 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     return this.fsOk;
   }
 
-  protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+  protected RegionScannerHolder addScanner(
+      RegionScanner s, HRegion r) throws LeaseStillHeldException {
+    RegionScannerHolder holder = new RegionScannerHolder(this, s, r);
+    String scannerName = null;
     long scannerId = -1;
     while (true) {
-      scannerId = rand.nextLong();
-      if (scannerId == -1) continue;
-      String scannerName = String.valueOf(scannerId);
-      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
+      scannerId = nextLong();
+      scannerName = String.valueOf(scannerId);
+      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, holder);
       if (existing == null) {
+        holder.scannerName = scannerName;
        this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
             new ScannerListener(scannerName));
-        break;
+        return holder;
       }
     }
-    return scannerId;
   }
 
   /**
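Note: addScanner now returns the holder itself and records the winning name on it; uniqueness comes from retrying putIfAbsent until an unused random id is found. The registration pattern in isolation, as a sketch with hypothetical types:

    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class ScannerRegistry {
      private final ConcurrentMap<String, Object> scanners =
          new ConcurrentHashMap<String, Object>();
      private final Random rand = new Random();

      // Register holder under a fresh random id; retry on collision.
      String register(Object holder) {
        while (true) {
          String name = String.valueOf(rand.nextLong());
          if (scanners.putIfAbsent(name, holder) == null) {
            return name; // this id was free, registration succeeded
          }
        }
      }
    }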
@@ -2913,7 +2938,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
   @Override
   public ScanResponse scan(final RpcController controller,
       final ScanRequest request) throws ServiceException {
-    Leases.Lease lease = null;
     String scannerName = null;
     try {
       if (!request.hasScannerId() && !request.hasScan()) {
@@ -2963,7 +2987,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
           throw new UnknownScannerException(
               "Name: " + scannerName + ", already closed?");
         }
-        scanner = rsh.s;
+        scanner = rsh.scanner;
+        // Use the region found in the online region list,
+        // not that one in the RegionScannerHolder. So that we can
+        // make sure the region is still open in this region server.
         region = getRegion(scanner.getRegionInfo().getRegionName());
       } else {
         region = getRegion(request.getRegion());
@@ -2974,7 +3001,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         if (!isLoadingCfsOnDemandSet) {
          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
         }
-        byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
         region.prepareScanner(scan);
         if (region.getCoprocessorHost() != null) {
           scanner = region.getCoprocessorHost().preScannerOpen(scan);
@@ -2985,9 +3011,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
         if (region.getCoprocessorHost() != null) {
           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
         }
-        scannerId = addScanner(scanner);
-        scannerName = String.valueOf(scannerId);
+        rsh = addScanner(scanner, region);
+        scannerName = rsh.scannerName;
+        scannerId = Long.parseLong(scannerName);
+
         ttl = this.scannerLeaseTimeoutPeriod;
+        if (scan.getPrefetching()) {
+          rsh.enablePrefetching(scan.getCaching());
+        }
       }
 
       if (rows > 0) {
@ -2995,110 +3026,34 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
// performed even before checking of Lease.
|
// performed even before checking of Lease.
|
||||||
// See HBASE-5974
|
// See HBASE-5974
|
||||||
if (request.hasNextCallSeq()) {
|
if (request.hasNextCallSeq()) {
|
||||||
if (rsh == null) {
|
if (request.getNextCallSeq() != rsh.nextCallSeq) {
|
||||||
rsh = scanners.get(scannerName);
|
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
|
||||||
}
|
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
|
||||||
if (rsh != null) {
|
"; request=" + TextFormat.shortDebugString(request));
|
||||||
if (request.getNextCallSeq() != rsh.nextCallSeq) {
|
|
||||||
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
|
|
||||||
+ " But the nextCallSeq got from client: " + request.getNextCallSeq() +
|
|
||||||
"; request=" + TextFormat.shortDebugString(request));
|
|
||||||
}
|
|
||||||
// Increment the nextCallSeq value which is the next expected from client.
|
|
||||||
rsh.nextCallSeq++;
|
|
||||||
}
|
}
|
||||||
|
// Increment the nextCallSeq value which is the next expected from client.
|
||||||
|
rsh.nextCallSeq++;
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
// Remove lease while its being processed in server; protects against case
|
|
||||||
// where processing of request takes > lease expiration time.
|
|
||||||
lease = leases.removeLease(scannerName);
|
|
||||||
List<Result> results = new ArrayList<Result>(rows);
|
|
||||||
long currentScanResultSize = 0;
|
|
||||||
|
|
||||||
boolean done = false;
|
ttl = this.scannerLeaseTimeoutPeriod;
|
||||||
-        // Call coprocessor. Get region info from scanner.
-        if (region != null && region.getCoprocessorHost() != null) {
-          Boolean bypass = region.getCoprocessorHost().preScannerNext(
-            scanner, results, rows);
-          if (!results.isEmpty()) {
-            for (Result r : results) {
-              if (maxScannerResultSize < Long.MAX_VALUE){
-                for (KeyValue kv : r.raw()) {
-                  currentScanResultSize += kv.heapSize();
-                }
-              }
-            }
-          }
-          if (bypass != null && bypass.booleanValue()) {
-            done = true;
-          }
-        }
-
-        if (!done) {
-          long maxResultSize = scanner.getMaxResultSize();
-          if (maxResultSize <= 0) {
-            maxResultSize = maxScannerResultSize;
-          }
-          List<KeyValue> values = new ArrayList<KeyValue>();
-          MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
-          region.startRegionOperation(Operation.SCAN);
-          try {
-            int i = 0;
-            synchronized(scanner) {
-              for (; i < rows
-                  && currentScanResultSize < maxResultSize; i++) {
-                // Collect values to be returned here
-                boolean moreRows = scanner.nextRaw(values);
-                if (!values.isEmpty()) {
-                  if (maxScannerResultSize < Long.MAX_VALUE){
-                    for (KeyValue kv : values) {
-                      currentScanResultSize += kv.heapSize();
-                    }
-                  }
-                  results.add(new Result(values));
-                }
-                if (!moreRows) {
-                  break;
-                }
-                values.clear();
-              }
-            }
-            region.readRequestsCount.add(i);
-          } finally {
-            region.closeRegionOperation();
-          }
-
-          // coprocessor postNext hook
-          if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
-          }
-        }
-
-        // If the scanner's filter - if any - is done with the scan
-        // and wants to tell the client to stop the scan. This is done by passing
-        // a null result, and setting moreResults to false.
-        if (scanner.isFilterDone() && results.isEmpty()) {
-          moreResults = false;
-          results = null;
-        } else {
-          ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
-          List<CellScannable> cellScannables = new ArrayList<CellScannable>(results.size());
-          for (Result res : results) {
-            cellScannables.add(res);
-            rcmBuilder.addCellsLength(res.size());
-          }
-          builder.setResultCellMeta(rcmBuilder.build());
-          // TODO is this okey to assume the type and cast
-          ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
-            .createCellScanner(cellScannables));
-        }
-      } finally {
-        // We're done. On way out re-add the above removed lease.
-        // Adding resets expiration time on lease.
-        if (scanners.containsKey(scannerName)) {
-          if (lease != null) leases.addLease(lease);
-          ttl = this.scannerLeaseTimeoutPeriod;
-        }
+        ScanResult result = rsh.getScanResult(rows);
+        if (result.isException) {
+          throw result.ioException;
+        }
+        moreResults = result.moreResults;
+        if (result.results != null) {
+          List<CellScannable> cellScannables =
+            new ArrayList<CellScannable>(result.results.size());
+          ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder();
+          for (Result res : result.results) {
+            cellScannables.add(res);
+            rcmBuilder.addCellsLength(res.size());
+          }
+          builder.setResultCellMeta(rcmBuilder.build());
+          // TODO is this okey to assume the type and cast
+          ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil
+            .createCellScanner(cellScannables));
         }
       }
     }
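The handler above no longer scans inline: it asks the RegionScannerHolder for a ScanResult, which carries either the rows or a queued IOException. The holder implements a single-slot prefetch (the full class appears in the new file below). A minimal, generic sketch of that pattern, illustrative only, with every name invented for this example:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Single-slot prefetcher: at most one batch is fetched ahead of the consumer.
class SingleSlotPrefetcher<T> {
  private final ExecutorService pool = Executors.newSingleThreadExecutor();
  private final Callable<T> fetchBatch;
  private volatile Future<T> pending; // the one outstanding prefetch, if any

  SingleSlotPrefetcher(Callable<T> fetchBatch) {
    this.fetchBatch = fetchBatch;
  }

  // Return the next batch: the prefetched one if available, else fetch inline;
  // then overlap the following fetch with the caller's processing time.
  T next() throws Exception {
    T batch = (pending == null)
        ? fetchBatch.call() // first call, or prefetching skipped: fetch inline
        : pending.get();    // blocks only if the prefetch is still running
    pending = pool.submit(fetchBatch);
    return batch;
  }

  // Drop a queued prefetch; let a running one finish (mirrors cancel(false)).
  void close() {
    if (pending != null) {
      pending.cancel(false);
    }
    pool.shutdown();
  }
}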
@ -3112,9 +3067,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterface
       }
     }
     rsh = scanners.remove(scannerName);
     if (rsh != null) {
-      scanner = rsh.s;
-      scanner.close();
+      rsh.closeScanner();
+      try {
         leases.cancelLease(scannerName);
+      } catch (LeaseException le) {
+        // That's ok, since the lease may be gone with
+        // the prefetcher when cancelled.
+      }
       if (region != null && region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postScannerClose(scanner);
       }
@ -4181,18 +4140,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
|
||||||
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
|
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Holder class which holds the RegionScanner and nextCallSeq together.
|
|
||||||
*/
|
|
||||||
private static class RegionScannerHolder {
|
|
||||||
private RegionScanner s;
|
|
||||||
private long nextCallSeq = 0L;
|
|
||||||
|
|
||||||
public RegionScannerHolder(RegionScanner s) {
|
|
||||||
this.s = s;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isHealthCheckerConfigured() {
|
private boolean isHealthCheckerConfigured() {
|
||||||
String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
|
String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
|
||||||
return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
|
return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
|
||||||
|
|
|
@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Holder class which holds the RegionScanner, nextCallSeq, ScanPrefetcher
+ * and information needed for prefetcher/fetcher.
+ *
+ * Originally, this is an inner class of HRegionServer. We moved it out
+ * since HRegionServer is getting bigger and bigger.
+ */
+@InterfaceAudience.Private
+public class RegionScannerHolder {
+  public final static String MAX_PREFETCHED_RESULT_SIZE_KEY
+    = "hbase.hregionserver.prefetcher.resultsize.max";
+  public final static int MAX_PREFETCHED_RESULT_SIZE_DEFAULT = 256 * 1024 * 1024;
+
+  final static Log LOG = LogFactory.getLog(RegionScannerHolder.class);
+  final static String PREFETCHER_THREAD_PREFIX = "scan-prefetch-";
+
+  private final static AtomicLong globalPrefetchedResultSize = new AtomicLong();
+
+  private ThreadPoolExecutor scanPrefetchThreadPool;
+  private Map<String, RegionScannerHolder> scanners;
+  private long maxScannerResultSize;
+  private Configuration conf;
+  private Leases leases;
+
+  private boolean prefetching = false;
+  private long maxGlobalPrefetchedResultSize;
+  private volatile Future<ScanResult> prefetchScanFuture;
+  private volatile long prefetchedResultSize;
+  private ScanPrefetcher prefetcher;
+  private HRegion region;
+  private int rows;
+
+  RegionScanner scanner;
+  long nextCallSeq = 0L;
+  String scannerName;
+
+  /**
+   * Get the total size of all prefetched results not retrieved yet.
+   */
+  public static long getPrefetchedResultSize() {
+    return globalPrefetchedResultSize.get();
+  }
+
+  /**
+   * Construct a RegionScanner holder for a specific region server.
+   *
+   * @param rs the region server the specific region is on
+   * @param s the scanner to be held
+   * @param r the region the scanner is for
+   */
+  RegionScannerHolder(HRegionServer rs, RegionScanner s, HRegion r) {
+    scanPrefetchThreadPool = rs.scanPrefetchThreadPool;
+    maxScannerResultSize = rs.maxScannerResultSize;
+    prefetcher = new ScanPrefetcher();
+    scanners = rs.scanners;
+    leases = rs.leases;
+    conf = rs.conf;
+    scanner = s;
+    region = r;
+  }
+
+  public boolean isPrefetchSubmitted() {
+    return prefetchScanFuture != null;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
+  }
+
+  /**
+   * Find the current prefetched result size
+   */
+  public long currentPrefetchedResultSize() {
+    return prefetchedResultSize;
+  }
+
+  /**
+   * Wait till current prefetching task complete,
+   * return true if any data retrieved, false otherwise.
+   * Used for unit testing only.
+   */
+  public boolean waitForPrefetchingDone() {
+    if (prefetchScanFuture != null) {
+      try {
+        ScanResult scanResult = prefetchScanFuture.get();
+        return scanResult != null && scanResult.results != null
+          && !scanResult.results.isEmpty();
+      } catch (Throwable t) {
+        LOG.debug("Got exception in getting scan result", t);
+        if (t instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Stop any prefetching task and close the scanner.
+   * @throws IOException
+   */
+  public void closeScanner() throws IOException {
+    // stop prefetcher if needed.
+    if (prefetchScanFuture != null) {
+      synchronized (prefetcher) {
+        prefetcher.scannerClosing = true;
+        prefetchScanFuture.cancel(false);
+      }
+      prefetchScanFuture = null;
+      if (prefetchedResultSize > 0) {
+        globalPrefetchedResultSize.addAndGet(-prefetchedResultSize);
+        prefetchedResultSize = 0L;
+      }
+    }
+    scanner.close();
+  }
+
+  /**
+   * Get the prefetched scan result, if any. Otherwise,
+   * do a scan synchronously and return the result, which
+   * may take some time. Region scan coprocessor, if specified,
+   * is invoked properly, which may override the scan result.
+   *
+   * @param rows the number of rows to scan, which is preferred
+   * not to change among scanner.next() calls.
+   *
+   * @return scan result, which has the data retrieved from
+   * the scanner, or some IOException if the scan failed.
+   * @throws IOException if failed to retrieve from the scanner.
+   */
+  public ScanResult getScanResult(final int rows) throws IOException {
+    Preconditions.checkArgument(rows > 0, "Number of rows requested must be positive");
+    ScanResult scanResult = null;
+    this.rows = rows;
+
+    if (prefetchScanFuture == null) {
+      // Need to scan inline if not prefetched
+      scanResult = prefetcher.call();
+    } else {
+      // if we have a prefetched result, then use it
+      try {
+        scanResult = prefetchScanFuture.get();
+        if (scanResult.moreResults) {
+          int prefetchedRows = scanResult.results.size();
+          if (prefetchedRows != 0 && this.rows > prefetchedRows) {
+            // Try to scan more since we haven't prefetched enough
+            this.rows -= prefetchedRows;
+            ScanResult tmp = prefetcher.call();
+            if (tmp.isException) {
+              return tmp; // Keep the prefetched results for later
+            }
+            if (tmp.results != null && !tmp.results.isEmpty()) {
+              // Merge new results to the old result list
+              scanResult.results.addAll(tmp.results);
+            }
+            // Reset rows for next prefetching
+            this.rows = rows;
+          }
+        }
+        prefetchScanFuture = null;
+        if (prefetchedResultSize > 0) {
+          globalPrefetchedResultSize.addAndGet(-prefetchedResultSize);
+          prefetchedResultSize = 0L;
+        }
+      } catch (ExecutionException ee) {
+        throw new IOException("failed to run prefetching task", ee.getCause());
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        IOException iie = new InterruptedIOException("scan was interrupted");
+        iie.initCause(ie);
+        throw iie;
+      }
+    }
+
+    if (prefetching
+        && scanResult.moreResults && !scanResult.results.isEmpty()) {
+      long totalPrefetchedResultSize = globalPrefetchedResultSize.get();
+      if (totalPrefetchedResultSize < maxGlobalPrefetchedResultSize) {
+        // Schedule a background prefetch for the next result
+        // if prefetch is enabled on scans and there are more results
+        prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher);
+      } else if (LOG.isTraceEnabled()) {
+        LOG.trace("One prefetching is skipped for scanner " + scannerName
+          + " since total prefetched result size " + totalPrefetchedResultSize
+          + " is more than the maximum configured "
+          + maxGlobalPrefetchedResultSize);
+      }
+    }
+    return scanResult;
+  }
+
+  /**
+   * Set the rows to prefetch, and start the prefetching task.
+   */
+  public void enablePrefetching(int caching) {
+    if (caching > 0) {
+      rows = caching;
+    } else {
+      rows = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    }
+    maxGlobalPrefetchedResultSize = conf.getLong(
+      MAX_PREFETCHED_RESULT_SIZE_KEY, MAX_PREFETCHED_RESULT_SIZE_DEFAULT);
+    if (globalPrefetchedResultSize.get() < maxGlobalPrefetchedResultSize) {
+      prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher);
+    }
+    prefetching = true;
+  }
+
+  /**
+   * This Callable abstracts calling a pre-fetch next. This is called on a
+   * threadpool. It makes a pre-fetch next call with the same parameters as
+   * the incoming next call. Note that the number of rows to return (nbRows)
+   * and/or the memory size for the result is the same as the previous call if
+   * pre-fetching is enabled. If these parameters change dynamically,
+   * they will take effect in the subsequent iteration.
+   */
+  class ScanPrefetcher implements Callable<ScanResult> {
+    boolean scannerClosing = false;
+
+    public ScanResult call() {
+      ScanResult scanResult = null;
+      Leases.Lease lease = null;
+      try {
+        // Remove lease while its being processed in server; protects against case
+        // where processing of request takes > lease expiration time.
+        lease = leases.removeLease(scannerName);
+        List<Result> results = new ArrayList<Result>(rows);
+        long currentScanResultSize = 0;
+        boolean moreResults = true;
+
+        boolean done = false;
+        long maxResultSize = scanner.getMaxResultSize();
+        if (maxResultSize <= 0) {
+          maxResultSize = maxScannerResultSize;
+        }
+        String threadName = Thread.currentThread().getName();
+        boolean prefetchingThread = threadName.startsWith(PREFETCHER_THREAD_PREFIX);
+        // Call coprocessor. Get region info from scanner.
+        if (region != null && region.getCoprocessorHost() != null) {
+          Boolean bypass = region.getCoprocessorHost().preScannerNext(
+            scanner, results, rows);
+          if (!results.isEmpty()
+              && (prefetchingThread || maxResultSize < Long.MAX_VALUE)) {
+            for (Result r : results) {
+              for (KeyValue kv : r.raw()) {
+                currentScanResultSize += kv.heapSize();
+              }
+            }
+          }
+          if (bypass != null && bypass.booleanValue()) {
+            done = true;
+          }
+        }
+
+        if (!done) {
+          List<KeyValue> values = new ArrayList<KeyValue>();
+          MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+          region.startRegionOperation();
+          try {
+            int i = 0;
+            synchronized(scanner) {
+              for (; i < rows
+                  && currentScanResultSize < maxResultSize; i++) {
+                // Collect values to be returned here
+                boolean moreRows = scanner.nextRaw(values);
+                if (!values.isEmpty()) {
+                  if (prefetchingThread || maxResultSize < Long.MAX_VALUE){
+                    for (KeyValue kv : values) {
+                      currentScanResultSize += kv.heapSize();
+                    }
+                  }
+                  results.add(new Result(values));
+                }
+                if (!moreRows) {
+                  break;
+                }
+                values.clear();
+              }
+            }
+            region.readRequestsCount.add(i);
+          } finally {
+            region.closeRegionOperation();
+          }
+
+          // coprocessor postNext hook
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
+          }
+        }
+
+        // If the scanner's filter - if any - is done with the scan
+        // and wants to tell the client to stop the scan. This is done by passing
+        // a null result, and setting moreResults to false.
+        if (scanner.isFilterDone() && results.isEmpty()) {
+          moreResults = false;
+          results = null;
+        }
+        scanResult = new ScanResult(moreResults, results);
+        if (prefetchingThread && currentScanResultSize > 0) {
+          synchronized (prefetcher) {
+            if (!scannerClosing) {
+              globalPrefetchedResultSize.addAndGet(currentScanResultSize);
+              prefetchedResultSize = currentScanResultSize;
+            }
+          }
+        }
+      } catch (IOException e) {
+        // we should queue the exception as the result so that we can return
+        // this when the result is asked for
+        scanResult = new ScanResult(e);
+      } finally {
+        // We're done. On way out re-add the above removed lease.
+        // Adding resets expiration time on lease.
+        if (scanners.containsKey(scannerName)) {
+          if (lease != null) {
+            try {
+              leases.addLease(lease);
+            } catch (LeaseStillHeldException e) {
+              LOG.error("THIS SHOULD NOT HAPPEN", e);
+            }
+          }
+        }
+      }
+      return scanResult;
+    }
+  }
+}
+
+/**
+ * This class abstracts the results of a single scanner's result. It tracks
+ * the list of Result objects if the pre-fetch next was successful, and
+ * tracks the exception if the next failed.
+ */
+class ScanResult {
+  final boolean isException;
+  IOException ioException = null;
+
+  List<Result> results = null;
+  boolean moreResults = false;
+
+  public ScanResult(IOException ioException) {
+    this.ioException = ioException;
+    isException = true;
+  }
+
+  public ScanResult(boolean moreResults, List<Result> results) {
+    this.moreResults = moreResults;
+    this.results = results;
+    isException = false;
+  }
+}
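The prefetcher accounts every prefetched batch against a process-wide budget (globalPrefetchedResultSize) capped by hbase.hregionserver.prefetcher.resultsize.max. A hypothetical tuning sketch; in practice this key would be set in hbase-site.xml on each region server, and only the key and its default come from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.RegionScannerHolder;

public class PrefetchBudgetExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Halve the 256 MB default cap on heap held by prefetched, not yet
    // delivered results across all scanners on a region server.
    conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY,
        128L * 1024 * 1024);
    System.out.println("prefetch budget = " + conf.getLong(
        RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, -1L));
  }
}

Once the running total reaches this cap, enablePrefetching() and getScanResult() stop scheduling background scans and next() calls fall back to scanning inline.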
@ -17,31 +17,41 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScannerHolder;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * A client-side test, mostly testing scanners with various parameters.
  */
 @Category(MediumTests.class)
+@RunWith(Parameterized.class)
 public class TestScannersFromClientSide {
   private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class);
@ -51,6 +61,37 @@ public class TestScannersFromClientSide {
   private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   private static byte [] VALUE = Bytes.toBytes("testValue");
 
+  private final boolean prefetching;
+  private long maxSize;
+
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    List<Object[]> prefetchings = new ArrayList<Object[]>();
+    prefetchings.add(new Object[] {Long.valueOf(-1)});
+    prefetchings.add(new Object[] {Long.valueOf(0)});
+    prefetchings.add(new Object[] {Long.valueOf(1)});
+    prefetchings.add(new Object[] {Long.valueOf(1024)});
+    return prefetchings;
+  }
+
+  public TestScannersFromClientSide(Long maxPrefetchedResultSize) {
+    this.maxSize = maxPrefetchedResultSize.longValue();
+    if (this.maxSize < 0) {
+      this.prefetching = false;
+    } else {
+      this.prefetching = true;
+      if (this.maxSize == 0) {
+        this.maxSize = RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_DEFAULT;
+      } else {
+        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+        for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
+          Configuration conf = rst.getRegionServer().getConfiguration();
+          conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, maxSize);
+        }
+      }
+    }
+  }
+
   /**
    * @throws java.lang.Exception
    */
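A note on the parameterization above: the single Long parameter encodes both knobs. A negative value runs the whole suite with prefetching disabled; zero enables prefetching with the default 256 MB global budget; a positive value (1 byte and 1 KB here) enables prefetching and pushes that value into hbase.hregionserver.prefetcher.resultsize.max on every live region server, so the budget-exceeded fallback path also gets exercised.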
@ -65,22 +106,9 @@ public class TestScannersFromClientSide {
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    // Nothing to do.
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @After
-  public void tearDown() throws Exception {
-    // Nothing to do.
+    long remainingPrefetchedSize = RegionScannerHolder.getPrefetchedResultSize();
+    assertEquals("All prefetched results should be gone",
+      0, remainingPrefetchedSize);
   }
 
   /**
@ -89,8 +117,23 @@ public class TestScannersFromClientSide {
    * @throws Exception
    */
   @Test
+  public void testScanBatchWithDefaultCaching() throws Exception {
+    batchedScanWithCachingSpecified(-1); // Using default caching which is 100
+  }
+
+  /**
+   * Test from client side for batch of scan
+   *
+   * @throws Exception
+   */
+  @Test
   public void testScanBatch() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testScanBatch");
+    batchedScanWithCachingSpecified(1);
+  }
+
+  private void batchedScanWithCachingSpecified(int caching) throws Exception {
+    byte [] TABLE = Bytes.toBytes(
+      "testScanBatch-" + prefetching + "_" + maxSize + "_" + caching);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
 
     HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
@ -99,7 +142,7 @@ public class TestScannersFromClientSide {
     Scan scan;
     Delete delete;
     Result result;
-    ResultScanner scanner;
+    ClientScanner scanner;
     boolean toLog = true;
     List<KeyValue> kvListExp;
 
@ -124,8 +167,11 @@ public class TestScannersFromClientSide {
 
     // without batch
     scan = new Scan(ROW);
+    scan.setCaching(caching);
     scan.setMaxVersions();
-    scanner = ht.getScanner(scan);
+    scan.setPrefetching(prefetching);
+    scanner = (ClientScanner)ht.getScanner(scan);
+    verifyPrefetching(scanner);
 
     // c4:4, c5:5, c6:6, c7:7
     kvListExp = new ArrayList<KeyValue>();
@ -135,12 +181,16 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+    verifyPrefetching(scanner);
 
     // with batch
     scan = new Scan(ROW);
+    scan.setCaching(caching);
     scan.setMaxVersions();
     scan.setBatch(2);
-    scanner = ht.getScanner(scan);
+    scan.setPrefetching(prefetching);
+    scanner = (ClientScanner)ht.getScanner(scan);
+    verifyPrefetching(scanner);
 
     // First batch: c4:4, c5:5
     kvListExp = new ArrayList<KeyValue>();
@ -148,6 +198,7 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing first batch of scan");
+    verifyPrefetching(scanner);
 
     // Second batch: c6:6, c7:7
     kvListExp = new ArrayList<KeyValue>();
@ -155,7 +206,7 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE));
     result = scanner.next();
     verifyResult(result, kvListExp, toLog, "Testing second batch of scan");
+    verifyPrefetching(scanner);
   }
 
   /**
@ -165,7 +216,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testGetMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetMaxResults");
+    byte [] TABLE = Bytes.toBytes("testGetMaxResults-" + prefetching + "_" + maxSize);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@ -285,7 +336,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testScanMaxResults() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testScanLimit");
+    byte [] TABLE = Bytes.toBytes("testScanLimit-" + prefetching + "_" + maxSize);
     byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@ -315,17 +366,19 @@ public class TestScannersFromClientSide {
     }
 
     scan = new Scan();
+    scan.setCaching(1);
+    scan.setPrefetching(prefetching);
     scan.setMaxResultsPerColumnFamily(4);
-    ResultScanner scanner = ht.getScanner(scan);
+    ClientScanner scanner = (ClientScanner)ht.getScanner(scan);
     kvListScan = new ArrayList<KeyValue>();
     while ((result = scanner.next()) != null) {
+      verifyPrefetching(scanner);
       for (KeyValue kv : result.list()) {
         kvListScan.add(kv);
       }
     }
     result = new Result(kvListScan);
     verifyResult(result, kvListExp, toLog, "Testing scan with maxResults");
 
   }
 
   /**
@ -335,7 +388,7 @@ public class TestScannersFromClientSide {
    */
   @Test
   public void testGetRowOffset() throws Exception {
-    byte [] TABLE = Bytes.toBytes("testGetRowOffset");
+    byte [] TABLE = Bytes.toBytes("testGetRowOffset-" + prefetching + "_" + maxSize);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@ -421,7 +474,47 @@ public class TestScannersFromClientSide {
     kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE));
     verifyResult(result, kvListExp, toLog,
         "Testing offset + multiple CFs + maxResults");
+  }
+
+  /**
+   * For testing only, find a region scanner holder for a scan.
+   */
+  RegionScannerHolder findRegionScannerHolder(ClientScanner scanner) {
+    long scannerId = scanner.currentScannerId();
+    if (scannerId == -1L) return null;
+
+    HRegionInfo expectedRegion = scanner.currentRegionInfo();
+    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+    for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) {
+      RegionScannerHolder rsh = rst.getRegionServer().getScannerHolder(scannerId);
+      if (rsh != null && rsh.getRegionInfo().equals(expectedRegion)) {
+        return rsh;
+      }
+    }
+    return null;
+  }
+
+  void verifyPrefetching(ClientScanner scanner) throws IOException {
+    long scannerId = scanner.currentScannerId();
+    if (scannerId == -1L) return; // scanner is already closed
+    RegionScannerHolder rsh = findRegionScannerHolder(scanner);
+    assertNotNull("We should be able to find the scanner", rsh);
+    boolean isPrefetchSubmitted = rsh.isPrefetchSubmitted();
+    if (prefetching && (RegionScannerHolder.getPrefetchedResultSize() < this.maxSize)) {
+      assertTrue("Prefetching should be submitted or no more result",
+        isPrefetchSubmitted || scanner.next() == null);
+    } else if (isPrefetchSubmitted) {
+      // Prefetch submitted, it must be because prefetching is enabled,
+      // and there was still room before it's scheduled
+      long sizeBefore = RegionScannerHolder.getPrefetchedResultSize()
+        - rsh.currentPrefetchedResultSize();
+      assertTrue("There should have room before prefetching is submitted",
+        prefetching && sizeBefore < this.maxSize);
+    }
+    if (isPrefetchSubmitted && rsh.waitForPrefetchingDone()) {
+      assertTrue("Prefetched result size should not be 0",
+        rsh.currentPrefetchedResultSize() > 0);
+    }
   }
 
   static void verifyResult(Result result, List<KeyValue> expKvList, boolean toLog,
@ -449,6 +542,4 @@ public class TestScannersFromClientSide {
 
     assertEquals(expKvList.size(), result.size());
   }
-
-
 }
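The test helpers above rely on accessors introduced elsewhere in this patch: ClientScanner.currentScannerId() and ClientScanner.currentRegionInfo() expose the currently open server-side scanner, and HRegionServer.getScannerHolder(scannerId) looks up its RegionScannerHolder, letting the test observe from the outside whether a prefetch was scheduled and how much heap it is holding.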
@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcesso
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@ -130,6 +129,7 @@ public class TestRowProcessorEndpoint {
       // ignore table not found
     }
     table = util.createTable(TABLE, FAM);
+    table.setAutoFlush(false);
     {
       Put put = new Put(ROW);
       put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
@ -143,6 +143,8 @@ public class TestRowProcessorEndpoint {
       put.add(FAM, F, G);
       table.put(put);
       row2Size = put.size();
+      table.clearRegionCache();
+      table.flushCommits();
     }
 
   @Test
@ -284,6 +284,7 @@ public class TestProtobufUtil {
     scanBuilder = ClientProtos.Scan.newBuilder(proto);
     scanBuilder.setMaxVersions(1);
     scanBuilder.setCacheBlocks(true);
+    scanBuilder.setPrefetching(true);
 
     Scan scan = ProtobufUtil.toScan(proto);
     assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan));
@ -335,6 +335,7 @@ public class TestRegionServerMetrics {
     Scan s = new Scan();
     s.setBatch(1);
     s.setCaching(1);
+    s.setPrefetching(false);
     ResultScanner resultScanners = t.getScanner(s);
 
     for (int nextCount = 0; nextCount < 30; nextCount++) {
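Putting the client side together: a minimal usage sketch against the API this patch introduces (the table name, start row, and caching value are placeholders, and the HTable constructor is the 0.95-era client API):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class PrefetchingScanExample {
  public static void main(String[] args) throws Exception {
    HTable table = new HTable(HBaseConfiguration.create(), "myTable");
    Scan scan = new Scan(Bytes.toBytes("startRow"));
    scan.setCaching(100);      // rows per next() batch; also sizes each prefetch
    scan.setPrefetching(true); // ask the region server to scan ahead
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // process r while the server prefetches the following batch
      }
    } finally {
      scanner.close(); // also cancels any outstanding prefetch server-side
      table.close();
    }
  }
}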