HBASE-5974 Scanner retry behavior with RPC timeout on next() seems incorrect
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1402214 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bd7c0c96f9
commit
c9073c4631
|
@ -0,0 +1,38 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown by a RegionServer while doing next() calls on a ResultScanner. Both client and server
|
||||||
|
* maintain a nextCallSeq and if they do not match, RS will throw this exception.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class OutOfOrderScannerNextException extends DoNotRetryIOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 4595751007554273567L;
|
||||||
|
|
||||||
|
public OutOfOrderScannerNextException() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
public OutOfOrderScannerNextException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
|
import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
|
@ -295,8 +296,9 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Throwable cause = e.getCause();
|
Throwable cause = e.getCause();
|
||||||
if (cause == null || (!(cause instanceof NotServingRegionException)
|
if ((cause == null || (!(cause instanceof NotServingRegionException)
|
||||||
&& !(cause instanceof RegionServerStoppedException))) {
|
&& !(cause instanceof RegionServerStoppedException)))
|
||||||
|
&& !(e instanceof OutOfOrderScannerNextException)) {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,8 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
|
|
||||||
// indicate if it is a remote server call
|
// indicate if it is a remote server call
|
||||||
private boolean isRegionServerRemote = true;
|
private boolean isRegionServerRemote = true;
|
||||||
|
private long nextCallSeq = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param connection which connection
|
* @param connection which connection
|
||||||
* @param tableName table callable is on
|
* @param tableName table callable is on
|
||||||
|
@ -138,9 +139,19 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
try {
|
try {
|
||||||
incRPCcallsMetrics();
|
incRPCcallsMetrics();
|
||||||
ScanRequest request =
|
ScanRequest request =
|
||||||
RequestConverter.buildScanRequest(scannerId, caching, false);
|
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
|
||||||
try {
|
try {
|
||||||
ScanResponse response = server.scan(null, request);
|
ScanResponse response = server.scan(null, request);
|
||||||
|
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
|
||||||
|
// from client to server will increment this number in both sides. Client passes this
|
||||||
|
// number along with the request and at RS side both the incoming nextCallSeq and its
|
||||||
|
// nextCallSeq will be matched. In case of a timeout this increment at the client side
|
||||||
|
// should not happen. If at the server side fetching of next batch of data was over,
|
||||||
|
// there will be mismatch in the nextCallSeq number. Server will throw
|
||||||
|
// OutOfOrderScannerNextException and then client will reopen the scanner with startrow
|
||||||
|
// as the last successfully retrieved row.
|
||||||
|
// See HBASE-5974
|
||||||
|
nextCallSeq++;
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
rrs = ResponseConverter.getResults(response);
|
rrs = ResponseConverter.getResults(response);
|
||||||
if (logScannerActivity) {
|
if (logScannerActivity) {
|
||||||
|
|
|
@ -416,6 +416,25 @@ public final class RequestConverter {
|
||||||
builder.setScannerId(scannerId);
|
builder.setScannerId(scannerId);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a protocol buffer ScanRequest for a scanner id
|
||||||
|
*
|
||||||
|
* @param scannerId
|
||||||
|
* @param numberOfRows
|
||||||
|
* @param closeScanner
|
||||||
|
* @param nextCallSeq
|
||||||
|
* @return a scan request
|
||||||
|
*/
|
||||||
|
public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
|
||||||
|
final boolean closeScanner, final long nextCallSeq) {
|
||||||
|
ScanRequest.Builder builder = ScanRequest.newBuilder();
|
||||||
|
builder.setNumberOfRows(numberOfRows);
|
||||||
|
builder.setCloseScanner(closeScanner);
|
||||||
|
builder.setScannerId(scannerId);
|
||||||
|
builder.setNextCallSeq(nextCallSeq);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a protocol buffer LockRowRequest
|
* Create a protocol buffer LockRowRequest
|
||||||
|
|
|
@ -10485,6 +10485,10 @@ public final class ClientProtos {
|
||||||
// optional bool closeScanner = 5;
|
// optional bool closeScanner = 5;
|
||||||
boolean hasCloseScanner();
|
boolean hasCloseScanner();
|
||||||
boolean getCloseScanner();
|
boolean getCloseScanner();
|
||||||
|
|
||||||
|
// optional uint64 nextCallSeq = 6;
|
||||||
|
boolean hasNextCallSeq();
|
||||||
|
long getNextCallSeq();
|
||||||
}
|
}
|
||||||
public static final class ScanRequest extends
|
public static final class ScanRequest extends
|
||||||
com.google.protobuf.GeneratedMessage
|
com.google.protobuf.GeneratedMessage
|
||||||
|
@ -10571,12 +10575,23 @@ public final class ClientProtos {
|
||||||
return closeScanner_;
|
return closeScanner_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional uint64 nextCallSeq = 6;
|
||||||
|
public static final int NEXTCALLSEQ_FIELD_NUMBER = 6;
|
||||||
|
private long nextCallSeq_;
|
||||||
|
public boolean hasNextCallSeq() {
|
||||||
|
return ((bitField0_ & 0x00000020) == 0x00000020);
|
||||||
|
}
|
||||||
|
public long getNextCallSeq() {
|
||||||
|
return nextCallSeq_;
|
||||||
|
}
|
||||||
|
|
||||||
private void initFields() {
|
private void initFields() {
|
||||||
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
|
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
|
||||||
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
|
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
|
||||||
scannerId_ = 0L;
|
scannerId_ = 0L;
|
||||||
numberOfRows_ = 0;
|
numberOfRows_ = 0;
|
||||||
closeScanner_ = false;
|
closeScanner_ = false;
|
||||||
|
nextCallSeq_ = 0L;
|
||||||
}
|
}
|
||||||
private byte memoizedIsInitialized = -1;
|
private byte memoizedIsInitialized = -1;
|
||||||
public final boolean isInitialized() {
|
public final boolean isInitialized() {
|
||||||
|
@ -10617,6 +10632,9 @@ public final class ClientProtos {
|
||||||
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||||
output.writeBool(5, closeScanner_);
|
output.writeBool(5, closeScanner_);
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000020) == 0x00000020)) {
|
||||||
|
output.writeUInt64(6, nextCallSeq_);
|
||||||
|
}
|
||||||
getUnknownFields().writeTo(output);
|
getUnknownFields().writeTo(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10646,6 +10664,10 @@ public final class ClientProtos {
|
||||||
size += com.google.protobuf.CodedOutputStream
|
size += com.google.protobuf.CodedOutputStream
|
||||||
.computeBoolSize(5, closeScanner_);
|
.computeBoolSize(5, closeScanner_);
|
||||||
}
|
}
|
||||||
|
if (((bitField0_ & 0x00000020) == 0x00000020)) {
|
||||||
|
size += com.google.protobuf.CodedOutputStream
|
||||||
|
.computeUInt64Size(6, nextCallSeq_);
|
||||||
|
}
|
||||||
size += getUnknownFields().getSerializedSize();
|
size += getUnknownFields().getSerializedSize();
|
||||||
memoizedSerializedSize = size;
|
memoizedSerializedSize = size;
|
||||||
return size;
|
return size;
|
||||||
|
@ -10694,6 +10716,11 @@ public final class ClientProtos {
|
||||||
result = result && (getCloseScanner()
|
result = result && (getCloseScanner()
|
||||||
== other.getCloseScanner());
|
== other.getCloseScanner());
|
||||||
}
|
}
|
||||||
|
result = result && (hasNextCallSeq() == other.hasNextCallSeq());
|
||||||
|
if (hasNextCallSeq()) {
|
||||||
|
result = result && (getNextCallSeq()
|
||||||
|
== other.getNextCallSeq());
|
||||||
|
}
|
||||||
result = result &&
|
result = result &&
|
||||||
getUnknownFields().equals(other.getUnknownFields());
|
getUnknownFields().equals(other.getUnknownFields());
|
||||||
return result;
|
return result;
|
||||||
|
@ -10723,6 +10750,10 @@ public final class ClientProtos {
|
||||||
hash = (37 * hash) + CLOSESCANNER_FIELD_NUMBER;
|
hash = (37 * hash) + CLOSESCANNER_FIELD_NUMBER;
|
||||||
hash = (53 * hash) + hashBoolean(getCloseScanner());
|
hash = (53 * hash) + hashBoolean(getCloseScanner());
|
||||||
}
|
}
|
||||||
|
if (hasNextCallSeq()) {
|
||||||
|
hash = (37 * hash) + NEXTCALLSEQ_FIELD_NUMBER;
|
||||||
|
hash = (53 * hash) + hashLong(getNextCallSeq());
|
||||||
|
}
|
||||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||||
return hash;
|
return hash;
|
||||||
}
|
}
|
||||||
|
@ -10859,6 +10890,8 @@ public final class ClientProtos {
|
||||||
bitField0_ = (bitField0_ & ~0x00000008);
|
bitField0_ = (bitField0_ & ~0x00000008);
|
||||||
closeScanner_ = false;
|
closeScanner_ = false;
|
||||||
bitField0_ = (bitField0_ & ~0x00000010);
|
bitField0_ = (bitField0_ & ~0x00000010);
|
||||||
|
nextCallSeq_ = 0L;
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000020);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10925,6 +10958,10 @@ public final class ClientProtos {
|
||||||
to_bitField0_ |= 0x00000010;
|
to_bitField0_ |= 0x00000010;
|
||||||
}
|
}
|
||||||
result.closeScanner_ = closeScanner_;
|
result.closeScanner_ = closeScanner_;
|
||||||
|
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
|
||||||
|
to_bitField0_ |= 0x00000020;
|
||||||
|
}
|
||||||
|
result.nextCallSeq_ = nextCallSeq_;
|
||||||
result.bitField0_ = to_bitField0_;
|
result.bitField0_ = to_bitField0_;
|
||||||
onBuilt();
|
onBuilt();
|
||||||
return result;
|
return result;
|
||||||
|
@ -10956,6 +10993,9 @@ public final class ClientProtos {
|
||||||
if (other.hasCloseScanner()) {
|
if (other.hasCloseScanner()) {
|
||||||
setCloseScanner(other.getCloseScanner());
|
setCloseScanner(other.getCloseScanner());
|
||||||
}
|
}
|
||||||
|
if (other.hasNextCallSeq()) {
|
||||||
|
setNextCallSeq(other.getNextCallSeq());
|
||||||
|
}
|
||||||
this.mergeUnknownFields(other.getUnknownFields());
|
this.mergeUnknownFields(other.getUnknownFields());
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
@ -11032,6 +11072,11 @@ public final class ClientProtos {
|
||||||
closeScanner_ = input.readBool();
|
closeScanner_ = input.readBool();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case 48: {
|
||||||
|
bitField0_ |= 0x00000020;
|
||||||
|
nextCallSeq_ = input.readUInt64();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -11281,6 +11326,27 @@ public final class ClientProtos {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// optional uint64 nextCallSeq = 6;
|
||||||
|
private long nextCallSeq_ ;
|
||||||
|
public boolean hasNextCallSeq() {
|
||||||
|
return ((bitField0_ & 0x00000020) == 0x00000020);
|
||||||
|
}
|
||||||
|
public long getNextCallSeq() {
|
||||||
|
return nextCallSeq_;
|
||||||
|
}
|
||||||
|
public Builder setNextCallSeq(long value) {
|
||||||
|
bitField0_ |= 0x00000020;
|
||||||
|
nextCallSeq_ = value;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
public Builder clearNextCallSeq() {
|
||||||
|
bitField0_ = (bitField0_ & ~0x00000020);
|
||||||
|
nextCallSeq_ = 0L;
|
||||||
|
onChanged();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
// @@protoc_insertion_point(builder_scope:ScanRequest)
|
// @@protoc_insertion_point(builder_scope:ScanRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23715,59 +23781,60 @@ public final class ClientProtos {
|
||||||
"Range\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersions\030\007",
|
"Range\030\006 \001(\0132\n.TimeRange\022\026\n\013maxVersions\030\007",
|
||||||
" \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022\021\n\tba" +
|
" \001(\r:\0011\022\031\n\013cacheBlocks\030\010 \001(\010:\004true\022\021\n\tba" +
|
||||||
"tchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(\004\022\022\n\n" +
|
"tchSize\030\t \001(\r\022\025\n\rmaxResultSize\030\n \001(\004\022\022\n\n" +
|
||||||
"storeLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 \001(\r\"\203\001" +
|
"storeLimit\030\013 \001(\r\022\023\n\013storeOffset\030\014 \001(\r\"\230\001" +
|
||||||
"\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.RegionSp" +
|
"\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.RegionSp" +
|
||||||
"ecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscannerI" +
|
"ecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscannerI" +
|
||||||
"d\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014closeSc" +
|
"d\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014closeSc" +
|
||||||
"anner\030\005 \001(\010\"\\\n\014ScanResponse\022\027\n\006result\030\001 " +
|
"anner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Scan" +
|
||||||
"\003(\0132\007.Result\022\021\n\tscannerId\030\002 \001(\004\022\023\n\013moreR" +
|
"Response\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\tsca" +
|
||||||
"esults\030\003 \001(\010\022\013\n\003ttl\030\004 \001(\r\"?\n\016LockRowRequ" +
|
"nnerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003ttl" +
|
||||||
"est\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022\013\n",
|
"\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002(\013",
|
||||||
"\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n\006lockId\030" +
|
"2\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017LockR" +
|
||||||
"\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowRequest\022 " +
|
"owResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"" +
|
||||||
"\n\006region\030\001 \002(\0132\020.RegionSpecifier\022\016\n\006lock" +
|
"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020.Re" +
|
||||||
"Id\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001\n\024BulkLo" +
|
"gionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021UnlockR" +
|
||||||
"adHFileRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
|
"owResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n\006r" +
|
||||||
"pecifier\0224\n\nfamilyPath\030\002 \003(\0132 .BulkLoadH" +
|
"egion\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamilyP" +
|
||||||
"FileRequest.FamilyPath\022\024\n\014assignSeqNum\030\003" +
|
"ath\030\002 \003(\0132 .BulkLoadHFileRequest.FamilyP" +
|
||||||
" \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004pa" +
|
"ath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPath\022" +
|
||||||
"th\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loa" +
|
"\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoa" +
|
||||||
"ded\030\001 \002(\010\"\203\001\n\004Exec\022\013\n\003row\030\001 \002(\014\022\024\n\014proto",
|
"dHFileResponse\022\016\n\006loaded\030\001 \002(\010\"\203\001\n\004Exec\022",
|
||||||
"colName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022!\n\010pro" +
|
"\013\n\003row\030\001 \002(\014\022\024\n\014protocolName\030\002 \002(\t\022\022\n\nme" +
|
||||||
"perty\030\004 \003(\0132\017.NameStringPair\022!\n\tparamete" +
|
"thodName\030\003 \002(\t\022!\n\010property\030\004 \003(\0132\017.NameS" +
|
||||||
"r\030\005 \003(\0132\016.NameBytesPair\"O\n\026ExecCoprocess" +
|
"tringPair\022!\n\tparameter\030\005 \003(\0132\016.NameBytes" +
|
||||||
"orRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecif" +
|
"Pair\"O\n\026ExecCoprocessorRequest\022 \n\006region" +
|
||||||
"ier\022\023\n\004call\030\002 \002(\0132\005.Exec\"8\n\027ExecCoproces" +
|
"\030\001 \002(\0132\020.RegionSpecifier\022\023\n\004call\030\002 \002(\0132\005" +
|
||||||
"sorResponse\022\035\n\005value\030\001 \002(\0132\016.NameBytesPa" +
|
".Exec\"8\n\027ExecCoprocessorResponse\022\035\n\005valu" +
|
||||||
"ir\"_\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(" +
|
"e\030\001 \002(\0132\016.NameBytesPair\"_\n\026CoprocessorSe" +
|
||||||
"\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002" +
|
"rviceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 " +
|
||||||
"(\t\022\017\n\007request\030\004 \002(\014\"d\n\031CoprocessorServic" +
|
"\002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014" +
|
||||||
"eRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifi",
|
"\"d\n\031CoprocessorServiceRequest\022 \n\006region\030",
|
||||||
"er\022%\n\004call\030\002 \002(\0132\027.CoprocessorServiceCal" +
|
"\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027." +
|
||||||
"l\"]\n\032CoprocessorServiceResponse\022 \n\006regio" +
|
"CoprocessorServiceCall\"]\n\032CoprocessorSer" +
|
||||||
"n\030\001 \002(\0132\020.RegionSpecifier\022\035\n\005value\030\002 \002(\013" +
|
"viceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSpe" +
|
||||||
"2\016.NameBytesPair\"N\n\013MultiAction\022\027\n\006mutat" +
|
"cifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"N\n" +
|
||||||
"e\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\022\023\n\004e" +
|
"\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n" +
|
||||||
"xec\030\003 \001(\0132\005.Exec\"P\n\014ActionResult\022\035\n\005valu" +
|
"\003get\030\002 \001(\0132\004.Get\022\023\n\004exec\030\003 \001(\0132\005.Exec\"P\n" +
|
||||||
"e\030\001 \001(\0132\016.NameBytesPair\022!\n\texception\030\002 \001" +
|
"\014ActionResult\022\035\n\005value\030\001 \001(\0132\016.NameBytes" +
|
||||||
"(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006re" +
|
"Pair\022!\n\texception\030\002 \001(\0132\016.NameBytesPair\"" +
|
||||||
"gion\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action\030\002" +
|
"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020.Region" +
|
||||||
" \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\rMu",
|
"Specifier\022\034\n\006action\030\002 \003(\0132\014.MultiAction\022",
|
||||||
"ltiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionResu" +
|
"\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035\n\006resu" +
|
||||||
"lt2\331\003\n\rClientService\022 \n\003get\022\013.GetRequest" +
|
"lt\030\001 \003(\0132\r.ActionResult2\331\003\n\rClientServic" +
|
||||||
"\032\014.GetResponse\022)\n\006mutate\022\016.MutateRequest" +
|
"e\022 \n\003get\022\013.GetRequest\032\014.GetResponse\022)\n\006m" +
|
||||||
"\032\017.MutateResponse\022#\n\004scan\022\014.ScanRequest\032" +
|
"utate\022\016.MutateRequest\032\017.MutateResponse\022#" +
|
||||||
"\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReque" +
|
"\n\004scan\022\014.ScanRequest\032\r.ScanResponse\022,\n\007l" +
|
||||||
"st\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Unlo" +
|
"ockRow\022\017.LockRowRequest\032\020.LockRowRespons" +
|
||||||
"ckRowRequest\032\022.UnlockRowResponse\022>\n\rbulk" +
|
"e\0222\n\tunlockRow\022\021.UnlockRowRequest\032\022.Unlo" +
|
||||||
"LoadHFile\022\025.BulkLoadHFileRequest\032\026.BulkL" +
|
"ckRowResponse\022>\n\rbulkLoadHFile\022\025.BulkLoa" +
|
||||||
"oadHFileResponse\022D\n\017execCoprocessor\022\027.Ex" +
|
"dHFileRequest\032\026.BulkLoadHFileResponse\022D\n" +
|
||||||
"ecCoprocessorRequest\032\030.ExecCoprocessorRe",
|
"\017execCoprocessor\022\027.ExecCoprocessorReques",
|
||||||
"sponse\022F\n\013execService\022\032.CoprocessorServi" +
|
"t\032\030.ExecCoprocessorResponse\022F\n\013execServi" +
|
||||||
"ceRequest\032\033.CoprocessorServiceResponse\022&" +
|
"ce\022\032.CoprocessorServiceRequest\032\033.Coproce" +
|
||||||
"\n\005multi\022\r.MultiRequest\032\016.MultiResponseBB" +
|
"ssorServiceResponse\022&\n\005multi\022\r.MultiRequ" +
|
||||||
"\n*org.apache.hadoop.hbase.protobuf.gener" +
|
"est\032\016.MultiResponseBB\n*org.apache.hadoop" +
|
||||||
"atedB\014ClientProtosH\001\210\001\001\240\001\001"
|
".hbase.protobuf.generatedB\014ClientProtosH" +
|
||||||
|
"\001\210\001\001\240\001\001"
|
||||||
};
|
};
|
||||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||||
|
@ -23875,7 +23942,7 @@ public final class ClientProtos {
|
||||||
internal_static_ScanRequest_fieldAccessorTable = new
|
internal_static_ScanRequest_fieldAccessorTable = new
|
||||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||||
internal_static_ScanRequest_descriptor,
|
internal_static_ScanRequest_descriptor,
|
||||||
new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", },
|
new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", },
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.class,
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.class,
|
||||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.Builder.class);
|
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest.Builder.class);
|
||||||
internal_static_ScanResponse_descriptor =
|
internal_static_ScanResponse_descriptor =
|
||||||
|
|
|
@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
|
import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
|
||||||
import org.apache.hadoop.hbase.RegionMovedException;
|
import org.apache.hadoop.hbase.RegionMovedException;
|
||||||
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
|
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
|
@ -280,8 +281,8 @@ public class HRegionServer implements ClientProtocol,
|
||||||
// Compactions
|
// Compactions
|
||||||
public CompactSplitThread compactSplitThread;
|
public CompactSplitThread compactSplitThread;
|
||||||
|
|
||||||
final ConcurrentHashMap<String, RegionScanner> scanners =
|
final ConcurrentHashMap<String, RegionScannerHolder> scanners =
|
||||||
new ConcurrentHashMap<String, RegionScanner>();
|
new ConcurrentHashMap<String, RegionScannerHolder>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map of regions currently being served by this region server. Key is the
|
* Map of regions currently being served by this region server. Key is the
|
||||||
|
@ -560,7 +561,11 @@ public class HRegionServer implements ClientProtocol,
|
||||||
|
|
||||||
RegionScanner getScanner(long scannerId) {
|
RegionScanner getScanner(long scannerId) {
|
||||||
String scannerIdString = Long.toString(scannerId);
|
String scannerIdString = Long.toString(scannerId);
|
||||||
return scanners.get(scannerIdString);
|
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
|
||||||
|
if (scannerHolder != null) {
|
||||||
|
return scannerHolder.s;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1140,9 +1145,9 @@ public class HRegionServer implements ClientProtocol,
|
||||||
private void closeAllScanners() {
|
private void closeAllScanners() {
|
||||||
// Close any outstanding scanners. Means they'll get an UnknownScanner
|
// Close any outstanding scanners. Means they'll get an UnknownScanner
|
||||||
// exception next time they come in.
|
// exception next time they come in.
|
||||||
for (Map.Entry<String, RegionScanner> e : this.scanners.entrySet()) {
|
for (Map.Entry<String, RegionScannerHolder> e : this.scanners.entrySet()) {
|
||||||
try {
|
try {
|
||||||
e.getValue().close();
|
e.getValue().s.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Closing scanner " + e.getKey(), ioe);
|
LOG.warn("Closing scanner " + e.getKey(), ioe);
|
||||||
}
|
}
|
||||||
|
@ -2536,8 +2541,9 @@ public class HRegionServer implements ClientProtocol,
|
||||||
}
|
}
|
||||||
|
|
||||||
public void leaseExpired() {
|
public void leaseExpired() {
|
||||||
RegionScanner s = scanners.remove(this.scannerName);
|
RegionScannerHolder rsh = scanners.remove(this.scannerName);
|
||||||
if (s != null) {
|
if (rsh != null) {
|
||||||
|
RegionScanner s = rsh.s;
|
||||||
LOG.info("Scanner " + this.scannerName + " lease expired on region "
|
LOG.info("Scanner " + this.scannerName + " lease expired on region "
|
||||||
+ s.getRegionInfo().getRegionNameAsString());
|
+ s.getRegionInfo().getRegionNameAsString());
|
||||||
try {
|
try {
|
||||||
|
@ -2841,7 +2847,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
scannerId = rand.nextLong();
|
scannerId = rand.nextLong();
|
||||||
if (scannerId == -1) continue;
|
if (scannerId == -1) continue;
|
||||||
String scannerName = String.valueOf(scannerId);
|
String scannerName = String.valueOf(scannerId);
|
||||||
RegionScanner existing = scanners.putIfAbsent(scannerName, s);
|
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
|
||||||
if (existing == null) {
|
if (existing == null) {
|
||||||
this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
|
this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
|
||||||
new ScannerListener(scannerName));
|
new ScannerListener(scannerName));
|
||||||
|
@ -3073,6 +3079,7 @@ public class HRegionServer implements ClientProtocol,
|
||||||
int ttl = 0;
|
int ttl = 0;
|
||||||
HRegion region = null;
|
HRegion region = null;
|
||||||
RegionScanner scanner = null;
|
RegionScanner scanner = null;
|
||||||
|
RegionScannerHolder rsh = null;
|
||||||
boolean moreResults = true;
|
boolean moreResults = true;
|
||||||
boolean closeScanner = false;
|
boolean closeScanner = false;
|
||||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||||
|
@ -3084,11 +3091,12 @@ public class HRegionServer implements ClientProtocol,
|
||||||
rows = request.getNumberOfRows();
|
rows = request.getNumberOfRows();
|
||||||
}
|
}
|
||||||
if (request.hasScannerId()) {
|
if (request.hasScannerId()) {
|
||||||
scanner = scanners.get(scannerName);
|
rsh = scanners.get(scannerName);
|
||||||
if (scanner == null) {
|
if (rsh == null) {
|
||||||
throw new UnknownScannerException(
|
throw new UnknownScannerException(
|
||||||
"Name: " + scannerName + ", already closed?");
|
"Name: " + scannerName + ", already closed?");
|
||||||
}
|
}
|
||||||
|
scanner = rsh.s;
|
||||||
region = getRegion(scanner.getRegionInfo().getRegionName());
|
region = getRegion(scanner.getRegionInfo().getRegionName());
|
||||||
} else {
|
} else {
|
||||||
region = getRegion(request.getRegion());
|
region = getRegion(request.getRegion());
|
||||||
|
@ -3110,6 +3118,22 @@ public class HRegionServer implements ClientProtocol,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rows > 0) {
|
if (rows > 0) {
|
||||||
|
// if nextCallSeq does not match throw Exception straight away. This needs to be
|
||||||
|
// performed even before checking of Lease.
|
||||||
|
// See HBASE-5974
|
||||||
|
if (request.hasNextCallSeq()) {
|
||||||
|
if (rsh == null) {
|
||||||
|
rsh = scanners.get(scannerName);
|
||||||
|
}
|
||||||
|
if (rsh != null) {
|
||||||
|
if (request.getNextCallSeq() != rsh.nextCallSeq) {
|
||||||
|
throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
|
||||||
|
+ " But the nextCallSeq got from client: " + request.getNextCallSeq());
|
||||||
|
}
|
||||||
|
// Increment the nextCallSeq value which is the next expected from client.
|
||||||
|
rsh.nextCallSeq++;
|
||||||
|
}
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
// Remove lease while its being processed in server; protects against case
|
// Remove lease while its being processed in server; protects against case
|
||||||
// where processing of request takes > lease expiration time.
|
// where processing of request takes > lease expiration time.
|
||||||
|
@ -3193,8 +3217,9 @@ public class HRegionServer implements ClientProtocol,
|
||||||
return builder.build(); // bypass
|
return builder.build(); // bypass
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
scanner = scanners.remove(scannerName);
|
rsh = scanners.remove(scannerName);
|
||||||
if (scanner != null) {
|
if (rsh != null) {
|
||||||
|
scanner = rsh.s;
|
||||||
scanner.close();
|
scanner.close();
|
||||||
leases.cancelLease(scannerName);
|
leases.cancelLease(scannerName);
|
||||||
if (region != null && region.getCoprocessorHost() != null) {
|
if (region != null && region.getCoprocessorHost() != null) {
|
||||||
|
@ -4135,4 +4160,16 @@ public class HRegionServer implements ClientProtocol,
|
||||||
private String getMyEphemeralNodePath() {
|
private String getMyEphemeralNodePath() {
|
||||||
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
|
return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Holder class which holds the RegionScanner and nextCallSeq together.
|
||||||
|
*/
|
||||||
|
private static class RegionScannerHolder {
|
||||||
|
private RegionScanner s;
|
||||||
|
private long nextCallSeq = 0L;
|
||||||
|
|
||||||
|
public RegionScannerHolder(RegionScanner s) {
|
||||||
|
this.s = s;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase.util;
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -82,7 +83,9 @@ public class JVMClusterUtil {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HRegionServer server;
|
HRegionServer server;
|
||||||
try {
|
try {
|
||||||
server = hrsc.getConstructor(Configuration.class).newInstance(c);
|
Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class);
|
||||||
|
ctor.setAccessible(true);
|
||||||
|
server = ctor.newInstance(c);
|
||||||
} catch (InvocationTargetException ite) {
|
} catch (InvocationTargetException ite) {
|
||||||
Throwable target = ite.getTargetException();
|
Throwable target = ite.getTargetException();
|
||||||
throw new RuntimeException("Failed construction of RegionServer: " +
|
throw new RuntimeException("Failed construction of RegionServer: " +
|
||||||
|
|
|
@ -209,6 +209,7 @@ message ScanRequest {
|
||||||
optional uint64 scannerId = 3;
|
optional uint64 scannerId = 3;
|
||||||
optional uint32 numberOfRows = 4;
|
optional uint32 numberOfRows = 4;
|
||||||
optional bool closeScanner = 5;
|
optional bool closeScanner = 5;
|
||||||
|
optional uint64 nextCallSeq = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,129 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and
|
||||||
|
* getting retried. This scenario should not result in some data being skipped at RS side.
|
||||||
|
*/
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestClientScannerRPCTimeout {
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
||||||
|
private static final byte[] VALUE = Bytes.toBytes("testValue");
|
||||||
|
private static final int rpcTimeout = 2 * 1000;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
|
||||||
|
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScannerNextRPCTimesout() throws Exception {
|
||||||
|
final byte[] TABLE_NAME = Bytes.toBytes("testScannerNextRPCTimesout");
|
||||||
|
HTable ht = TEST_UTIL.createTable(TABLE_NAME, FAMILY);
|
||||||
|
byte[] r1 = Bytes.toBytes("row-1");
|
||||||
|
byte[] r2 = Bytes.toBytes("row-2");
|
||||||
|
byte[] r3 = Bytes.toBytes("row-3");
|
||||||
|
putToTable(ht, r1);
|
||||||
|
putToTable(ht, r2);
|
||||||
|
putToTable(ht, r3);
|
||||||
|
RegionServerWithScanTimeout.seqNoToSleepOn = 1;
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setCaching(1);
|
||||||
|
ResultScanner scanner = ht.getScanner(scan);
|
||||||
|
Result result = scanner.next();
|
||||||
|
assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
|
||||||
|
long t1 = System.currentTimeMillis();
|
||||||
|
result = scanner.next();
|
||||||
|
assertTrue((System.currentTimeMillis() - t1) > rpcTimeout);
|
||||||
|
assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
|
||||||
|
RegionServerWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep
|
||||||
|
result = scanner.next();
|
||||||
|
assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void putToTable(HTable ht, byte[] rowkey) throws IOException {
|
||||||
|
Put put = new Put(rowkey);
|
||||||
|
put.add(FAMILY, QUALIFIER, VALUE);
|
||||||
|
ht.put(put);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
|
||||||
|
private long tableScannerId;
|
||||||
|
private boolean slept;
|
||||||
|
private static long seqNoToSleepOn = -1;
|
||||||
|
|
||||||
|
public RegionServerWithScanTimeout(Configuration conf) throws IOException, InterruptedException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScanResponse scan(final RpcController controller, final ScanRequest request)
|
||||||
|
throws ServiceException {
|
||||||
|
if (request.hasScannerId()) {
|
||||||
|
if (!slept && this.tableScannerId == request.getScannerId()
|
||||||
|
&& seqNoToSleepOn == request.getNextCallSeq()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(rpcTimeout + 500);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
slept = true;
|
||||||
|
}
|
||||||
|
return super.scan(controller, request);
|
||||||
|
} else {
|
||||||
|
ScanResponse scanRes = super.scan(controller, request);
|
||||||
|
String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
|
||||||
|
if (!regionName.contains("-ROOT-") && !regionName.contains(".META.")) {
|
||||||
|
tableScannerId = scanRes.getScannerId();
|
||||||
|
}
|
||||||
|
return scanRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue