HBASE-7238 Size based scan metric broken by protobufs (Sergey)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1434558 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca430a2314
commit
4ef3a7b395
|
@ -11652,6 +11652,10 @@ public final class ClientProtos {
|
|||
// optional uint32 ttl = 4;
|
||||
boolean hasTtl();
|
||||
int getTtl();
|
||||
|
||||
// optional uint64 resultSizeBytes = 5;
|
||||
boolean hasResultSizeBytes();
|
||||
long getResultSizeBytes();
|
||||
}
|
||||
public static final class ScanResponse extends
|
||||
com.google.protobuf.GeneratedMessage
|
||||
|
@ -11733,11 +11737,22 @@ public final class ClientProtos {
|
|||
return ttl_;
|
||||
}
|
||||
|
||||
// optional uint64 resultSizeBytes = 5;
|
||||
public static final int RESULTSIZEBYTES_FIELD_NUMBER = 5;
|
||||
private long resultSizeBytes_;
|
||||
public boolean hasResultSizeBytes() {
|
||||
return ((bitField0_ & 0x00000008) == 0x00000008);
|
||||
}
|
||||
public long getResultSizeBytes() {
|
||||
return resultSizeBytes_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
result_ = java.util.Collections.emptyList();
|
||||
scannerId_ = 0L;
|
||||
moreResults_ = false;
|
||||
ttl_ = 0;
|
||||
resultSizeBytes_ = 0L;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -11769,6 +11784,9 @@ public final class ClientProtos {
|
|||
if (((bitField0_ & 0x00000004) == 0x00000004)) {
|
||||
output.writeUInt32(4, ttl_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
output.writeUInt64(5, resultSizeBytes_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -11794,6 +11812,10 @@ public final class ClientProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt32Size(4, ttl_);
|
||||
}
|
||||
if (((bitField0_ & 0x00000008) == 0x00000008)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeUInt64Size(5, resultSizeBytes_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -11834,6 +11856,11 @@ public final class ClientProtos {
|
|||
result = result && (getTtl()
|
||||
== other.getTtl());
|
||||
}
|
||||
result = result && (hasResultSizeBytes() == other.hasResultSizeBytes());
|
||||
if (hasResultSizeBytes()) {
|
||||
result = result && (getResultSizeBytes()
|
||||
== other.getResultSizeBytes());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -11859,6 +11886,10 @@ public final class ClientProtos {
|
|||
hash = (37 * hash) + TTL_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getTtl();
|
||||
}
|
||||
if (hasResultSizeBytes()) {
|
||||
hash = (37 * hash) + RESULTSIZEBYTES_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashLong(getResultSizeBytes());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
return hash;
|
||||
}
|
||||
|
@ -11988,6 +12019,8 @@ public final class ClientProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000004);
|
||||
ttl_ = 0;
|
||||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
resultSizeBytes_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -12047,6 +12080,10 @@ public final class ClientProtos {
|
|||
to_bitField0_ |= 0x00000004;
|
||||
}
|
||||
result.ttl_ = ttl_;
|
||||
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
to_bitField0_ |= 0x00000008;
|
||||
}
|
||||
result.resultSizeBytes_ = resultSizeBytes_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -12098,6 +12135,9 @@ public final class ClientProtos {
|
|||
if (other.hasTtl()) {
|
||||
setTtl(other.getTtl());
|
||||
}
|
||||
if (other.hasResultSizeBytes()) {
|
||||
setResultSizeBytes(other.getResultSizeBytes());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -12156,6 +12196,11 @@ public final class ClientProtos {
|
|||
ttl_ = input.readUInt32();
|
||||
break;
|
||||
}
|
||||
case 40: {
|
||||
bitField0_ |= 0x00000010;
|
||||
resultSizeBytes_ = input.readUInt64();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -12411,6 +12456,27 @@ public final class ClientProtos {
|
|||
return this;
|
||||
}
|
||||
|
||||
// optional uint64 resultSizeBytes = 5;
|
||||
private long resultSizeBytes_ ;
|
||||
public boolean hasResultSizeBytes() {
|
||||
return ((bitField0_ & 0x00000010) == 0x00000010);
|
||||
}
|
||||
public long getResultSizeBytes() {
|
||||
return resultSizeBytes_;
|
||||
}
|
||||
public Builder setResultSizeBytes(long value) {
|
||||
bitField0_ |= 0x00000010;
|
||||
resultSizeBytes_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearResultSizeBytes() {
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
resultSizeBytes_ = 0L;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:ScanResponse)
|
||||
}
|
||||
|
||||
|
@ -21524,46 +21590,47 @@ public final class ClientProtos {
|
|||
"\230\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Region" +
|
||||
"Specifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\021\n\tscanne" +
|
||||
"rId\030\003 \001(\004\022\024\n\014numberOfRows\030\004 \001(\r\022\024\n\014close" +
|
||||
"Scanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"\\\n\014Sc" +
|
||||
"Scanner\030\005 \001(\010\022\023\n\013nextCallSeq\030\006 \001(\004\"u\n\014Sc" +
|
||||
"anResponse\022\027\n\006result\030\001 \003(\0132\007.Result\022\021\n\ts" +
|
||||
"cannerId\030\002 \001(\004\022\023\n\013moreResults\030\003 \001(\010\022\013\n\003t",
|
||||
"tl\030\004 \001(\r\"?\n\016LockRowRequest\022 \n\006region\030\001 \002" +
|
||||
"(\0132\020.RegionSpecifier\022\013\n\003row\030\002 \003(\014\".\n\017Loc" +
|
||||
"kRowResponse\022\016\n\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(" +
|
||||
"\r\"D\n\020UnlockRowRequest\022 \n\006region\030\001 \002(\0132\020." +
|
||||
"RegionSpecifier\022\016\n\006lockId\030\002 \002(\004\"\023\n\021Unloc" +
|
||||
"kRowResponse\"\260\001\n\024BulkLoadHFileRequest\022 \n" +
|
||||
"\006region\030\001 \002(\0132\020.RegionSpecifier\0224\n\nfamil" +
|
||||
"yPath\030\002 \003(\0132 .BulkLoadHFileRequest.Famil" +
|
||||
"yPath\022\024\n\014assignSeqNum\030\003 \001(\010\032*\n\nFamilyPat" +
|
||||
"h\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkL",
|
||||
"oadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"_\n\026Copr" +
|
||||
"ocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\023\n\013servi" +
|
||||
"ceName\030\002 \002(\t\022\022\n\nmethodName\030\003 \002(\t\022\017\n\007requ" +
|
||||
"est\030\004 \002(\014\"d\n\031CoprocessorServiceRequest\022 " +
|
||||
"\n\006region\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call" +
|
||||
"\030\002 \002(\0132\027.CoprocessorServiceCall\"]\n\032Copro" +
|
||||
"cessorServiceResponse\022 \n\006region\030\001 \002(\0132\020." +
|
||||
"RegionSpecifier\022\035\n\005value\030\002 \002(\0132\016.NameByt" +
|
||||
"esPair\"9\n\013MultiAction\022\027\n\006mutate\030\001 \001(\0132\007." +
|
||||
"Mutate\022\021\n\003get\030\002 \001(\0132\004.Get\"I\n\014ActionResul",
|
||||
"t\022\026\n\005value\030\001 \001(\0132\007.Result\022!\n\texception\030\002" +
|
||||
" \001(\0132\016.NameBytesPair\"^\n\014MultiRequest\022 \n\006" +
|
||||
"region\030\001 \002(\0132\020.RegionSpecifier\022\034\n\006action" +
|
||||
"\030\002 \003(\0132\014.MultiAction\022\016\n\006atomic\030\003 \001(\010\".\n\r" +
|
||||
"MultiResponse\022\035\n\006result\030\001 \003(\0132\r.ActionRe" +
|
||||
"sult2\223\003\n\rClientService\022 \n\003get\022\013.GetReque" +
|
||||
"st\032\014.GetResponse\022)\n\006mutate\022\016.MutateReque" +
|
||||
"st\032\017.MutateResponse\022#\n\004scan\022\014.ScanReques" +
|
||||
"t\032\r.ScanResponse\022,\n\007lockRow\022\017.LockRowReq" +
|
||||
"uest\032\020.LockRowResponse\0222\n\tunlockRow\022\021.Un",
|
||||
"lockRowRequest\032\022.UnlockRowResponse\022>\n\rbu" +
|
||||
"lkLoadHFile\022\025.BulkLoadHFileRequest\032\026.Bul" +
|
||||
"kLoadHFileResponse\022F\n\013execService\022\032.Copr" +
|
||||
"ocessorServiceRequest\032\033.CoprocessorServi" +
|
||||
"ceResponse\022&\n\005multi\022\r.MultiRequest\032\016.Mul" +
|
||||
"tiResponseBB\n*org.apache.hadoop.hbase.pr" +
|
||||
"otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
|
||||
"tl\030\004 \001(\r\022\027\n\017resultSizeBytes\030\005 \001(\004\"?\n\016Loc" +
|
||||
"kRowRequest\022 \n\006region\030\001 \002(\0132\020.RegionSpec" +
|
||||
"ifier\022\013\n\003row\030\002 \003(\014\".\n\017LockRowResponse\022\016\n" +
|
||||
"\006lockId\030\001 \002(\004\022\013\n\003ttl\030\002 \001(\r\"D\n\020UnlockRowR" +
|
||||
"equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
|
||||
"\022\016\n\006lockId\030\002 \002(\004\"\023\n\021UnlockRowResponse\"\260\001" +
|
||||
"\n\024BulkLoadHFileRequest\022 \n\006region\030\001 \002(\0132\020" +
|
||||
".RegionSpecifier\0224\n\nfamilyPath\030\002 \003(\0132 .B" +
|
||||
"ulkLoadHFileRequest.FamilyPath\022\024\n\014assign" +
|
||||
"SeqNum\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002",
|
||||
"(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" +
|
||||
"e\022\016\n\006loaded\030\001 \002(\010\"_\n\026CoprocessorServiceC" +
|
||||
"all\022\013\n\003row\030\001 \002(\014\022\023\n\013serviceName\030\002 \002(\t\022\022\n" +
|
||||
"\nmethodName\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"d\n\031Co" +
|
||||
"processorServiceRequest\022 \n\006region\030\001 \002(\0132" +
|
||||
"\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027.Coproc" +
|
||||
"essorServiceCall\"]\n\032CoprocessorServiceRe" +
|
||||
"sponse\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
|
||||
"\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"9\n\013Multi" +
|
||||
"Action\022\027\n\006mutate\030\001 \001(\0132\007.Mutate\022\021\n\003get\030\002",
|
||||
" \001(\0132\004.Get\"I\n\014ActionResult\022\026\n\005value\030\001 \001(" +
|
||||
"\0132\007.Result\022!\n\texception\030\002 \001(\0132\016.NameByte" +
|
||||
"sPair\"^\n\014MultiRequest\022 \n\006region\030\001 \002(\0132\020." +
|
||||
"RegionSpecifier\022\034\n\006action\030\002 \003(\0132\014.MultiA" +
|
||||
"ction\022\016\n\006atomic\030\003 \001(\010\".\n\rMultiResponse\022\035" +
|
||||
"\n\006result\030\001 \003(\0132\r.ActionResult2\223\003\n\rClient" +
|
||||
"Service\022 \n\003get\022\013.GetRequest\032\014.GetRespons" +
|
||||
"e\022)\n\006mutate\022\016.MutateRequest\032\017.MutateResp" +
|
||||
"onse\022#\n\004scan\022\014.ScanRequest\032\r.ScanRespons" +
|
||||
"e\022,\n\007lockRow\022\017.LockRowRequest\032\020.LockRowR",
|
||||
"esponse\0222\n\tunlockRow\022\021.UnlockRowRequest\032" +
|
||||
"\022.UnlockRowResponse\022>\n\rbulkLoadHFile\022\025.B" +
|
||||
"ulkLoadHFileRequest\032\026.BulkLoadHFileRespo" +
|
||||
"nse\022F\n\013execService\022\032.CoprocessorServiceR" +
|
||||
"equest\032\033.CoprocessorServiceResponse\022&\n\005m" +
|
||||
"ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" +
|
||||
"rg.apache.hadoop.hbase.protobuf.generate" +
|
||||
"dB\014ClientProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -21679,7 +21746,7 @@ public final class ClientProtos {
|
|||
internal_static_ScanResponse_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_ScanResponse_descriptor,
|
||||
new java.lang.String[] { "Result", "ScannerId", "MoreResults", "Ttl", },
|
||||
new java.lang.String[] { "Result", "ScannerId", "MoreResults", "Ttl", "ResultSizeBytes", },
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse.class,
|
||||
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse.Builder.class);
|
||||
internal_static_LockRowRequest_descriptor =
|
||||
|
|
|
@ -218,6 +218,7 @@ message ScanResponse {
|
|||
optional uint64 scannerId = 2;
|
||||
optional bool moreResults = 3;
|
||||
optional uint32 ttl = 4;
|
||||
optional uint64 resultSizeBytes = 5;
|
||||
}
|
||||
|
||||
message LockRowRequest {
|
||||
|
|
|
@ -140,8 +140,9 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
incRPCcallsMetrics();
|
||||
ScanRequest request =
|
||||
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
|
||||
ScanResponse response = null;
|
||||
try {
|
||||
ScanResponse response = server.scan(null, request);
|
||||
response = server.scan(null, request);
|
||||
// Client and RS maintain a nextCallSeq number during the scan. Every next() call
|
||||
// from client to server will increment this number in both sides. Client passes this
|
||||
// number along with the request and at RS side both the incoming nextCallSeq and its
|
||||
|
@ -171,7 +172,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
updateResultsMetrics(rrs);
|
||||
updateResultsMetrics(response);
|
||||
} catch (IOException e) {
|
||||
if (logScannerActivity) {
|
||||
LOG.info("Got exception in fetching from scanner="
|
||||
|
@ -226,22 +227,15 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateResultsMetrics(Result[] rrs) {
|
||||
if (this.scanMetrics == null || rrs == null) {
|
||||
private void updateResultsMetrics(ScanResponse response) {
|
||||
if (this.scanMetrics == null || !response.hasResultSizeBytes()) {
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* broken by protobufs
|
||||
for (Result rr : rrs) {
|
||||
if (rr.getBytes() != null) {
|
||||
this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
|
||||
if (isRegionServerRemote) {
|
||||
this.scanMetrics.countOfBytesInRemoteResults.inc(
|
||||
rr.getBytes().getLength());
|
||||
}
|
||||
}
|
||||
long value = response.getResultSizeBytes();
|
||||
this.scanMetrics.countOfBytesInResults.addAndGet(value);
|
||||
if (isRegionServerRemote) {
|
||||
this.scanMetrics.countOfBytesInRemoteResults.addAndGet(value);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
private void close() {
|
||||
|
|
|
@ -2947,6 +2947,7 @@ public class HRegionServer implements ClientProtocol,
|
|||
RegionScannerHolder rsh = null;
|
||||
boolean moreResults = true;
|
||||
boolean closeScanner = false;
|
||||
Long resultsWireSize = null;
|
||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||
if (request.hasCloseScanner()) {
|
||||
closeScanner = request.getCloseScanner();
|
||||
|
@ -2972,6 +2973,8 @@ public class HRegionServer implements ClientProtocol,
|
|||
if (!isLoadingCfsOnDemandSet) {
|
||||
scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
|
||||
}
|
||||
byte[] hasMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
||||
resultsWireSize = (hasMetrics != null && Bytes.toBoolean(hasMetrics)) ? 0L : null;
|
||||
region.prepareScanner(scan);
|
||||
if (region.getCoprocessorHost() != null) {
|
||||
scanner = region.getCoprocessorHost().preScannerOpen(scan);
|
||||
|
@ -3079,9 +3082,16 @@ public class HRegionServer implements ClientProtocol,
|
|||
} else {
|
||||
for (Result result: results) {
|
||||
if (result != null) {
|
||||
builder.addResult(ProtobufUtil.toResult(result));
|
||||
ClientProtos.Result pbResult = ProtobufUtil.toResult(result);
|
||||
if (resultsWireSize != null) {
|
||||
resultsWireSize += pbResult.getSerializedSize();
|
||||
}
|
||||
builder.addResult(pbResult);
|
||||
}
|
||||
}
|
||||
if (resultsWireSize != null) {
|
||||
builder.setResultSizeBytes(resultsWireSize.longValue());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// We're done. On way out re-add the above removed lease.
|
||||
|
|
Loading…
Reference in New Issue