diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index b99406330c3..22c1c9535ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -760,13 +760,13 @@ public class ClientScanner extends AbstractClientScanner { public boolean renewLease() { if (callable != null) { // do not return any rows, do not advance the scanner - callable.setCaching(0); + callable.setRenew(true); try { this.caller.callWithoutRetries(callable, this.scannerTimeout); } catch (Exception e) { return false; } finally { - callable.setCaching(this.caching); + callable.setRenew(false); } return true; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 93d3fb5328d..51003149af2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -72,6 +72,7 @@ public class ScannerCallable extends RegionServerCallable { protected long scannerId = -1L; protected boolean instantiated = false; protected boolean closed = false; + protected boolean renew = false; private Scan scan; private int caching = 1; protected final ClusterConnection cConnection; @@ -209,7 +210,7 @@ public class ScannerCallable extends RegionServerCallable { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null); + this.scanMetrics != null, renew); ScanResponse response = null; controller = controllerFactory.newController(); controller.setPriority(getTableName()); @@ -413,6 +414,15 @@ public class ScannerCallable extends RegionServerCallable { this.closed = true; } + /** + * Indicate whether we make a call only to renew the lease, without affecting the scanner in + * any other way.
+ * @param val true if only the lease should be renewed + */ + public void setRenew(boolean val) { + this.renew = val; + } + /** * @return the HRegionInfo for the current region */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 7418292bad6..f8feca168bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -98,6 +98,10 @@ class ScannerCallableWithReplicas implements RetryingCallable { currentScannerCallable.setClose(); } + public void setRenew(boolean val) { + currentScannerCallable.setRenew(val); + } + public void setCaching(int caching) { currentScannerCallable.setCaching(caching); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 1a22106bb51..74c18b09226 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -529,7 +529,8 @@ public final class RequestConverter { * @return a scan request */ public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, - final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) { + final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics, + final boolean renew) { ScanRequest.Builder builder = ScanRequest.newBuilder(); builder.setNumberOfRows(numberOfRows); builder.setCloseScanner(closeScanner); @@ -538,6 +539,7 @@ public final class RequestConverter { builder.setClientHandlesPartials(true); builder.setClientHandlesHeartbeats(true); builder.setTrackScanMetrics(trackMetrics); + builder.setRenew(renew); return builder.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 315eee1ec30..e6e715dff3b 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -17303,6 +17303,16 @@ public final class ClientProtos { * optional bool track_scan_metrics = 9; */ boolean getTrackScanMetrics(); + + // optional bool renew = 10 [default = false]; + /** + * optional bool renew = 10 [default = false]; + */ + boolean hasRenew(); + /** + * optional bool renew = 10 [default = false]; + */ + boolean getRenew(); } /** * Protobuf type {@code hbase.pb.ScanRequest} @@ -17429,6 +17439,11 @@ public final class ClientProtos { trackScanMetrics_ = input.readBool(); break; } + case 80: { + bitField0_ |= 0x00000200; + renew_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -17625,6 +17640,22 @@ public final class ClientProtos { return trackScanMetrics_; } + // optional bool renew = 10 [default = false]; + public static final int RENEW_FIELD_NUMBER = 10; + private boolean renew_; + /** + * optional bool renew = 10 [default = false]; + */ + public boolean hasRenew() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + /** + * optional bool renew = 10 [default = false]; + */ + public boolean getRenew() { 
+ return renew_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -17635,6 +17666,7 @@ public final class ClientProtos { clientHandlesPartials_ = false; clientHandlesHeartbeats_ = false; trackScanMetrics_ = false; + renew_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -17687,6 +17719,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000100) == 0x00000100)) { output.writeBool(9, trackScanMetrics_); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + output.writeBool(10, renew_); + } getUnknownFields().writeTo(output); } @@ -17732,6 +17767,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(9, trackScanMetrics_); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(10, renew_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -17800,6 +17839,11 @@ public final class ClientProtos { result = result && (getTrackScanMetrics() == other.getTrackScanMetrics()); } + result = result && (hasRenew() == other.hasRenew()); + if (hasRenew()) { + result = result && (getRenew() + == other.getRenew()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -17849,6 +17893,10 @@ public final class ClientProtos { hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getTrackScanMetrics()); } + if (hasRenew()) { + hash = (37 * hash) + RENEW_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getRenew()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17999,6 +18047,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000080); trackScanMetrics_ = false; bitField0_ = (bitField0_ & ~0x00000100); + renew_ = false; + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -18071,6 +18121,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000100; } result.trackScanMetrics_ = trackScanMetrics_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000200; + } + result.renew_ = renew_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18114,6 +18168,9 @@ public final class ClientProtos { if (other.hasTrackScanMetrics()) { setTrackScanMetrics(other.getTrackScanMetrics()); } + if (other.hasRenew()) { + setRenew(other.getRenew()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -18618,6 +18675,39 @@ public final class ClientProtos { return this; } + // optional bool renew = 10 [default = false]; + private boolean renew_ ; + /** + * optional bool renew = 10 [default = false]; + */ + public boolean hasRenew() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + /** + * optional bool renew = 10 [default = false]; + */ + public boolean getRenew() { + return renew_; + } + /** + * optional bool renew = 10 [default = false]; + */ + public Builder setRenew(boolean value) { + bitField0_ |= 0x00000200; + renew_ = value; + onChanged(); + return this; + } + /** + * optional bool renew = 10 [default = false]; + */ + public Builder clearRenew() { + bitField0_ = (bitField0_ & ~0x00000200); + renew_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.ScanRequest) } @@ 
-34289,76 +34379,76 @@ public final class ClientProtos { "\025.hbase.pb.Consistency:\006STRONG\022\017\n\007cachin" + "g\030\021 \001(\r\022\035\n\025allow_partial_results\030\022 \001(\010\0226" + "\n\rcf_time_range\030\023 \003(\0132\037.hbase.pb.ColumnF" + - "amilyTimeRange\"\220\002\n\013ScanRequest\022)\n\006region", + "amilyTimeRange\"\246\002\n\013ScanRequest\022)\n\006region", "\030\001 \001(\0132\031.hbase.pb.RegionSpecifier\022\034\n\004sca" + "n\030\002 \001(\0132\016.hbase.pb.Scan\022\022\n\nscanner_id\030\003 " + "\001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rclose_sca" + "nner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(\004\022\037\n\027cli" + "ent_handles_partials\030\007 \001(\010\022!\n\031client_han" + "dles_heartbeats\030\010 \001(\010\022\032\n\022track_scan_metr" + - "ics\030\t \001(\010\"\232\002\n\014ScanResponse\022\030\n\020cells_per_" + - "result\030\001 \003(\r\022\022\n\nscanner_id\030\002 \001(\004\022\024\n\014more" + - "_results\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\022!\n\007results\030\005" + - " \003(\0132\020.hbase.pb.Result\022\r\n\005stale\030\006 \001(\010\022\037\n", - "\027partial_flag_per_result\030\007 \003(\010\022\036\n\026more_r" + - "esults_in_region\030\010 \001(\010\022\031\n\021heartbeat_mess" + - "age\030\t \001(\010\022+\n\014scan_metrics\030\n \001(\0132\025.hbase." + - "pb.ScanMetrics\"\305\001\n\024BulkLoadHFileRequest\022" + - ")\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifi" + - "er\022>\n\013family_path\030\002 \003(\0132).hbase.pb.BulkL" + - "oadHFileRequest.FamilyPath\022\026\n\016assign_seq" + - "_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014" + - "\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022" + - "\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServiceCal", - "l\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013" + - "method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Co" + - "processorServiceResult\022&\n\005value\030\001 \001(\0132\027." + - "hbase.pb.NameBytesPair\"v\n\031CoprocessorSer" + - "viceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" + - "egionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb." + - "CoprocessorServiceCall\"o\n\032CoprocessorSer" + - "viceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb." + - "RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.p" + - "b.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(", - "\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationP" + - "roto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014serv" + - "ice_call\030\004 \001(\0132 .hbase.pb.CoprocessorSer" + - "viceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\013" + - "2\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 " + - "\001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n" + - "\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\001" + - "0\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compactio" + - "nPressure\030\003 \001(\005:\0010\"\332\001\n\021ResultOrException" + - "\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.", - "pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb." + - "NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"." 
+ - "hbase.pb.CoprocessorServiceResult\022,\n\tloa" + - "dStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats\"" + - "x\n\022RegionActionResult\0226\n\021resultOrExcepti" + - "on\030\001 \003(\0132\033.hbase.pb.ResultOrException\022*\n" + - "\texception\030\002 \001(\0132\027.hbase.pb.NameBytesPai" + - "r\"x\n\014MultiRequest\022,\n\014regionAction\030\001 \003(\0132" + - "\026.hbase.pb.RegionAction\022\022\n\nnonceGroup\030\002 " + - "\001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.Condit", - "ion\"\\\n\rMultiResponse\0228\n\022regionActionResu" + - "lt\030\001 \003(\0132\034.hbase.pb.RegionActionResult\022\021" + - "\n\tprocessed\030\002 \001(\010*\'\n\013Consistency\022\n\n\006STRO" + - "NG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClientService\0222\n\003" + - "Get\022\024.hbase.pb.GetRequest\032\025.hbase.pb.Get" + - "Response\022;\n\006Mutate\022\027.hbase.pb.MutateRequ" + - "est\032\030.hbase.pb.MutateResponse\0225\n\004Scan\022\025." + - "hbase.pb.ScanRequest\032\026.hbase.pb.ScanResp" + - "onse\022P\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoa" + - "dHFileRequest\032\037.hbase.pb.BulkLoadHFileRe", - "sponse\022X\n\013ExecService\022#.hbase.pb.Coproce" + - "ssorServiceRequest\032$.hbase.pb.Coprocesso" + - "rServiceResponse\022d\n\027ExecRegionServerServ" + + "ics\030\t \001(\010\022\024\n\005renew\030\n \001(\010:\005false\"\232\002\n\014Scan" + + "Response\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nsc" + + "anner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003" + + "ttl\030\004 \001(\r\022!\n\007results\030\005 \003(\0132\020.hbase.pb.Re", + "sult\022\r\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_" + + "result\030\007 \003(\010\022\036\n\026more_results_in_region\030\010" + + " \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014scan_" + + "metrics\030\n \001(\0132\025.hbase.pb.ScanMetrics\"\305\001\n" + + "\024BulkLoadHFileRequest\022)\n\006region\030\001 \002(\0132\031." 
+ + "hbase.pb.RegionSpecifier\022>\n\013family_path\030" + + "\002 \003(\0132).hbase.pb.BulkLoadHFileRequest.Fa" + + "milyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*\n\nFami" + + "lyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025" + + "BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a\n", + "\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014" + + "service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022" + + "\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServiceRe" + + "sult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.NameBytes" + + "Pair\"v\n\031CoprocessorServiceRequest\022)\n\006reg" + + "ion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022.\n\004" + + "call\030\002 \002(\0132 .hbase.pb.CoprocessorService" + + "Call\"o\n\032CoprocessorServiceResponse\022)\n\006re" + + "gion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022&\n" + + "\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226\001", + "\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(" + + "\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003 \001(\0132" + + "\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(\0132 .h" + + "base.pb.CoprocessorServiceCall\"k\n\014Region" + + "Action\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Region" + + "Specifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(" + + "\0132\020.hbase.pb.Action\"c\n\017RegionLoadStats\022\027" + + "\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy" + + "\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001(\005:\0010" + + "\"\332\001\n\021ResultOrException\022\r\n\005index\030\001 \001(\r\022 \n", + "\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\texcep" + + "tion\030\003 \001(\0132\027.hbase.pb.NameBytesPair\022:\n\016s" + + "ervice_result\030\004 \001(\0132\".hbase.pb.Coprocess" + + "orServiceResult\022,\n\tloadStats\030\005 \001(\0132\031.hba" + + "se.pb.RegionLoadStats\"x\n\022RegionActionRes" + + "ult\0226\n\021resultOrException\030\001 \003(\0132\033.hbase.p" + + "b.ResultOrException\022*\n\texception\030\002 \001(\0132\027" + + ".hbase.pb.NameBytesPair\"x\n\014MultiRequest\022" + + ",\n\014regionAction\030\001 \003(\0132\026.hbase.pb.RegionA" + + "ction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003", + " \001(\0132\023.hbase.pb.Condition\"\\\n\rMultiRespon" + + "se\0228\n\022regionActionResult\030\001 \003(\0132\034.hbase.p" + + "b.RegionActionResult\022\021\n\tprocessed\030\002 \001(\010*" + + "\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\001" + + "2\203\004\n\rClientService\0222\n\003Get\022\024.hbase.pb.Get" + + "Request\032\025.hbase.pb.GetResponse\022;\n\006Mutate" + + "\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.Mut" + + "ateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReque" + + "st\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoadHF" + + "ile\022\036.hbase.pb.BulkLoadHFileRequest\032\037.hb", + "ase.pb.BulkLoadHFileResponse\022X\n\013ExecServ" + "ice\022#.hbase.pb.CoprocessorServiceRequest" + - "\032$.hbase.pb.CoprocessorServiceResponse\0228" + - "\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." 
+ - "pb.MultiResponseBB\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\014ClientProtosH\001\210\001" + - "\001\240\001\001" + "\032$.hbase.pb.CoprocessorServiceResponse\022d" + + "\n\027ExecRegionServerService\022#.hbase.pb.Cop" + + "rocessorServiceRequest\032$.hbase.pb.Coproc" + + "essorServiceResponse\0228\n\005Multi\022\026.hbase.pb" + + ".MultiRequest\032\027.hbase.pb.MultiResponseBB" + + "\n*org.apache.hadoop.hbase.protobuf.gener" + + "atedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -34454,7 +34544,7 @@ public final class ClientProtos { internal_static_hbase_pb_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", "Renew", }); internal_static_hbase_pb_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_hbase_pb_ScanResponse_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 339b98b7a7a..1e48ef09f58 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -282,6 +282,7 @@ message ScanRequest { optional bool client_handles_partials = 7; optional bool client_handles_heartbeats = 8; optional bool track_scan_metrics = 9; + optional bool renew = 10 [default = false]; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2cdf46dbcfd..325f7bc5169 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2385,6 +2385,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, scannerName = String.valueOf(scannerId); ttl = this.scannerLeaseTimeoutPeriod; } + if (request.hasRenew() && request.getRenew()) { + lease = regionServer.leases.removeLease(scannerName); + if (lease != null && scanners.containsKey(scannerName)) { + regionServer.leases.addLease(lease); + } + return builder.build(); + } quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index bb2784d87ce..a0a8747bb9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -487,29 +487,4 @@ public class TestFromClientSide3 { assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES)); table.close(); } - - @Test - public void testLeaseRenewal() throws Exception { - HTable table = TEST_UTIL.createTable( - Bytes.toBytes("testLeaseRenewal"), FAMILY); - Put 
p = new Put(ROW_BYTES); - p.add(FAMILY, COL_QUAL, VAL_BYTES); - table.put(p); - p = new Put(ANOTHERROW); - p.add(FAMILY, COL_QUAL, VAL_BYTES); - table.put(p); - Scan s = new Scan(); - s.setCaching(1); - ResultScanner rs = table.getScanner(s); - // make sure that calling renewLease does not impact the scan results - assertTrue(((AbstractClientScanner)rs).renewLease()); - assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); - assertTrue(((AbstractClientScanner)rs).renewLease()); - assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); - assertTrue(((AbstractClientScanner)rs).renewLease()); - assertNull(rs.next()); - assertFalse(((AbstractClientScanner)rs).renewLease()); - rs.close(); - table.close(); - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java new file mode 100644 index 00000000000..fd9c9bb1236 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java @@ -0,0 +1,125 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestLeaseRenewal { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow"); + private final static byte[] COL_QUAL = Bytes.toBytes("f1"); + private final static byte[] VAL_BYTES = Bytes.toBytes("v1"); + private final static byte[] ROW_BYTES = Bytes.toBytes("r1"); + private final static int leaseTimeout = + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 4; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + leaseTimeout); + TEST_UTIL.startMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + for (HTableDescriptor htd : TEST_UTIL.getHBaseAdmin().listTables()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + TEST_UTIL.deleteTable(htd.getTableName()); + } + } + + @Test + public void testLeaseRenewal() throws Exception { + HTable table = TEST_UTIL.createTable( + TableName.valueOf("testLeaseRenewal"), FAMILY); + Put p = new Put(ROW_BYTES); + p.addColumn(FAMILY, COL_QUAL, VAL_BYTES); + table.put(p); + p = new Put(ANOTHERROW); + p.addColumn(FAMILY, COL_QUAL, VAL_BYTES); + table.put(p); + Scan s = new Scan(); + s.setCaching(1); + ResultScanner rs = table.getScanner(s); + // make sure that calling renewLease does not impact the scan results + assertTrue(((AbstractClientScanner)rs).renewLease()); + assertTrue(Arrays.equals(rs.next().getRow(), ANOTHERROW)); + // renew the lease a few times, long enough to be sure + // the lease would have expired otherwise + Thread.sleep(leaseTimeout/2); + assertTrue(((AbstractClientScanner)rs).renewLease()); + Thread.sleep(leaseTimeout/2); + assertTrue(((AbstractClientScanner)rs).renewLease()); + Thread.sleep(leaseTimeout/2); + assertTrue(((AbstractClientScanner)rs).renewLease()); + // make sure we haven't advanced the scanner + assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); + assertTrue(((AbstractClientScanner)rs).renewLease()); + // make sure scanner is exhausted now + assertNull(rs.next()); + // renewLease should return false now + assertFalse(((AbstractClientScanner)rs).renewLease()); + rs.close(); + table.close(); + } +}
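
Reviewer note: a minimal client-side sketch of how the new renewal path is meant to be used, following the pattern in TestLeaseRenewal. renewLease() flips ScannerCallable.setRenew(true), which sends a ScanRequest with renew=true; the region server answers by removing and re-adding the scanner's lease and returning an empty ScanResponse, so no rows are returned and next_call_seq is untouched. The table name, column family, and the idea of a "slow per-row step" below are illustrative assumptions, not part of the patch; the cast to AbstractClientScanner mirrors the test, since renewLease() is not on the ResultScanner interface in this branch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class SlowScanLeaseRenewalSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         // "myTable" / "f" are hypothetical names for this sketch.
         Table table = connection.getTable(TableName.valueOf("myTable"))) {
      Scan scan = new Scan();
      scan.setCaching(1); // fetch one row per RPC, as in the test
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result row = scanner.next(); row != null; row = scanner.next()) {
          // Assume processing a single row can take longer than
          // hbase.client.scanner.timeout.period. Renewing the lease keeps the
          // server-side scanner alive without returning rows or advancing it.
          if (!((AbstractClientScanner) scanner).renewLease()) {
            // false means the renew-only RPC failed, e.g. the lease already expired.
            break;
          }
          processSlowly(row);
        }
      }
    }
  }

  // Placeholder for expensive per-row work; purely illustrative.
  private static void processSlowly(Result row) {
    // ...
  }
}
```

The design choice worth noting for review: the server handles renew=true before consulting quotas or building results, so a renewal is a cheap lease bounce rather than a zero-row scan, which is what the old setCaching(0) trick in ClientScanner.renewLease() effectively performed.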