From bf4a3af4a4fcc89a6b8b5df813318bdf0564d8cc Mon Sep 17 00:00:00 2001 From: jxiang Date: Fri, 24 May 2013 22:51:22 +0000 Subject: [PATCH] HBASE-8420 Port HBASE-6874 Implement prefetching for scanners from 0.89-fb git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1486246 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/ClientScanner.java | 48 ++- .../org/apache/hadoop/hbase/client/Scan.java | 20 + .../hadoop/hbase/client/ScannerCallable.java | 3 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 10 + .../org/apache/hadoop/hbase/util/Threads.java | 41 +- .../protobuf/generated/ClientProtos.java | 215 ++++++++-- hbase-protocol/src/main/protobuf/Client.proto | 2 + .../hadoop/hbase/regionserver/HRegion.java | 2 +- .../hbase/regionserver/HRegionServer.java | 221 ++++------ .../regionserver/RegionScannerHolder.java | 394 ++++++++++++++++++ .../client/TestScannersFromClientSide.java | 151 +++++-- .../coprocessor/TestRowProcessorEndpoint.java | 4 +- .../hbase/protobuf/TestProtobufUtil.java | 1 + .../regionserver/TestRegionServerMetrics.java | 1 + 14 files changed, 880 insertions(+), 233 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 841857d1917..59047330f63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -128,7 +128,7 @@ public class ClientScanner extends AbstractClientScanner { } // initialize the scanner - nextScanner(this.caching, false); + nextScanner(false); } protected HConnection getConnection() { @@ -169,10 +169,9 @@ public class ClientScanner extends AbstractClientScanner { * scanner at the scan.getStartRow(). We will go no further, just tidy * up outstanding scanners, if currentRegion != null and * done is true. - * @param nbRows * @param done Server-side says we're done scanning. */ - private boolean nextScanner(int nbRows, final boolean done) + private boolean nextScanner(final boolean done) throws IOException { // Close the previous scanner if it's open if (this.callable != null) { @@ -210,7 +209,7 @@ public class ClientScanner extends AbstractClientScanner { Bytes.toStringBinary(localStartKey) + "'"); } try { - callable = getScannerCallable(localStartKey, nbRows); + callable = getScannerCallable(localStartKey); // Open a scanner on the region server starting at the // beginning of the region callable.withRetries(); @@ -225,12 +224,11 @@ public class ClientScanner extends AbstractClientScanner { return true; } - protected ScannerCallable getScannerCallable(byte [] localStartKey, - int nbRows) { + protected ScannerCallable getScannerCallable(byte [] localStartKey) { scan.setStartRow(localStartKey); ScannerCallable s = new ScannerCallable(getConnection(), getTableName(), scan, this.scanMetrics); - s.setCaching(nbRows); + s.setCaching(this.caching); return s; } @@ -262,27 +260,21 @@ public class ClientScanner extends AbstractClientScanner { Result [] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; - // We need to reset it if it's a new callable that was created - // with a countdown in nextScanner - callable.setCaching(this.caching); + // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. boolean skipFirst = false; boolean retryAfterOutOfOrderException = true; do { try { - if (skipFirst) { - // Skip only the first row (which was the last row of the last - // already-processed batch). - callable.setCaching(1); - values = callable.withRetries(); - callable.setCaching(this.caching); - skipFirst = false; - } // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. values = callable.withRetries(); + if (skipFirst && values != null && values.length == 1) { + skipFirst = false; // Already skipped, unset it before scanning again + values = callable.withRetries(); + } retryAfterOutOfOrderException = true; } catch (DoNotRetryIOException e) { // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us @@ -344,7 +336,15 @@ public class ClientScanner extends AbstractClientScanner { } lastNext = currentTime; if (values != null && values.length > 0) { - for (Result rs : values) { + int i = 0; + if (skipFirst) { + skipFirst = false; + // We will cache one row less, which is fine + countdown--; + i = 1; + } + for (; i < values.length; i++) { + Result rs = values[i]; cache.add(rs); for (KeyValue kv : rs.raw()) { remainingResultSize -= kv.heapSize(); @@ -354,7 +354,7 @@ public class ClientScanner extends AbstractClientScanner { } } // Values == null means server-side filter has determined we must STOP - } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null)); + } while (remainingResultSize > 0 && countdown > 0 && nextScanner(values == null)); } if (cache.size() > 0) { @@ -411,4 +411,12 @@ public class ClientScanner extends AbstractClientScanner { } closed = true; } + + long currentScannerId() { + return (callable == null) ? -1L : callable.scannerId; + } + + HRegionInfo currentRegionInfo() { + return currentRegion; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index af4561627f3..de2e0cc4afd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -116,6 +116,8 @@ public class Scan extends OperationWithAttributes { new TreeMap>(Bytes.BYTES_COMPARATOR); private Boolean loadColumnFamiliesOnDemand = null; + private boolean prefetching = true; + /** * Create a Scan operation across all rows. */ @@ -168,6 +170,7 @@ public class Scan extends OperationWithAttributes { getScan = scan.isGetScan(); filter = scan.getFilter(); // clone? loadColumnFamiliesOnDemand = scan.getLoadColumnFamiliesOnDemandValue(); + prefetching = scan.getPrefetching(); TimeRange ctr = scan.getTimeRange(); tr = new TimeRange(ctr.getMin(), ctr.getMax()); Map> fams = scan.getFamilyMap(); @@ -201,6 +204,7 @@ public class Scan extends OperationWithAttributes { this.storeOffset = get.getRowOffsetPerColumnFamily(); this.tr = get.getTimeRange(); this.familyMap = get.getFamilyMap(); + this.prefetching = false; this.getScan = true; } @@ -364,6 +368,21 @@ public class Scan extends OperationWithAttributes { } /** + * Set if pre-fetching is enabled. If enabled, the region + * server will try to read the next scan result ahead of time. This + * improves scan performance if we are doing large scans. + * + * @param enablePrefetching if pre-fetching is enabled or not + */ + public void setPrefetching(boolean enablePrefetching) { + this.prefetching = enablePrefetching; + } + + public boolean getPrefetching() { + return prefetching; + } + +/** * @return the maximum result size in bytes. See {@link #setMaxResultSize(long)} */ public long getMaxResultSize() { @@ -613,6 +632,7 @@ public class Scan extends OperationWithAttributes { map.put("maxResultSize", this.maxResultSize); map.put("cacheBlocks", this.cacheBlocks); map.put("loadColumnFamiliesOnDemand", this.loadColumnFamiliesOnDemand); + map.put("prefetching", this.prefetching); List timeRange = new ArrayList(); timeRange.add(this.tr.getMin()); timeRange.add(this.tr.getMax()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 2fa4b306168..bdcce5e309a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -60,7 +60,7 @@ public class ScannerCallable extends ServerCallable { public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; public static final Log LOG = LogFactory.getLog(ScannerCallable.class); - private long scannerId = -1L; + long scannerId = -1L; private boolean instantiated = false; private boolean closed = false; private Scan scan; @@ -130,6 +130,7 @@ public class ScannerCallable extends ServerCallable { /** * @see java.util.concurrent.Callable#call() */ + @SuppressWarnings("deprecation") public Result [] call() throws IOException { if (closed) { if (scannerId != -1) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index e8bf29d9b92..3eb5f946d4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -709,6 +709,9 @@ public final class ProtobufUtil { if (scan.getBatch() > 0) { scanBuilder.setBatchSize(scan.getBatch()); } + if (scan.getCaching() > 0) { + scanBuilder.setCachingCount(scan.getCaching()); + } if (scan.getMaxResultSize() > 0) { scanBuilder.setMaxResultSize(scan.getMaxResultSize()); } @@ -716,6 +719,7 @@ public final class ProtobufUtil { if (loadColumnFamiliesOnDemand != null) { scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand.booleanValue()); } + scanBuilder.setPrefetching(scan.getPrefetching()); scanBuilder.setMaxVersions(scan.getMaxVersions()); TimeRange timeRange = scan.getTimeRange(); if (!timeRange.isAllTime()) { @@ -793,6 +797,9 @@ public final class ProtobufUtil { if (proto.hasMaxVersions()) { scan.setMaxVersions(proto.getMaxVersions()); } + if (proto.hasPrefetching()) { + scan.setPrefetching(proto.getPrefetching()); + } if (proto.hasStoreLimit()) { scan.setMaxResultsPerColumnFamily(proto.getStoreLimit()); } @@ -821,6 +828,9 @@ public final class ProtobufUtil { if (proto.hasBatchSize()) { scan.setBatch(proto.getBatchSize()); } + if (proto.hasCachingCount()) { + scan.setCaching(proto.getCachingCount()); + } if (proto.hasMaxResultSize()) { scan.setMaxResultSize(proto.getMaxResultSize()); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 236cb06ca3c..d27dce2e244 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.util; import java.io.PrintWriter; import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -180,8 +183,42 @@ public class Threads { boundedCachedThreadPool.allowCoreThreadTimeOut(true); return boundedCachedThreadPool; } - - + + /** + * Creates a ThreadPoolExecutor which has a bound on the number of tasks that can be + * submitted to it, determined by the blockingLimit parameter. Excess tasks + * submitted will block on the calling thread till space frees up. + * + * @param blockingLimit max number of tasks that can be submitted + * @param timeout time value after which unused threads are killed + * @param unit time unit for killing unused threads + * @param threadFactory thread factory to use to spawn threads + * @return the ThreadPoolExecutor + */ + public static ThreadPoolExecutor getBlockingThreadPool( + int blockingLimit, long timeout, TimeUnit unit, + ThreadFactory threadFactory) { + ThreadPoolExecutor blockingThreadPool = + new ThreadPoolExecutor( + 1, blockingLimit, timeout, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // The submitting thread will block until the thread pool frees up. + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException( + "Failed to requeue the rejected request because of ", e); + } + } + }); + blockingThreadPool.allowCoreThreadTimeOut(true); + return blockingThreadPool; + } + /** * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely, * with a common prefix. diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 1686ca14517..6eb798e77d9 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -10622,6 +10622,14 @@ public final class ClientProtos { // optional bool loadColumnFamiliesOnDemand = 13; boolean hasLoadColumnFamiliesOnDemand(); boolean getLoadColumnFamiliesOnDemand(); + + // optional uint32 cachingCount = 14; + boolean hasCachingCount(); + int getCachingCount(); + + // optional bool prefetching = 15; + boolean hasPrefetching(); + boolean getPrefetching(); } public static final class Scan extends com.google.protobuf.GeneratedMessage @@ -10810,6 +10818,26 @@ public final class ClientProtos { return loadColumnFamiliesOnDemand_; } + // optional uint32 cachingCount = 14; + public static final int CACHINGCOUNT_FIELD_NUMBER = 14; + private int cachingCount_; + public boolean hasCachingCount() { + return ((bitField0_ & 0x00000800) == 0x00000800); + } + public int getCachingCount() { + return cachingCount_; + } + + // optional bool prefetching = 15; + public static final int PREFETCHING_FIELD_NUMBER = 15; + private boolean prefetching_; + public boolean hasPrefetching() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + public boolean getPrefetching() { + return prefetching_; + } + private void initFields() { column_ = java.util.Collections.emptyList(); attribute_ = java.util.Collections.emptyList(); @@ -10824,6 +10852,8 @@ public final class ClientProtos { storeLimit_ = 0; storeOffset_ = 0; loadColumnFamiliesOnDemand_ = false; + cachingCount_ = 0; + prefetching_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10894,6 +10924,12 @@ public final class ClientProtos { if (((bitField0_ & 0x00000400) == 0x00000400)) { output.writeBool(13, loadColumnFamiliesOnDemand_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + output.writeUInt32(14, cachingCount_); + } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + output.writeBool(15, prefetching_); + } getUnknownFields().writeTo(output); } @@ -10955,6 +10991,14 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(13, loadColumnFamiliesOnDemand_); } + if (((bitField0_ & 0x00000800) == 0x00000800)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(14, cachingCount_); + } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(15, prefetching_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -11037,6 +11081,16 @@ public final class ClientProtos { result = result && (getLoadColumnFamiliesOnDemand() == other.getLoadColumnFamiliesOnDemand()); } + result = result && (hasCachingCount() == other.hasCachingCount()); + if (hasCachingCount()) { + result = result && (getCachingCount() + == other.getCachingCount()); + } + result = result && (hasPrefetching() == other.hasPrefetching()); + if (hasPrefetching()) { + result = result && (getPrefetching() + == other.getPrefetching()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -11098,6 +11152,14 @@ public final class ClientProtos { hash = (37 * hash) + LOADCOLUMNFAMILIESONDEMAND_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getLoadColumnFamiliesOnDemand()); } + if (hasCachingCount()) { + hash = (37 * hash) + CACHINGCOUNT_FIELD_NUMBER; + hash = (53 * hash) + getCachingCount(); + } + if (hasPrefetching()) { + hash = (37 * hash) + PREFETCHING_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getPrefetching()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -11260,6 +11322,10 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000800); loadColumnFamiliesOnDemand_ = false; bitField0_ = (bitField0_ & ~0x00001000); + cachingCount_ = 0; + bitField0_ = (bitField0_ & ~0x00002000); + prefetching_ = false; + bitField0_ = (bitField0_ & ~0x00004000); return this; } @@ -11368,6 +11434,14 @@ public final class ClientProtos { to_bitField0_ |= 0x00000400; } result.loadColumnFamiliesOnDemand_ = loadColumnFamiliesOnDemand_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00000800; + } + result.cachingCount_ = cachingCount_; + if (((from_bitField0_ & 0x00004000) == 0x00004000)) { + to_bitField0_ |= 0x00001000; + } + result.prefetching_ = prefetching_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -11469,6 +11543,12 @@ public final class ClientProtos { if (other.hasLoadColumnFamiliesOnDemand()) { setLoadColumnFamiliesOnDemand(other.getLoadColumnFamiliesOnDemand()); } + if (other.hasCachingCount()) { + setCachingCount(other.getCachingCount()); + } + if (other.hasPrefetching()) { + setPrefetching(other.getPrefetching()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11593,6 +11673,16 @@ public final class ClientProtos { loadColumnFamiliesOnDemand_ = input.readBool(); break; } + case 112: { + bitField0_ |= 0x00002000; + cachingCount_ = input.readUInt32(); + break; + } + case 120: { + bitField0_ |= 0x00004000; + prefetching_ = input.readBool(); + break; + } } } } @@ -12346,6 +12436,48 @@ public final class ClientProtos { return this; } + // optional uint32 cachingCount = 14; + private int cachingCount_ ; + public boolean hasCachingCount() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + public int getCachingCount() { + return cachingCount_; + } + public Builder setCachingCount(int value) { + bitField0_ |= 0x00002000; + cachingCount_ = value; + onChanged(); + return this; + } + public Builder clearCachingCount() { + bitField0_ = (bitField0_ & ~0x00002000); + cachingCount_ = 0; + onChanged(); + return this; + } + + // optional bool prefetching = 15; + private boolean prefetching_ ; + public boolean hasPrefetching() { + return ((bitField0_ & 0x00004000) == 0x00004000); + } + public boolean getPrefetching() { + return prefetching_; + } + public Builder setPrefetching(boolean value) { + bitField0_ |= 0x00004000; + prefetching_ = value; + onChanged(); + return this; + } + public Builder clearPrefetching() { + bitField0_ = (bitField0_ & ~0x00004000); + prefetching_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:Scan) } @@ -21467,7 +21599,7 @@ public final class ClientProtos { "ation\030\002 \002(\0132\016.MutationProto\022\035\n\tcondition" + "\030\003 \001(\0132\n.Condition\"<\n\016MutateResponse\022\027\n\006" + "result\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010" + - "\"\307\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" + + "\"\362\002\n\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tat" + "tribute\030\002 \003(\0132\016.NameBytesPair\022\020\n\010startRo" + "w\030\003 \001(\014\022\017\n\007stopRow\030\004 \001(\014\022\027\n\006filter\030\005 \001(\013" + "2\007.Filter\022\035\n\ttimeRange\030\006 \001(\0132\n.TimeRange" + @@ -21475,45 +21607,46 @@ public final class ClientProtos { "\010 \001(\010:\004true\022\021\n\tbatchSize\030\t \001(\r\022\025\n\rmaxRes" + "ultSize\030\n \001(\004\022\022\n\nstoreLimit\030\013 \001(\r\022\023\n\013sto" + "reOffset\030\014 \001(\r\022\"\n\032loadColumnFamiliesOnDe" + - "mand\030\r \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001" + - "(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Sca" + - "n\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001" + - "(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallSeq\030" + - "\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMeta\030" + - "\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002 \001(" + - "\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%\n\016R", - "esultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001\n\024B" + - "ulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020.Re" + - "gionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .Bulk" + - "LoadHFileRequest.FamilyPath\022\024\n\014assignSeq" + - "Num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022" + - "\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016" + - "\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceCall" + - "\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nme" + - "thodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Copro" + - "cessorServiceRequest\022 \n\006region\030\001 \002(\0132\020.R", - "egionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coprocess" + - "orServiceCall\"]\n\032CoprocessorServiceRespo" + - "nse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\035\n" + - "\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013MultiAct" + - "ion\022 \n\010mutation\030\001 \001(\0132\016.MutationProto\022\021\n" + - "\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005valu" + - "e\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016.Na" + - "meBytesPair\"^\n\014MultiRequest\022 \n\006region\030\001 " + - "\002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\0132\014." + - "MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResp", - "onse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342\002\n\r" + - "ClientService\022 \n\003get\022\013.GetRequest\032\014.GetR" + - "esponse\022/\n\010multiGet\022\020.MultiGetRequest\032\021." + - "MultiGetResponse\022)\n\006mutate\022\016.MutateReque" + - "st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" + - "t\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.Bulk" + - "LoadHFileRequest\032\026.BulkLoadHFileResponse" + - "\022F\n\013execService\022\032.CoprocessorServiceRequ" + - "est\032\033.CoprocessorServiceResponse\022&\n\005mult" + - "i\022\r.MultiRequest\032\016.MultiResponseBB\n*org.", - "apache.hadoop.hbase.protobuf.generatedB\014" + - "ClientProtosH\001\210\001\001\240\001\001" + "mand\030\r \001(\010\022\024\n\014cachingCount\030\016 \001(\r\022\023\n\013pref" + + "etching\030\017 \001(\010\"\230\001\n\013ScanRequest\022 \n\006region\030" + + "\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(\0132\005." + + "Scan\022\021\n\tscannerId\030\003 \001(\004\022\024\n\014numberOfRows\030" + + "\004 \001(\r\022\024\n\014closeScanner\030\005 \001(\010\022\023\n\013nextCallS" + + "eq\030\006 \001(\004\"l\n\014ScanResponse\022\'\n\016resultCellMe" + + "ta\030\001 \001(\0132\017.ResultCellMeta\022\021\n\tscannerId\030\002", + " \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"%" + + "\n\016ResultCellMeta\022\023\n\013cellsLength\030\001 \003(\r\"\260\001" + + "\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020" + + ".RegionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .B" + + "ulkLoadHFileRequest.FamilyPath\022\024\n\014assign" + + "SeqNum\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" + + "(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" + + "e\022\016\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceC" + + "all\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n" + + "\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Co", + "processorServiceRequest\022 \n\006region\030\001 \002(\0132" + + "\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coproc" + + "essorServiceCall\"]\n\032CoprocessorServiceRe" + + "sponse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" + + "\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"B\n\013Multi" + + "Action\022 \n\010mutation\030\001 \001(\0132\016.MutationProto" + + "\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005v" + + "alue\030\001 \001(\0132\007.Result\022!\n\texception\030\002 \001(\0132\016" + + ".NameBytesPair\"^\n\014MultiRequest\022 \n\006region" + + "\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002 \003(\013", + "2\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiR" + + "esponse\022\035\n\006result\030\001 \003(\0132\r.ActionResult2\342" + + "\002\n\rClientService\022 \n\003get\022\013.GetRequest\032\014.G" + + "etResponse\022/\n\010multiGet\022\020.MultiGetRequest" + + "\032\021.MultiGetResponse\022)\n\006mutate\022\016.MutateRe" + + "quest\032\017.MutateResponse\022#\n\004scan\022\014.ScanReq" + + "uest\032\r.ScanResponse\022>\n\rbulkLoadHFile\022\025.B" + + "ulkLoadHFileRequest\032\026.BulkLoadHFileRespo" + + "nse\022F\n\013execService\022\032.CoprocessorServiceR" + + "equest\032\033.CoprocessorServiceResponse\022&\n\005m", + "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" + + "rg.apache.hadoop.hbase.protobuf.generate" + + "dB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -21629,7 +21762,7 @@ public final class ClientProtos { internal_static_Scan_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Scan_descriptor, - new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", }, + new java.lang.String[] { "Column", "Attribute", "StartRow", "StopRow", "Filter", "TimeRange", "MaxVersions", "CacheBlocks", "BatchSize", "MaxResultSize", "StoreLimit", "StoreOffset", "LoadColumnFamiliesOnDemand", "CachingCount", "Prefetching", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.Builder.class); internal_static_ScanRequest_descriptor = diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 094894efa86..283c280f6cd 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -233,6 +233,8 @@ message Scan { optional uint32 storeLimit = 11; optional uint32 storeOffset = 12; optional bool loadColumnFamiliesOnDemand = 13; /* DO NOT add defaults to loadColumnFamiliesOnDemand. */ + optional uint32 cachingCount = 14; + optional bool prefetching = 15; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 902f352a12e..bac6f14839b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -185,6 +185,7 @@ import com.google.protobuf.Service; * defines the keyspace for this HRegion. */ @InterfaceAudience.Private +@SuppressWarnings("deprecation") public class HRegion implements HeapSize { // , Writable{ public static final Log LOG = LogFactory.getLog(HRegion.class); @@ -3603,7 +3604,6 @@ public class HRegion implements HeapSize { // , Writable{ return returnResult; } - private void populateFromJoinedHeap(List results, int limit) throws IOException { assert joinedContinuationRow != null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 08881dd2825..fc1a4b60053 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -45,6 +45,8 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.ObjectName; @@ -59,12 +61,12 @@ import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HealthCheckChore; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Stoppable; @@ -164,8 +166,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultCellMeta; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -479,6 +481,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // Table level lock manager for locking for region operations private TableLockManager tableLockManager; + /** + * Threadpool for doing scanner prefetches + */ + protected ThreadPoolExecutor scanPrefetchThreadPool; + /** * Starts a HRegionServer at the default location * @@ -616,14 +623,18 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } RegionScanner getScanner(long scannerId) { - String scannerIdString = Long.toString(scannerId); - RegionScannerHolder scannerHolder = scanners.get(scannerIdString); + RegionScannerHolder scannerHolder = getScannerHolder(scannerId); if (scannerHolder != null) { - return scannerHolder.s; + return scannerHolder.scanner; } return null; } + public RegionScannerHolder getScannerHolder(long scannerId) { + String scannerIdString = Long.toString(scannerId); + return scanners.get(scannerIdString); + } + /** * All initialization needed before we go register with Master. * @@ -837,6 +848,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (this.thriftServer != null) this.thriftServer.shutdown(); this.leases.closeAfterLeasesExpire(); this.rpcServer.stop(); + + if (scanPrefetchThreadPool != null) { + // shutdown the prefetch threads + scanPrefetchThreadPool.shutdownNow(); + } if (this.splitLogWorker != null) { splitLogWorker.stop(); } @@ -1107,7 +1123,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // exception next time they come in. for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().s.close(); + e.getValue().closeScanner(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -1537,6 +1553,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa this.replicationSinkHandler.startReplicationService(); } + // start the scanner prefetch threadpool + int numHandlers = conf.getInt("hbase.regionserver.prefetcher.threads.max", + conf.getInt("hbase.regionserver.handler.count", 10) + + conf.getInt("hbase.regionserver.metahandler.count", 10)); + scanPrefetchThreadPool = + Threads.getBlockingThreadPool(numHandlers, 60, TimeUnit.SECONDS, + new DaemonThreadFactory(RegionScannerHolder.PREFETCHER_THREAD_PREFIX)); + // Start Server. This service is like leases in that it internally runs // a thread. this.rpcServer.start(); @@ -1831,8 +1855,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa continue; } - InetSocketAddress isa = - new InetSocketAddress(sn.getHostname(), sn.getPort()); + new InetSocketAddress(sn.getHostname(), sn.getPort()); LOG.info("Attempting connect to Master server at " + this.masterAddressManager.getMasterAddress()); @@ -2325,7 +2348,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa public void leaseExpired() { RegionScannerHolder rsh = scanners.remove(this.scannerName); if (rsh != null) { - RegionScanner s = rsh.s; + RegionScanner s = rsh.scanner; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -2334,7 +2357,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa region.getCoprocessorHost().preScannerClose(s); } - s.close(); + rsh.closeScanner(); if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(s); } @@ -2638,20 +2661,22 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return this.fsOk; } - protected long addScanner(RegionScanner s) throws LeaseStillHeldException { + protected RegionScannerHolder addScanner( + RegionScanner s, HRegion r) throws LeaseStillHeldException { + RegionScannerHolder holder = new RegionScannerHolder(this, s, r); + String scannerName = null; long scannerId = -1; while (true) { - scannerId = rand.nextLong(); - if (scannerId == -1) continue; - String scannerName = String.valueOf(scannerId); - RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s)); + scannerId = nextLong(); + scannerName = String.valueOf(scannerId); + RegionScannerHolder existing = scanners.putIfAbsent(scannerName, holder); if (existing == null) { + holder.scannerName = scannerName; this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, - new ScannerListener(scannerName)); - break; + new ScannerListener(scannerName)); + return holder; } } - return scannerId; } /** @@ -2913,7 +2938,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa @Override public ScanResponse scan(final RpcController controller, final ScanRequest request) throws ServiceException { - Leases.Lease lease = null; String scannerName = null; try { if (!request.hasScannerId() && !request.hasScan()) { @@ -2963,7 +2987,10 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa throw new UnknownScannerException( "Name: " + scannerName + ", already closed?"); } - scanner = rsh.s; + scanner = rsh.scanner; + // Use the region found in the online region list, + // not that one in the RegionScannerHolder. So that we can + // make sure the region is still open in this region server. region = getRegion(scanner.getRegionInfo().getRegionName()); } else { region = getRegion(request.getRegion()); @@ -2974,7 +3001,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (!isLoadingCfsOnDemandSet) { scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault()); } - byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE); region.prepareScanner(scan); if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().preScannerOpen(scan); @@ -2985,9 +3011,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); } - scannerId = addScanner(scanner); - scannerName = String.valueOf(scannerId); + rsh = addScanner(scanner, region); + scannerName = rsh.scannerName; + scannerId = Long.parseLong(scannerName); + ttl = this.scannerLeaseTimeoutPeriod; + if (scan.getPrefetching()) { + rsh.enablePrefetching(scan.getCaching()); + } } if (rows > 0) { @@ -2995,110 +3026,34 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa // performed even before checking of Lease. // See HBASE-5974 if (request.hasNextCallSeq()) { - if (rsh == null) { - rsh = scanners.get(scannerName); - } - if (rsh != null) { - if (request.getNextCallSeq() != rsh.nextCallSeq) { - throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq - + " But the nextCallSeq got from client: " + request.getNextCallSeq() + - "; request=" + TextFormat.shortDebugString(request)); - } - // Increment the nextCallSeq value which is the next expected from client. - rsh.nextCallSeq++; + if (request.getNextCallSeq() != rsh.nextCallSeq) { + throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq + + " But the nextCallSeq got from client: " + request.getNextCallSeq() + + "; request=" + TextFormat.shortDebugString(request)); } + // Increment the nextCallSeq value which is the next expected from client. + rsh.nextCallSeq++; } - try { - // Remove lease while its being processed in server; protects against case - // where processing of request takes > lease expiration time. - lease = leases.removeLease(scannerName); - List results = new ArrayList(rows); - long currentScanResultSize = 0; - boolean done = false; - // Call coprocessor. Get region info from scanner. - if (region != null && region.getCoprocessorHost() != null) { - Boolean bypass = region.getCoprocessorHost().preScannerNext( - scanner, results, rows); - if (!results.isEmpty()) { - for (Result r : results) { - if (maxScannerResultSize < Long.MAX_VALUE){ - for (KeyValue kv : r.raw()) { - currentScanResultSize += kv.heapSize(); - } - } - } - } - if (bypass != null && bypass.booleanValue()) { - done = true; - } - } + ttl = this.scannerLeaseTimeoutPeriod; + ScanResult result = rsh.getScanResult(rows); + if (result.isException) { + throw result.ioException; + } - if (!done) { - long maxResultSize = scanner.getMaxResultSize(); - if (maxResultSize <= 0) { - maxResultSize = maxScannerResultSize; - } - List values = new ArrayList(); - MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); - region.startRegionOperation(Operation.SCAN); - try { - int i = 0; - synchronized(scanner) { - for (; i < rows - && currentScanResultSize < maxResultSize; i++) { - // Collect values to be returned here - boolean moreRows = scanner.nextRaw(values); - if (!values.isEmpty()) { - if (maxScannerResultSize < Long.MAX_VALUE){ - for (KeyValue kv : values) { - currentScanResultSize += kv.heapSize(); - } - } - results.add(new Result(values)); - } - if (!moreRows) { - break; - } - values.clear(); - } - } - region.readRequestsCount.add(i); - } finally { - region.closeRegionOperation(); - } - - // coprocessor postNext hook - if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); - } - } - - // If the scanner's filter - if any - is done with the scan - // and wants to tell the client to stop the scan. This is done by passing - // a null result, and setting moreResults to false. - if (scanner.isFilterDone() && results.isEmpty()) { - moreResults = false; - results = null; - } else { - ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder(); - List cellScannables = new ArrayList(results.size()); - for (Result res : results) { - cellScannables.add(res); - rcmBuilder.addCellsLength(res.size()); - } - builder.setResultCellMeta(rcmBuilder.build()); - // TODO is this okey to assume the type and cast - ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil - .createCellScanner(cellScannables)); - } - } finally { - // We're done. On way out re-add the above removed lease. - // Adding resets expiration time on lease. - if (scanners.containsKey(scannerName)) { - if (lease != null) leases.addLease(lease); - ttl = this.scannerLeaseTimeoutPeriod; + moreResults = result.moreResults; + if (result.results != null) { + List cellScannables = + new ArrayList(result.results.size()); + ResultCellMeta.Builder rcmBuilder = ResultCellMeta.newBuilder(); + for (Result res : result.results) { + cellScannables.add(res); + rcmBuilder.addCellsLength(res.size()); } + builder.setResultCellMeta(rcmBuilder.build()); + // TODO is this okey to assume the type and cast + ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil + .createCellScanner(cellScannables)); } } @@ -3112,9 +3067,13 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa } rsh = scanners.remove(scannerName); if (rsh != null) { - scanner = rsh.s; - scanner.close(); - leases.cancelLease(scannerName); + rsh.closeScanner(); + try { + leases.cancelLease(scannerName); + } catch (LeaseException le) { + // That's ok, since the lease may be gone with + // the prefetcher when cancelled. + } if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } @@ -4181,18 +4140,6 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString()); } - /** - * Holder class which holds the RegionScanner and nextCallSeq together. - */ - private static class RegionScannerHolder { - private RegionScanner s; - private long nextCallSeq = 0L; - - public RegionScannerHolder(RegionScanner s) { - this.s = s; - } - } - private boolean isHealthCheckerConfigured() { String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java new file mode 100644 index 00000000000..b32a082fa70 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerHolder.java @@ -0,0 +1,394 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; + +import com.google.common.base.Preconditions; + +/** + * Holder class which holds the RegionScanner, nextCallSeq, ScanPrefetcher + * and information needed for prefetcher/fetcher. + * + * Originally, this is an inner class of HRegionServer. We moved it out + * since HRegionServer is getting bigger and bigger. + */ +@InterfaceAudience.Private +public class RegionScannerHolder { + public final static String MAX_PREFETCHED_RESULT_SIZE_KEY + = "hbase.hregionserver.prefetcher.resultsize.max"; + public final static int MAX_PREFETCHED_RESULT_SIZE_DEFAULT = 256 * 1024 * 1024; + + final static Log LOG = LogFactory.getLog(RegionScannerHolder.class); + final static String PREFETCHER_THREAD_PREFIX = "scan-prefetch-"; + + private final static AtomicLong globalPrefetchedResultSize = new AtomicLong(); + + private ThreadPoolExecutor scanPrefetchThreadPool; + private Map scanners; + private long maxScannerResultSize; + private Configuration conf; + private Leases leases; + + private boolean prefetching = false; + private long maxGlobalPrefetchedResultSize; + private volatile Future prefetchScanFuture; + private volatile long prefetchedResultSize; + private ScanPrefetcher prefetcher; + private HRegion region; + private int rows; + + RegionScanner scanner; + long nextCallSeq = 0L; + String scannerName; + + /** + * Get the total size of all prefetched results not retrieved yet. + */ + public static long getPrefetchedResultSize() { + return globalPrefetchedResultSize.get(); + } + + /** + * Construct a RegionScanner holder for a specific region server. + * + * @param rs the region server the specific region is on + * @param s the scanner to be held + * @param r the region the scanner is for + */ + RegionScannerHolder(HRegionServer rs, RegionScanner s, HRegion r) { + scanPrefetchThreadPool = rs.scanPrefetchThreadPool; + maxScannerResultSize = rs.maxScannerResultSize; + prefetcher = new ScanPrefetcher(); + scanners = rs.scanners; + leases = rs.leases; + conf = rs.conf; + scanner = s; + region = r; + } + + public boolean isPrefetchSubmitted() { + return prefetchScanFuture != null; + } + + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + /** + * Find the current prefetched result size + */ + public long currentPrefetchedResultSize() { + return prefetchedResultSize; + } + + /** + * Wait till current prefetching task complete, + * return true if any data retrieved, false otherwise. + * Used for unit testing only. + */ + public boolean waitForPrefetchingDone() { + if (prefetchScanFuture != null) { + try { + ScanResult scanResult = prefetchScanFuture.get(); + return scanResult != null && scanResult.results != null + && !scanResult.results.isEmpty(); + } catch (Throwable t) { + LOG.debug("Got exception in getting scan result", t); + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } + return false; + } + + /** + * Stop any prefetching task and close the scanner. + * @throws IOException + */ + public void closeScanner() throws IOException { + // stop prefetcher if needed. + if (prefetchScanFuture != null) { + synchronized (prefetcher) { + prefetcher.scannerClosing = true; + prefetchScanFuture.cancel(false); + } + prefetchScanFuture = null; + if (prefetchedResultSize > 0) { + globalPrefetchedResultSize.addAndGet(-prefetchedResultSize); + prefetchedResultSize = 0L; + } + } + scanner.close(); + } + + /** + * Get the prefetched scan result, if any. Otherwise, + * do a scan synchronously and return the result, which + * may take some time. Region scan coprocessor, if specified, + * is invoked properly, which may override the scan result. + * + * @param rows the number of rows to scan, which is preferred + * not to change among scanner.next() calls. + * + * @return scan result, which has the data retrieved from + * the scanner, or some IOException if the scan failed. + * @throws IOException if failed to retrieve from the scanner. + */ + public ScanResult getScanResult(final int rows) throws IOException { + Preconditions.checkArgument(rows > 0, "Number of rows requested must be positive"); + ScanResult scanResult = null; + this.rows = rows; + + if (prefetchScanFuture == null) { + // Need to scan inline if not prefetched + scanResult = prefetcher.call(); + } else { + // if we have a prefetched result, then use it + try { + scanResult = prefetchScanFuture.get(); + if (scanResult.moreResults) { + int prefetchedRows = scanResult.results.size(); + if (prefetchedRows != 0 && this.rows > prefetchedRows) { + // Try to scan more since we haven't prefetched enough + this.rows -= prefetchedRows; + ScanResult tmp = prefetcher.call(); + if (tmp.isException) { + return tmp; // Keep the prefetched results for later + } + if (tmp.results != null && !tmp.results.isEmpty()) { + // Merge new results to the old result list + scanResult.results.addAll(tmp.results); + } + // Reset rows for next prefetching + this.rows = rows; + } + } + prefetchScanFuture = null; + if (prefetchedResultSize > 0) { + globalPrefetchedResultSize.addAndGet(-prefetchedResultSize); + prefetchedResultSize = 0L; + } + } catch (ExecutionException ee) { + throw new IOException("failed to run prefetching task", ee.getCause()); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + IOException iie = new InterruptedIOException("scan was interrupted"); + iie.initCause(ie); + throw iie; + } + } + + if (prefetching + && scanResult.moreResults && !scanResult.results.isEmpty()) { + long totalPrefetchedResultSize = globalPrefetchedResultSize.get(); + if (totalPrefetchedResultSize < maxGlobalPrefetchedResultSize) { + // Schedule a background prefetch for the next result + // if prefetch is enabled on scans and there are more results + prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher); + } else if (LOG.isTraceEnabled()) { + LOG.trace("One prefetching is skipped for scanner " + scannerName + + " since total prefetched result size " + totalPrefetchedResultSize + + " is more than the maximum configured " + + maxGlobalPrefetchedResultSize); + } + } + return scanResult; + } + + /** + * Set the rows to prefetch, and start the prefetching task. + */ + public void enablePrefetching(int caching) { + if (caching > 0) { + rows = caching; + } else { + rows = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); + } + maxGlobalPrefetchedResultSize = conf.getLong( + MAX_PREFETCHED_RESULT_SIZE_KEY, MAX_PREFETCHED_RESULT_SIZE_DEFAULT); + if (globalPrefetchedResultSize.get() < maxGlobalPrefetchedResultSize) { + prefetchScanFuture = scanPrefetchThreadPool.submit(prefetcher); + } + prefetching = true; + } + + /** + * This Callable abstracts calling a pre-fetch next. This is called on a + * threadpool. It makes a pre-fetch next call with the same parameters as + * the incoming next call. Note that the number of rows to return (nbRows) + * and/or the memory size for the result is the same as the previous call if + * pre-fetching is enabled. If these parameters change dynamically, + * they will take effect in the subsequent iteration. + */ + class ScanPrefetcher implements Callable { + boolean scannerClosing = false; + + public ScanResult call() { + ScanResult scanResult = null; + Leases.Lease lease = null; + try { + // Remove lease while its being processed in server; protects against case + // where processing of request takes > lease expiration time. + lease = leases.removeLease(scannerName); + List results = new ArrayList(rows); + long currentScanResultSize = 0; + boolean moreResults = true; + + boolean done = false; + long maxResultSize = scanner.getMaxResultSize(); + if (maxResultSize <= 0) { + maxResultSize = maxScannerResultSize; + } + String threadName = Thread.currentThread().getName(); + boolean prefetchingThread = threadName.startsWith(PREFETCHER_THREAD_PREFIX); + // Call coprocessor. Get region info from scanner. + if (region != null && region.getCoprocessorHost() != null) { + Boolean bypass = region.getCoprocessorHost().preScannerNext( + scanner, results, rows); + if (!results.isEmpty() + && (prefetchingThread || maxResultSize < Long.MAX_VALUE)) { + for (Result r : results) { + for (KeyValue kv : r.raw()) { + currentScanResultSize += kv.heapSize(); + } + } + } + if (bypass != null && bypass.booleanValue()) { + done = true; + } + } + + if (!done) { + List values = new ArrayList(); + MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint()); + region.startRegionOperation(); + try { + int i = 0; + synchronized(scanner) { + for (; i < rows + && currentScanResultSize < maxResultSize; i++) { + // Collect values to be returned here + boolean moreRows = scanner.nextRaw(values); + if (!values.isEmpty()) { + if (prefetchingThread || maxResultSize < Long.MAX_VALUE){ + for (KeyValue kv : values) { + currentScanResultSize += kv.heapSize(); + } + } + results.add(new Result(values)); + } + if (!moreRows) { + break; + } + values.clear(); + } + } + region.readRequestsCount.add(i); + } finally { + region.closeRegionOperation(); + } + + // coprocessor postNext hook + if (region != null && region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postScannerNext(scanner, results, rows, true); + } + } + + // If the scanner's filter - if any - is done with the scan + // and wants to tell the client to stop the scan. This is done by passing + // a null result, and setting moreResults to false. + if (scanner.isFilterDone() && results.isEmpty()) { + moreResults = false; + results = null; + } + scanResult = new ScanResult(moreResults, results); + if (prefetchingThread && currentScanResultSize > 0) { + synchronized (prefetcher) { + if (!scannerClosing) { + globalPrefetchedResultSize.addAndGet(currentScanResultSize); + prefetchedResultSize = currentScanResultSize; + } + } + } + } catch (IOException e) { + // we should queue the exception as the result so that we can return + // this when the result is asked for + scanResult = new ScanResult(e); + } finally { + // We're done. On way out re-add the above removed lease. + // Adding resets expiration time on lease. + if (scanners.containsKey(scannerName)) { + if (lease != null) { + try { + leases.addLease(lease); + } catch (LeaseStillHeldException e) { + LOG.error("THIS SHOULD NOT HAPPEN", e); + } + } + } + } + return scanResult; + } + } +} + +/** + * This class abstracts the results of a single scanner's result. It tracks + * the list of Result objects if the pre-fetch next was successful, and + * tracks the exception if the next failed. + */ +class ScanResult { + final boolean isException; + IOException ioException = null; + + List results = null; + boolean moreResults = false; + + public ScanResult(IOException ioException) { + this.ioException = ioException; + isException = true; + } + + public ScanResult(boolean moreResults, List results) { + this.moreResults = moreResults; + this.results = results; + isException = false; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 28506bd0758..c258d1c2f2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -17,31 +17,41 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.regionserver.RegionScannerHolder; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.After; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * A client-side test, mostly testing scanners with various parameters. */ @Category(MediumTests.class) +@RunWith(Parameterized.class) public class TestScannersFromClientSide { private static final Log LOG = LogFactory.getLog(TestScannersFromClientSide.class); @@ -51,6 +61,37 @@ public class TestScannersFromClientSide { private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); private static byte [] VALUE = Bytes.toBytes("testValue"); + private final boolean prefetching; + private long maxSize; + + @Parameters + public static final Collection parameters() { + List prefetchings = new ArrayList(); + prefetchings.add(new Object[] {Long.valueOf(-1)}); + prefetchings.add(new Object[] {Long.valueOf(0)}); + prefetchings.add(new Object[] {Long.valueOf(1)}); + prefetchings.add(new Object[] {Long.valueOf(1024)}); + return prefetchings; + } + + public TestScannersFromClientSide(Long maxPrefetchedResultSize) { + this.maxSize = maxPrefetchedResultSize.longValue(); + if (this.maxSize < 0) { + this.prefetching = false; + } else { + this.prefetching = true; + if (this.maxSize == 0) { + this.maxSize = RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_DEFAULT; + } else { + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) { + Configuration conf = rst.getRegionServer().getConfiguration(); + conf.setLong(RegionScannerHolder.MAX_PREFETCHED_RESULT_SIZE_KEY, maxSize); + } + } + } + } + /** * @throws java.lang.Exception */ @@ -65,22 +106,9 @@ public class TestScannersFromClientSide { @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); - } - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - // Nothing to do. - } - - /** - * @throws java.lang.Exception - */ - @After - public void tearDown() throws Exception { - // Nothing to do. + long remainingPrefetchedSize = RegionScannerHolder.getPrefetchedResultSize(); + assertEquals("All prefetched results should be gone", + 0, remainingPrefetchedSize); } /** @@ -89,8 +117,23 @@ public class TestScannersFromClientSide { * @throws Exception */ @Test + public void testScanBatchWithDefaultCaching() throws Exception { + batchedScanWithCachingSpecified(-1); // Using default caching which is 100 + } + + /** + * Test from client side for batch of scan + * + * @throws Exception + */ + @Test public void testScanBatch() throws Exception { - byte [] TABLE = Bytes.toBytes("testScanBatch"); + batchedScanWithCachingSpecified(1); + } + + private void batchedScanWithCachingSpecified(int caching) throws Exception { + byte [] TABLE = Bytes.toBytes( + "testScanBatch-" + prefetching + "_" + maxSize + "_" + caching); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); HTable ht = TEST_UTIL.createTable(TABLE, FAMILY); @@ -99,7 +142,7 @@ public class TestScannersFromClientSide { Scan scan; Delete delete; Result result; - ResultScanner scanner; + ClientScanner scanner; boolean toLog = true; List kvListExp; @@ -124,8 +167,11 @@ public class TestScannersFromClientSide { // without batch scan = new Scan(ROW); + scan.setCaching(caching); scan.setMaxVersions(); - scanner = ht.getScanner(scan); + scan.setPrefetching(prefetching); + scanner = (ClientScanner)ht.getScanner(scan); + verifyPrefetching(scanner); // c4:4, c5:5, c6:6, c7:7 kvListExp = new ArrayList(); @@ -135,12 +181,16 @@ public class TestScannersFromClientSide { kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); result = scanner.next(); verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); + verifyPrefetching(scanner); // with batch scan = new Scan(ROW); + scan.setCaching(caching); scan.setMaxVersions(); scan.setBatch(2); - scanner = ht.getScanner(scan); + scan.setPrefetching(prefetching); + scanner = (ClientScanner)ht.getScanner(scan); + verifyPrefetching(scanner); // First batch: c4:4, c5:5 kvListExp = new ArrayList(); @@ -148,6 +198,7 @@ public class TestScannersFromClientSide { kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[5], 5, VALUE)); result = scanner.next(); verifyResult(result, kvListExp, toLog, "Testing first batch of scan"); + verifyPrefetching(scanner); // Second batch: c6:6, c7:7 kvListExp = new ArrayList(); @@ -155,7 +206,7 @@ public class TestScannersFromClientSide { kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[7], 7, VALUE)); result = scanner.next(); verifyResult(result, kvListExp, toLog, "Testing second batch of scan"); - + verifyPrefetching(scanner); } /** @@ -165,7 +216,7 @@ public class TestScannersFromClientSide { */ @Test public void testGetMaxResults() throws Exception { - byte [] TABLE = Bytes.toBytes("testGetMaxResults"); + byte [] TABLE = Bytes.toBytes("testGetMaxResults-" + prefetching + "_" + maxSize); byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); @@ -285,7 +336,7 @@ public class TestScannersFromClientSide { */ @Test public void testScanMaxResults() throws Exception { - byte [] TABLE = Bytes.toBytes("testScanLimit"); + byte [] TABLE = Bytes.toBytes("testScanLimit-" + prefetching + "_" + maxSize); byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); @@ -315,17 +366,19 @@ public class TestScannersFromClientSide { } scan = new Scan(); + scan.setCaching(1); + scan.setPrefetching(prefetching); scan.setMaxResultsPerColumnFamily(4); - ResultScanner scanner = ht.getScanner(scan); + ClientScanner scanner = (ClientScanner)ht.getScanner(scan); kvListScan = new ArrayList(); while ((result = scanner.next()) != null) { + verifyPrefetching(scanner); for (KeyValue kv : result.list()) { kvListScan.add(kv); } } result = new Result(kvListScan); verifyResult(result, kvListExp, toLog, "Testing scan with maxResults"); - } /** @@ -335,7 +388,7 @@ public class TestScannersFromClientSide { */ @Test public void testGetRowOffset() throws Exception { - byte [] TABLE = Bytes.toBytes("testGetRowOffset"); + byte [] TABLE = Bytes.toBytes("testGetRowOffset-" + prefetching + "_" + maxSize); byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); @@ -421,7 +474,47 @@ public class TestScannersFromClientSide { kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[5], 1, VALUE)); verifyResult(result, kvListExp, toLog, "Testing offset + multiple CFs + maxResults"); + } + /** + * For testing only, find a region scanner holder for a scan. + */ + RegionScannerHolder findRegionScannerHolder(ClientScanner scanner) { + long scannerId = scanner.currentScannerId(); + if (scannerId == -1L) return null; + + HRegionInfo expectedRegion = scanner.currentRegionInfo(); + MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + for (JVMClusterUtil.RegionServerThread rst: cluster.getLiveRegionServerThreads()) { + RegionScannerHolder rsh = rst.getRegionServer().getScannerHolder(scannerId); + if (rsh != null && rsh.getRegionInfo().equals(expectedRegion)) { + return rsh; + } + } + return null; + } + + void verifyPrefetching(ClientScanner scanner) throws IOException { + long scannerId = scanner.currentScannerId(); + if (scannerId == -1L) return; // scanner is already closed + RegionScannerHolder rsh = findRegionScannerHolder(scanner); + assertNotNull("We should be able to find the scanner", rsh); + boolean isPrefetchSubmitted = rsh.isPrefetchSubmitted(); + if (prefetching && (RegionScannerHolder.getPrefetchedResultSize() < this.maxSize)) { + assertTrue("Prefetching should be submitted or no more result", + isPrefetchSubmitted || scanner.next() == null); + } else if (isPrefetchSubmitted) { + // Prefetch submitted, it must be because prefetching is enabled, + // and there was still room before it's scheduled + long sizeBefore = RegionScannerHolder.getPrefetchedResultSize() + - rsh.currentPrefetchedResultSize(); + assertTrue("There should have room before prefetching is submitted", + prefetching && sizeBefore < this.maxSize); + } + if (isPrefetchSubmitted && rsh.waitForPrefetchingDone()) { + assertTrue("Prefetched result size should not be 0", + rsh.currentPrefetchedResultSize() > 0); + } } static void verifyResult(Result result, List expKvList, boolean toLog, @@ -449,6 +542,4 @@ public class TestScannersFromClientSide { assertEquals(expKvList.size(), result.size()); } - - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 2dd1189de4b..9ddd1762cba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcesso import org.apache.hadoop.hbase.regionserver.BaseRowProcessor; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -130,6 +129,7 @@ public class TestRowProcessorEndpoint { // ignore table not found } table = util.createTable(TABLE, FAM); + table.setAutoFlush(false); { Put put = new Put(ROW); put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A @@ -143,6 +143,8 @@ public class TestRowProcessorEndpoint { put.add(FAM, F, G); table.put(put); row2Size = put.size(); + table.clearRegionCache(); + table.flushCommits(); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java index 9d7b076981e..a686ba25974 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java @@ -284,6 +284,7 @@ public class TestProtobufUtil { scanBuilder = ClientProtos.Scan.newBuilder(proto); scanBuilder.setMaxVersions(1); scanBuilder.setCacheBlocks(true); + scanBuilder.setPrefetching(true); Scan scan = ProtobufUtil.toScan(proto); assertEquals(scanBuilder.build(), ProtobufUtil.toScan(scan)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 7cd7ea687c5..b406dcda0d4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -335,6 +335,7 @@ public class TestRegionServerMetrics { Scan s = new Scan(); s.setBatch(1); s.setCaching(1); + s.setPrefetching(false); ResultScanner resultScanners = t.getScanner(s); for (int nextCount = 0; nextCount < 30; nextCount++) {