From c9073c463190d38f3ade7b836101428219dc9c3d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 25 Oct 2012 16:43:22 +0000 Subject: [PATCH] HBASE-5974 Scanner retry behavior with RPC timeout on next() seems incorrect git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1402214 13f79535-47bb-0310-9956-ffa450edef68 --- .../hbase/OutOfOrderScannerNextException.java | 38 ++++ .../hadoop/hbase/client/ClientScanner.java | 6 +- .../hadoop/hbase/client/ScannerCallable.java | 15 +- .../hbase/protobuf/RequestConverter.java | 19 ++ .../protobuf/generated/ClientProtos.java | 169 ++++++++++++------ .../hbase/regionserver/HRegionServer.java | 61 +++++-- .../hadoop/hbase/util/JVMClusterUtil.java | 5 +- hbase-server/src/main/protobuf/Client.proto | 1 + .../client/TestClientScannerRPCTimeout.java | 129 +++++++++++++ 9 files changed, 375 insertions(+), 68 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java new file mode 100644 index 00000000000..b84e705fb14 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/OutOfOrderScannerNextException.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown by a RegionServer while doing next() calls on a ResultScanner. Both client and server + * maintain a nextCallSeq and if they do not match, RS will throw this exception. + */ +@InterfaceAudience.Private +public class OutOfOrderScannerNextException extends DoNotRetryIOException { + + private static final long serialVersionUID = 4595751007554273567L; + + public OutOfOrderScannerNextException() { + super(); + } + + public OutOfOrderScannerNextException(String msg) { + super(msg); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 088753c52fb..79d42a671d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; @@ -295,8 +296,9 @@ public class ClientScanner extends AbstractClientScanner { } } else { Throwable cause = e.getCause(); - if (cause == null || (!(cause instanceof NotServingRegionException) - && !(cause instanceof RegionServerStoppedException))) { + if ((cause == null || (!(cause instanceof NotServingRegionException) + && !(cause instanceof RegionServerStoppedException))) + && !(e instanceof OutOfOrderScannerNextException)) { throw e; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 7f1efc6d91a..e4f3b9927a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -67,7 +67,8 @@ public class ScannerCallable extends ServerCallable { // indicate if it is a remote server call private boolean isRegionServerRemote = true; - + private long nextCallSeq = 0; + /** * @param connection which connection * @param tableName table callable is on @@ -138,9 +139,19 @@ public class ScannerCallable extends ServerCallable { try { incRPCcallsMetrics(); ScanRequest request = - RequestConverter.buildScanRequest(scannerId, caching, false); + RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); try { ScanResponse response = server.scan(null, request); + // Client and RS maintain a nextCallSeq number during the scan. Every next() call + // from client to server will increment this number in both sides. Client passes this + // number along with the request and at RS side both the incoming nextCallSeq and its + // nextCallSeq will be matched. In case of a timeout this increment at the client side + // should not happen. If at the server side fetching of next batch of data was over, + // there will be mismatch in the nextCallSeq number. Server will throw + // OutOfOrderScannerNextException and then client will reopen the scanner with startrow + // as the last successfully retrieved row. + // See HBASE-5974 + nextCallSeq++; long timestamp = System.currentTimeMillis(); rrs = ResponseConverter.getResults(response); if (logScannerActivity) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 37e44e4236f..7c30dab6c44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -416,6 +416,25 @@ public final class RequestConverter { builder.setScannerId(scannerId); return builder.build(); } + + /** + * Create a protocol buffer ScanRequest for a scanner id + * + * @param scannerId + * @param numberOfRows + * @param closeScanner + * @param nextCallSeq + * @return a scan request + */ + public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, + final boolean closeScanner, final long nextCallSeq) { + ScanRequest.Builder builder = ScanRequest.newBuilder(); + builder.setNumberOfRows(numberOfRows); + builder.setCloseScanner(closeScanner); + builder.setScannerId(scannerId); + builder.setNextCallSeq(nextCallSeq); + return builder.build(); + } /** * Create a protocol buffer LockRowRequest diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 4a8c95fff03..05a43f6416f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -10485,6 +10485,10 @@ public final class ClientProtos { // optional bool closeScanner = 5; boolean hasCloseScanner(); boolean getCloseScanner(); + + // optional uint64 nextCallSeq = 6; + boolean hasNextCallSeq(); + long getNextCallSeq(); } public static final class ScanRequest extends com.google.protobuf.GeneratedMessage @@ -10571,12 +10575,23 @@ public final class ClientProtos { return closeScanner_; } + // optional uint64 nextCallSeq = 6; + public static final int NEXTCALLSEQ_FIELD_NUMBER = 6; + private long nextCallSeq_; + public boolean hasNextCallSeq() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getNextCallSeq() { + return nextCallSeq_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); scannerId_ = 0L; numberOfRows_ = 0; closeScanner_ = false; + nextCallSeq_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10617,6 +10632,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeBool(5, closeScanner_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeUInt64(6, nextCallSeq_); + } getUnknownFields().writeTo(output); } @@ -10646,6 +10664,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(5, closeScanner_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(6, nextCallSeq_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -10694,6 +10716,11 @@ public final class ClientProtos { result = result && (getCloseScanner() == other.getCloseScanner()); } + result = result && (hasNextCallSeq() == other.hasNextCallSeq()); + if (hasNextCallSeq()) { + result = result && (getNextCallSeq() + == other.getNextCallSeq()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -10723,6 +10750,10 @@ public final class ClientProtos { hash = (37 * hash) + CLOSESCANNER_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getCloseScanner()); } + if (hasNextCallSeq()) { + hash = (37 * hash) + NEXTCALLSEQ_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getNextCallSeq()); + } hash = (29 * hash) + getUnknownFields().hashCode(); return hash; } @@ -10859,6 +10890,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000008); closeScanner_ = false; bitField0_ = (bitField0_ & ~0x00000010); + nextCallSeq_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -10925,6 +10958,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000010; } result.closeScanner_ = closeScanner_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.nextCallSeq_ = nextCallSeq_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10956,6 +10993,9 @@ public final class ClientProtos { if (other.hasCloseScanner()) { setCloseScanner(other.getCloseScanner()); } + if (other.hasNextCallSeq()) { + setNextCallSeq(other.getNextCallSeq()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11032,6 +11072,11 @@ public final class ClientProtos { closeScanner_ = input.readBool(); break; } + case 48: { + bitField0_ |= 0x00000020; + nextCallSeq_ = input.readUInt64(); + break; + } } } } @@ -11281,6 +11326,27 @@ public final class ClientProtos { return this; } + // optional uint64 nextCallSeq = 6; + private long nextCallSeq_ ; + public boolean hasNextCallSeq() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getNextCallSeq() { + return nextCallSeq_; + } + public Builder setNextCallSeq(long value) { + bitField0_ |= 0x00000020; + nextCallSeq_ = value; + onChanged(); + return this; + } + public Builder clearNextCallSeq() { + bitField0_ = (bitField0_ & ~0x00000020); + nextCallSeq_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -23715,59 +23781,60 @@ public final class ClientProtos { "Range\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersions\030\007", " \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022\021\n\tba" + "tchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(\004\022\022\n\n" + - "storeLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 \001(\r\"\203\001" + + "storeLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 \001(\r\"\230\001" + "\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.RegionSp" + "ecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscannerI" + "d\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014closeSc" + - "anner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006result\030\001 " + - "\003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n\013moreR" + - "esults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockRowRequ" + - "est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\013\n", - "\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" + - "\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " + - "\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" + - "Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001\n\024BulkLo" + - "adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" + - "pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" + - "FileRequest.FamilyPath\022\024\n\014assignSeqNum\030\003" + - " \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" + - "th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" + - "ded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014proto", - "colName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010pro" + - "perty\030\004 \003(\0132\017.NameStringPair\022!\n\tparamete" + - "r\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocess" + - "orRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" + - "ier\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoproces" + - "sorResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPa" + - "ir\"_\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(" + - "\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002" + - "(\t\022\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServic" + - "eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi", - "er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" + - "l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" + - "n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" + - "2\016.NameBytesPair\"N\n\013MultiAction\022\027\n\006mutat" + - "e\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004e" + - "xec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult\022\035\n\005valu" + - "e\030\001 \001(\0132\016.NameBytesPair\022!\n\texception\030\002 \001" + - "(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006re" + - "gion\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002" + - " \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMu", - "ltiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionResu" + - "lt2\331\003\n\rClientService\022 \n\003get\022\013.GetRequest" + - "\032\014.GetResponse\022)\n\006mutate\022\016.MutateRequest" + - "\032\017.MutateResponse\022#\n\004scan\022\014.ScanRequest\032" + - "\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReque" + - "st\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Unlo" + - "ckRowRequest\032\022.UnlockRowResponse\022>\n\rbulk" + - "LoadHFile\022\025.BulkLoadHFileRequest\032\026.BulkL" + - "oadHFileResponse\022D\n\017execCoprocessor\022\027.Ex" + - "ecCoprocessorRequest\032\030.ExecCoprocessorRe", - "sponse\022F\n\013execService\022\032.CoprocessorServi" + - "ceRequest\032\033.CoprocessorServiceResponse\022&" + - "\n\005multi\022\r.MultiRequest\032\016.MultiResponseBB" + - "\n*org.apache.hadoop.hbase.protobuf.gener" + - "atedB\014ClientProtosH\001\210\001\001\240\001\001" + "anner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Scan" + + "Response\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\tsca" + + "nnerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl" + + "\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002(\013", + "2\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017LockR" + + "owResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"" + + "D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020.Re" + + "gionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021UnlockR" + + "owResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n\006r" + + "egion\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamilyP" + + "ath\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyP" + + "ath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPath\022" + + "\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoa" + + "dHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec\022", + "\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nme" + + "thodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.NameS" + + "tringPair\022!\n\tparameter\030\005 \003(\0132\016.NameBytes" + + "Pair\"O\n\026ExecCoprocessorRequest\022 \n\006region" + + "\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132\005" + + ".Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005valu" + + "e\030\001 \002(\0132\016.NameBytesPair\"_\n\026CoprocessorSe" + + "rviceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 " + + "\002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" + + "\"d\n\031CoprocessorServiceRequest\022 \n\006region\030", + "\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027." + + "CoprocessorServiceCall\"]\n\032CoprocessorSer" + + "viceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSpe" + + "cifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"N\n" + + "\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n" + + "\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n" + + "\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.NameBytes" + + "Pair\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" + + "^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.Region" + + "Specifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction\022", + "\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006resu" + + "lt\030\001 \003(\0132\r.ActionResult2\331\003\n\rClientServic" + + "e\022 \n\003get\022\013.GetRequest\032\014.GetResponse\022)\n\006m" + + "utate\022\016.MutateRequest\032\017.MutateResponse\022#" + + "\n\004scan\022\014.ScanRequest\032\r.ScanResponse\022,\n\007l" + + "ockRow\022\017.LockRowRequest\032\020.LockRowRespons" + + "e\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022.Unlo" + + "ckRowResponse\022>\n\rbulkLoadHFile\022\025.BulkLoa" + + "dHFileRequest\032\026.BulkLoadHFileResponse\022D\n" + + "\017execCoprocessor\022\027.ExecCoprocessorReques", + "t\032\030.ExecCoprocessorResponse\022F\n\013execServi" + + "ce\022\032.CoprocessorServiceRequest\032\033.Coproce" + + "ssorServiceResponse\022&\n\005multi\022\r.MultiRequ" + + "est\032\016.MultiResponseBB\n*org.apache.hadoop" + + ".hbase.protobuf.generatedB\014ClientProtosH" + + "\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -23875,7 +23942,7 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", }, + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", }, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.class, org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.Builder.class); internal_static_ScanResponse_descriptor = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a3d98a787de..06cbcce8ec8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.RegionMovedException; import org.apache.hadoop.hbase.RegionServerStatusProtocol; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -280,8 +281,8 @@ public class HRegionServer implements ClientProtocol, // Compactions public CompactSplitThread compactSplitThread; - final ConcurrentHashMap scanners = - new ConcurrentHashMap(); + final ConcurrentHashMap scanners = + new ConcurrentHashMap(); /** * Map of regions currently being served by this region server. Key is the @@ -560,7 +561,11 @@ public class HRegionServer implements ClientProtocol, RegionScanner getScanner(long scannerId) { String scannerIdString = Long.toString(scannerId); - return scanners.get(scannerIdString); + RegionScannerHolder scannerHolder = scanners.get(scannerIdString); + if (scannerHolder != null) { + return scannerHolder.s; + } + return null; } /** @@ -1140,9 +1145,9 @@ public class HRegionServer implements ClientProtocol, private void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. - for (Map.Entry e : this.scanners.entrySet()) { + for (Map.Entry e : this.scanners.entrySet()) { try { - e.getValue().close(); + e.getValue().s.close(); } catch (IOException ioe) { LOG.warn("Closing scanner " + e.getKey(), ioe); } @@ -2536,8 +2541,9 @@ public class HRegionServer implements ClientProtocol, } public void leaseExpired() { - RegionScanner s = scanners.remove(this.scannerName); - if (s != null) { + RegionScannerHolder rsh = scanners.remove(this.scannerName); + if (rsh != null) { + RegionScanner s = rsh.s; LOG.info("Scanner " + this.scannerName + " lease expired on region " + s.getRegionInfo().getRegionNameAsString()); try { @@ -2841,7 +2847,7 @@ public class HRegionServer implements ClientProtocol, scannerId = rand.nextLong(); if (scannerId == -1) continue; String scannerName = String.valueOf(scannerId); - RegionScanner existing = scanners.putIfAbsent(scannerName, s); + RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s)); if (existing == null) { this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); @@ -3073,6 +3079,7 @@ public class HRegionServer implements ClientProtocol, int ttl = 0; HRegion region = null; RegionScanner scanner = null; + RegionScannerHolder rsh = null; boolean moreResults = true; boolean closeScanner = false; ScanResponse.Builder builder = ScanResponse.newBuilder(); @@ -3084,11 +3091,12 @@ public class HRegionServer implements ClientProtocol, rows = request.getNumberOfRows(); } if (request.hasScannerId()) { - scanner = scanners.get(scannerName); - if (scanner == null) { + rsh = scanners.get(scannerName); + if (rsh == null) { throw new UnknownScannerException( "Name: " + scannerName + ", already closed?"); } + scanner = rsh.s; region = getRegion(scanner.getRegionInfo().getRegionName()); } else { region = getRegion(request.getRegion()); @@ -3110,6 +3118,22 @@ public class HRegionServer implements ClientProtocol, } if (rows > 0) { + // if nextCallSeq does not match throw Exception straight away. This needs to be + // performed even before checking of Lease. + // See HBASE-5974 + if (request.hasNextCallSeq()) { + if (rsh == null) { + rsh = scanners.get(scannerName); + } + if (rsh != null) { + if (request.getNextCallSeq() != rsh.nextCallSeq) { + throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq + + " But the nextCallSeq got from client: " + request.getNextCallSeq()); + } + // Increment the nextCallSeq value which is the next expected from client. + rsh.nextCallSeq++; + } + } try { // Remove lease while its being processed in server; protects against case // where processing of request takes > lease expiration time. @@ -3193,8 +3217,9 @@ public class HRegionServer implements ClientProtocol, return builder.build(); // bypass } } - scanner = scanners.remove(scannerName); - if (scanner != null) { + rsh = scanners.remove(scannerName); + if (rsh != null) { + scanner = rsh.s; scanner.close(); leases.cancelLease(scannerName); if (region != null && region.getCoprocessorHost() != null) { @@ -4135,4 +4160,16 @@ public class HRegionServer implements ClientProtocol, private String getMyEphemeralNodePath() { return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString()); } + + /** + * Holder class which holds the RegionScanner and nextCallSeq together. + */ + private static class RegionScannerHolder { + private RegionScanner s; + private long nextCallSeq = 0L; + + public RegionScannerHolder(RegionScanner s) { + this.s = s; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java index fc08dc26567..dc370989435 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.List; @@ -82,7 +83,9 @@ public class JVMClusterUtil { throws IOException { HRegionServer server; try { - server = hrsc.getConstructor(Configuration.class).newInstance(c); + Constructor ctor = hrsc.getConstructor(Configuration.class); + ctor.setAccessible(true); + server = ctor.newInstance(c); } catch (InvocationTargetException ite) { Throwable target = ite.getTargetException(); throw new RuntimeException("Failed construction of RegionServer: " + diff --git a/hbase-server/src/main/protobuf/Client.proto b/hbase-server/src/main/protobuf/Client.proto index c05a33d6c25..dd349980b85 100644 --- a/hbase-server/src/main/protobuf/Client.proto +++ b/hbase-server/src/main/protobuf/Client.proto @@ -209,6 +209,7 @@ message ScanRequest { optional uint64 scannerId = 3; optional uint32 numberOfRows = 4; optional bool closeScanner = 5; + optional uint64 nextCallSeq = 6; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java new file mode 100644 index 00000000000..3c90fdc781d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerRPCTimeout.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and + * getting retried. This scenario should not result in some data being skipped at RS side. + */ +@Category(MediumTests.class) +public class TestClientScannerRPCTimeout { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + private static final int rpcTimeout = 2 * 1000; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testScannerNextRPCTimesout() throws Exception { + final byte[] TABLE_NAME = Bytes.toBytes("testScannerNextRPCTimesout"); + HTable ht = TEST_UTIL.createTable(TABLE_NAME, FAMILY); + byte[] r1 = Bytes.toBytes("row-1"); + byte[] r2 = Bytes.toBytes("row-2"); + byte[] r3 = Bytes.toBytes("row-3"); + putToTable(ht, r1); + putToTable(ht, r2); + putToTable(ht, r3); + RegionServerWithScanTimeout.seqNoToSleepOn = 1; + Scan scan = new Scan(); + scan.setCaching(1); + ResultScanner scanner = ht.getScanner(scan); + Result result = scanner.next(); + assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow())); + long t1 = System.currentTimeMillis(); + result = scanner.next(); + assertTrue((System.currentTimeMillis() - t1) > rpcTimeout); + assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow())); + RegionServerWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep + result = scanner.next(); + assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow())); + scanner.close(); + } + + private void putToTable(HTable ht, byte[] rowkey) throws IOException { + Put put = new Put(rowkey); + put.add(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer { + private long tableScannerId; + private boolean slept; + private static long seqNoToSleepOn = -1; + + public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + @Override + public ScanResponse scan(final RpcController controller, final ScanRequest request) + throws ServiceException { + if (request.hasScannerId()) { + if (!slept && this.tableScannerId == request.getScannerId() + && seqNoToSleepOn == request.getNextCallSeq()) { + try { + Thread.sleep(rpcTimeout + 500); + } catch (InterruptedException e) { + } + slept = true; + } + return super.scan(controller, request); + } else { + ScanResponse scanRes = super.scan(controller, request); + String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); + if (!regionName.contains("-ROOT-") && !regionName.contains(".META.")) { + tableScannerId = scanRes.getScannerId(); + } + return scanRes; + } + } + } +} \ No newline at end of file