diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 85352ff0386..afce287af6f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -202,7 +204,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       setHeartbeatMessage(false);
       try {
         incRPCcallsMetrics();
-        request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
+        request =
+            RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+              this.scanMetrics != null);
         ScanResponse response = null;
         controller = controllerFactory.newController();
         controller.setPriority(getTableName());
@@ -232,6 +236,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
                 + rows + " rows from scanner=" + scannerId);
           }
         }
+        updateServerSideMetrics(response);
         // moreResults is only used for the case where a filter exhausts all elements
         if (response.hasMoreResults() && !response.getMoreResults()) {
           scannerId = -1L;
@@ -341,6 +346,21 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
+  /**
+   * Use the scan metrics returned by the server to add to the identically named counters in the
+   * client side metrics. If a counter does not exist with the same name as the server side metric,
+   * the attempt to increase the counter will fail.
+   * @param response
+   */
+  private void updateServerSideMetrics(ScanResponse response) {
+    if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
+
+    Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
+    for (Entry<String, Long> entry : serverMetrics.entrySet()) {
+      this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
+    }
+  }
+
   private void close() {
     if (this.scannerId == -1L) {
       return;
@@ -348,7 +368,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     try {
       incRPCcallsMetrics();
       ScanRequest request =
-          RequestConverter.buildScanRequest(this.scannerId, 0, true);
+          RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
         getStub().scan(null, request);
       } catch (ServiceException se) {
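The merge performed by updateServerSideMetrics() above relies on ServerSideScanMetrics.addToCounter() (introduced later in this patch) silently ignoring metric names that have no matching client-side counter. A minimal, self-contained sketch of that merge behavior; the serverMetrics map stands in for what ResponseConverter.getScanMetrics(response) would return, and its values are invented for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class MergeSketch {
  public static void main(String[] args) {
    ScanMetrics clientMetrics = new ScanMetrics();

    // Stand-in for ResponseConverter.getScanMetrics(response).
    Map<String, Long> serverMetrics = new HashMap<String, Long>();
    serverMetrics.put(ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, 100L);
    serverMetrics.put(ScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, 90L);
    serverMetrics.put("NO_SUCH_METRIC", 1L); // no matching counter: silently dropped

    // The same loop ScannerCallable#updateServerSideMetrics runs.
    for (Map.Entry<String, Long> entry : serverMetrics.entrySet()) {
      clientMetrics.addToCounter(entry.getKey(), entry.getValue());
    }

    System.out.println(clientMetrics.countOfRowsScanned.get());  // 100
    System.out.println(clientMetrics.countOfRowsFiltered.get()); // 90
  }
}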
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
index 35c66678c93..ec2c937c834 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -18,41 +18,32 @@
 package org.apache.hadoop.hbase.client.metrics;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
-import com.google.common.collect.ImmutableMap;
-
 /**
- * Provides client-side metrics related to scan operations.
+ * Provides metrics related to scan operations (both server side and client side metrics).
+ * <p>
  * The data can be passed to mapreduce framework or other systems.
  * We use atomic longs so that one thread can increment,
  * while another atomically resets to zero after the values are reported
  * to hadoop's counters.
- *
+ * <p>
  * Some of these metrics are general for any client operation such as put
 * However, there is no need for this. So they are defined under scan operation
 * for now.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class ScanMetrics {
+public class ScanMetrics extends ServerSideScanMetrics {
 
-  /**
-   * Hash to hold the String -> Atomic Long mappings.
-   */
-  private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
-
-  // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
-  // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
+  // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
+  // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
   // values after progress is passed to hadoop's counters.
-
   /**
    * number of RPC calls
    */
@@ -103,36 +94,4 @@ public class ScanMetrics {
    */
   public ScanMetrics() {
   }
-
-  private AtomicLong createCounter(String counterName) {
-    AtomicLong c = new AtomicLong(0);
-    counters.put(counterName, c);
-    return c;
-  }
-
-  public void setCounter(String counterName, long value) {
-    AtomicLong c = this.counters.get(counterName);
-    if (c != null) {
-      c.set(value);
-    }
-  }
-
-  /**
-   * Get all of the values since the last time this function was called.
-   *
-   * Calling this function will reset all AtomicLongs in the instance back to 0.
-   *
-   * @return A Map of String -> Long for metrics
-   */
-  public Map<String, Long> getMetricsMap() {
-    //Create a builder
-    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
-    //For every entry add the value and reset the AtomicLong back to zero
-    for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
-      builder.put(e.getKey(), e.getValue().getAndSet(0));
-    }
-    //Build the immutable map so that people can't mess around with it.
-    return builder.build();
-  }
-
 }
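With the change above, ScanMetrics keeps its public API but inherits the counter map and the new ROWS_SCANNED/ROWS_FILTERED counters from ServerSideScanMetrics (added below). A short sketch of what that buys client code, using only members visible in this patch:

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class InheritedCountersSketch {
  public static void main(String[] args) {
    ScanMetrics metrics = new ScanMetrics();

    // Server-side counters are inherited from ServerSideScanMetrics, so they
    // live in the same String -> AtomicLong map as the client-side counters.
    metrics.countOfRowsScanned.addAndGet(10);
    metrics.addToCounter(ScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, 3);

    System.out.println(metrics.hasCounter(ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY)); // true
    System.out.println(metrics.countOfRowsFiltered.get());                         // 3
  }
}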
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
new file mode 100644
index 00000000000..c971c73d60d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Provides server side metrics related to scan operations.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ServerSideScanMetrics {
+  /**
+   * Hash to hold the String -> Atomic Long mappings for each metric
+   */
+  private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
+
+  /**
+   * Create a new counter with the specified name
+   * @param counterName
+   * @return {@link AtomicLong} instance for the counter with counterName
+   */
+  protected AtomicLong createCounter(String counterName) {
+    AtomicLong c = new AtomicLong(0);
+    counters.put(counterName, c);
+    return c;
+  }
+
+  public static final String COUNT_OF_ROWS_SCANNED_KEY = "ROWS_SCANNED";
+  public static final String COUNT_OF_ROWS_FILTERED_KEY = "ROWS_FILTERED";
+
+  /**
+   * number of rows filtered during scan RPC
+   */
+  public final AtomicLong countOfRowsFiltered = createCounter(COUNT_OF_ROWS_FILTERED_KEY);
+
+  /**
+   * number of rows scanned during scan RPC. Not every row scanned will be returned to the client
+   * since rows may be filtered.
+   */
+  public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY);
+
+  /**
+   * @param counterName
+   * @param value
+   */
+  public void setCounter(String counterName, long value) {
+    AtomicLong c = this.counters.get(counterName);
+    if (c != null) {
+      c.set(value);
+    }
+  }
+
+  /**
+   * @param counterName
+   * @return true if a counter exists with the counterName
+   */
+  public boolean hasCounter(String counterName) {
+    return this.counters.containsKey(counterName);
+  }
+
+  /**
+   * @param counterName
+   * @return {@link AtomicLong} instance for this counter name, null if counter does not exist.
+   */
+  public AtomicLong getCounter(String counterName) {
+    return this.counters.get(counterName);
+  }
+
+  /**
+   * @param counterName
+   * @param delta
+   */
+  public void addToCounter(String counterName, long delta) {
+    AtomicLong c = this.counters.get(counterName);
+    if (c != null) {
+      c.addAndGet(delta);
+    }
+  }
+
+  /**
+   * Get all of the values since the last time this function was called. Calling this function will
+   * reset all AtomicLongs in the instance back to 0.
+   * @return A Map of String -> Long for metrics
+   */
+  public Map<String, Long> getMetricsMap() {
+    // Create a builder
+    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
+    // For every entry add the value and reset the AtomicLong back to zero
+    for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
+      builder.put(e.getKey(), e.getValue().getAndSet(0));
+    }
+    // Build the immutable map so that people can't mess around with it.
+    return builder.build();
+  }
+}
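One contract worth calling out in the class above: getMetricsMap() is destructive. It snapshots every counter and resets it to zero via getAndSet(0), which is what lets a reporting thread drain the counters while scan threads keep incrementing. A small sketch of that behavior:

import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

public class ResetSemanticsSketch {
  public static void main(String[] args) {
    ServerSideScanMetrics metrics = new ServerSideScanMetrics();
    metrics.countOfRowsScanned.addAndGet(42);

    // First call returns the accumulated value...
    System.out.println(metrics.getMetricsMap()
        .get(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY)); // 42

    // ...and resets every AtomicLong, so a second call sees zero.
    System.out.println(metrics.getMetricsMap()
        .get(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY)); // 0
  }
}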
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index ff77e515fb5..911d9925a1c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@@ -479,9 +479,8 @@
    * @return a scan request
    * @throws IOException
    */
-  public static ScanRequest buildScanRequest(final byte[] regionName,
-      final Scan scan, final int numberOfRows,
-      final boolean closeScanner) throws IOException {
+  public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
+      final int numberOfRows, final boolean closeScanner) throws IOException {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -491,6 +490,7 @@
     builder.setScan(ProtobufUtil.toScan(scan));
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
+    builder.setTrackScanMetrics(scan != null && scan.isScanMetricsEnabled());
     return builder.build();
   }
 
@@ -502,14 +502,15 @@
    * @param closeScanner
    * @return a scan request
    */
-  public static ScanRequest buildScanRequest(final long scannerId,
-      final int numberOfRows, final boolean closeScanner) {
+  public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
+      final boolean closeScanner, final boolean trackMetrics) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
     builder.setScannerId(scannerId);
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
+    builder.setTrackScanMetrics(trackMetrics);
     return builder.build();
   }
 
@@ -523,7 +524,7 @@
    * @return a scan request
    */
   public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
-      final boolean closeScanner, final long nextCallSeq) {
+      final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) {
     ScanRequest.Builder builder = ScanRequest.newBuilder();
     builder.setNumberOfRows(numberOfRows);
     builder.setCloseScanner(closeScanner);
@@ -531,6 +532,7 @@
     builder.setScannerId(scannerId);
     builder.setNextCallSeq(nextCallSeq);
     builder.setClientHandlesPartials(true);
     builder.setClientHandlesHeartbeats(true);
+    builder.setTrackScanMetrics(trackMetrics);
     return builder.build();
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index 4b646979314..177b1c70fb3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.protobuf;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +50,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@@ -395,4 +399,26 @@ public final class ResponseConverter {
     }
     return results;
   }
+
+  public static Map<String, Long> getScanMetrics(ScanResponse response) {
+    Map<String, Long> metricMap = new HashMap<String, Long>();
+    if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+      return metricMap;
+    }
+
+    ScanMetrics metrics = response.getScanMetrics();
+    int numberOfMetrics = metrics.getMetricsCount();
+    for (int i = 0; i < numberOfMetrics; i++) {
+      NameInt64Pair metricPair = metrics.getMetrics(i);
+      if (metricPair != null) {
+        String name = metricPair.getName();
+        Long value = metricPair.getValue();
+        if (name != null && value != null) {
+          metricMap.put(name, value);
+        }
+      }
+    }
+
+    return metricMap;
+  }
 }
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 2991ece7ed4..55767c773c7 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -16443,6 +16443,16 @@ public final class ClientProtos {
      * <code>optional bool client_handles_heartbeats = 8;</code>
      */
    boolean getClientHandlesHeartbeats();
+
+    // optional bool track_scan_metrics = 9;
+    /**
+     * <code>optional bool track_scan_metrics = 9;</code>
+     */
+    boolean hasTrackScanMetrics();
+    /**
+     * <code>optional bool track_scan_metrics = 9;</code>
+     */
+    boolean getTrackScanMetrics();
   }
   /**
    * Protobuf type {@code ScanRequest}
    */
@@ -16564,6 +16574,11 @@ public final class
ClientProtos { clientHandlesHeartbeats_ = input.readBool(); break; } + case 72: { + bitField0_ |= 0x00000100; + trackScanMetrics_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -16744,6 +16759,22 @@ public final class ClientProtos { return clientHandlesHeartbeats_; } + // optional bool track_scan_metrics = 9; + public static final int TRACK_SCAN_METRICS_FIELD_NUMBER = 9; + private boolean trackScanMetrics_; + /** + * optional bool track_scan_metrics = 9; + */ + public boolean hasTrackScanMetrics() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional bool track_scan_metrics = 9; + */ + public boolean getTrackScanMetrics() { + return trackScanMetrics_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); @@ -16753,6 +16784,7 @@ public final class ClientProtos { nextCallSeq_ = 0L; clientHandlesPartials_ = false; clientHandlesHeartbeats_ = false; + trackScanMetrics_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16802,6 +16834,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000080) == 0x00000080)) { output.writeBool(8, clientHandlesHeartbeats_); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeBool(9, trackScanMetrics_); + } getUnknownFields().writeTo(output); } @@ -16843,6 +16878,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(8, clientHandlesHeartbeats_); } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(9, trackScanMetrics_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16906,6 +16945,11 @@ public final class ClientProtos { result = result && (getClientHandlesHeartbeats() == other.getClientHandlesHeartbeats()); } + result = result && (hasTrackScanMetrics() == other.hasTrackScanMetrics()); + if (hasTrackScanMetrics()) { + result = result && (getTrackScanMetrics() + == other.getTrackScanMetrics()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -16951,6 +16995,10 @@ public final class ClientProtos { hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats()); } + if (hasTrackScanMetrics()) { + hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getTrackScanMetrics()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -17099,6 +17147,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000040); clientHandlesHeartbeats_ = false; bitField0_ = (bitField0_ & ~0x00000080); + trackScanMetrics_ = false; + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -17167,6 +17217,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000080; } result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000100; + } + result.trackScanMetrics_ = trackScanMetrics_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -17207,6 +17261,9 @@ public final class ClientProtos { if (other.hasClientHandlesHeartbeats()) { setClientHandlesHeartbeats(other.getClientHandlesHeartbeats()); } + if 
(other.hasTrackScanMetrics()) { + setTrackScanMetrics(other.getTrackScanMetrics()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -17678,6 +17735,39 @@ public final class ClientProtos { return this; } + // optional bool track_scan_metrics = 9; + private boolean trackScanMetrics_ ; + /** + * optional bool track_scan_metrics = 9; + */ + public boolean hasTrackScanMetrics() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + /** + * optional bool track_scan_metrics = 9; + */ + public boolean getTrackScanMetrics() { + return trackScanMetrics_; + } + /** + * optional bool track_scan_metrics = 9; + */ + public Builder setTrackScanMetrics(boolean value) { + bitField0_ |= 0x00000100; + trackScanMetrics_ = value; + onChanged(); + return this; + } + /** + * optional bool track_scan_metrics = 9; + */ + public Builder clearTrackScanMetrics() { + bitField0_ = (bitField0_ & ~0x00000100); + trackScanMetrics_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ScanRequest) } @@ -17920,6 +18010,38 @@ public final class ClientProtos { * */ boolean getHeartbeatMessage(); + + // optional .ScanMetrics scan_metrics = 10; + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+ */ + boolean hasScanMetrics(); + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+ */ + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics(); + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+ */ + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder(); } /** * Protobuf type {@code ScanResponse} @@ -18058,6 +18180,19 @@ public final class ClientProtos { heartbeatMessage_ = input.readBool(); break; } + case 82: { + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder subBuilder = null; + if (((bitField0_ & 0x00000040) == 0x00000040)) { + subBuilder = scanMetrics_.toBuilder(); + } + scanMetrics_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(scanMetrics_); + scanMetrics_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000040; + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -18401,6 +18536,46 @@ public final class ClientProtos { return heartbeatMessage_; } + // optional .ScanMetrics scan_metrics = 10; + public static final int SCAN_METRICS_FIELD_NUMBER = 10; + private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_; + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+ */ + public boolean hasScanMetrics() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() { + return scanMetrics_; + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+     * <pre>
+     * This field is filled in if the client has requested that scan metrics be tracked.
+     * The metrics tracked here are sent back to the client to be tracked together with 
+     * the existing client side metrics.
+     * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() { + return scanMetrics_; + } + private void initFields() { cellsPerResult_ = java.util.Collections.emptyList(); scannerId_ = 0L; @@ -18411,6 +18586,7 @@ public final class ClientProtos { partialFlagPerResult_ = java.util.Collections.emptyList(); moreResultsInRegion_ = false; heartbeatMessage_ = false; + scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -18451,6 +18627,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeBool(9, heartbeatMessage_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeMessage(10, scanMetrics_); + } getUnknownFields().writeTo(output); } @@ -18503,6 +18682,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(9, heartbeatMessage_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(10, scanMetrics_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -18562,6 +18745,11 @@ public final class ClientProtos { result = result && (getHeartbeatMessage() == other.getHeartbeatMessage()); } + result = result && (hasScanMetrics() == other.hasScanMetrics()); + if (hasScanMetrics()) { + result = result && getScanMetrics() + .equals(other.getScanMetrics()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -18611,6 +18799,10 @@ public final class ClientProtos { hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); } + if (hasScanMetrics()) { + hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER; + hash = (53 * hash) + getScanMetrics().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -18719,6 +18911,7 @@ public final class ClientProtos { private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getResultsFieldBuilder(); + getScanMetricsFieldBuilder(); } } private static Builder create() { @@ -18749,6 +18942,12 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000080); heartbeatMessage_ = false; bitField0_ = (bitField0_ & ~0x00000100); + if (scanMetricsBuilder_ == null) { + scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + } else { + scanMetricsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -18820,6 +19019,14 @@ public final class ClientProtos { to_bitField0_ |= 0x00000020; } result.heartbeatMessage_ = heartbeatMessage_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000040; + } + if (scanMetricsBuilder_ == null) { + result.scanMetrics_ = scanMetrics_; + } else { + result.scanMetrics_ = scanMetricsBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -18900,6 +19107,9 @@ public final class ClientProtos { if (other.hasHeartbeatMessage()) { setHeartbeatMessage(other.getHeartbeatMessage()); } + if (other.hasScanMetrics()) { + mergeScanMetrics(other.getScanMetrics()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -19797,6 +20007,177 @@ public final class ClientProtos { return this; } + 
// optional .ScanMetrics scan_metrics = 10; + private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> scanMetricsBuilder_; + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public boolean hasScanMetrics() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() { + if (scanMetricsBuilder_ == null) { + return scanMetrics_; + } else { + return scanMetricsBuilder_.getMessage(); + } + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public Builder setScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) { + if (scanMetricsBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + scanMetrics_ = value; + onChanged(); + } else { + scanMetricsBuilder_.setMessage(value); + } + bitField0_ |= 0x00000200; + return this; + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public Builder setScanMetrics( + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder builderForValue) { + if (scanMetricsBuilder_ == null) { + scanMetrics_ = builderForValue.build(); + onChanged(); + } else { + scanMetricsBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000200; + return this; + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public Builder mergeScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) { + if (scanMetricsBuilder_ == null) { + if (((bitField0_ & 0x00000200) == 0x00000200) && + scanMetrics_ != org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance()) { + scanMetrics_ = + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.newBuilder(scanMetrics_).mergeFrom(value).buildPartial(); + } else { + scanMetrics_ = value; + } + onChanged(); + } else { + scanMetricsBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000200; + return this; + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public Builder clearScanMetrics() { + if (scanMetricsBuilder_ == null) { + scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance(); + onChanged(); + } else { + scanMetricsBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000200); + return this; + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder getScanMetricsBuilder() { + bitField0_ |= 0x00000200; + onChanged(); + return getScanMetricsFieldBuilder().getBuilder(); + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() { + if (scanMetricsBuilder_ != null) { + return scanMetricsBuilder_.getMessageOrBuilder(); + } else { + return scanMetrics_; + } + } + /** + * optional .ScanMetrics scan_metrics = 10; + * + *
+       * <pre>
+       * This field is filled in if the client has requested that scan metrics be tracked.
+       * The metrics tracked here are sent back to the client to be tracked together with 
+       * the existing client side metrics.
+       * </pre>
+ */ + private com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> + getScanMetricsFieldBuilder() { + if (scanMetricsBuilder_ == null) { + scanMetricsBuilder_ = new com.google.protobuf.SingleFieldBuilder< + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>( + scanMetrics_, + getParentForChildren(), + isClean()); + scanMetrics_ = null; + } + return scanMetricsBuilder_; + } + // @@protoc_insertion_point(builder_scope:ScanResponse) } @@ -32868,122 +33249,124 @@ public final class ClientProtos { static { java.lang.String[] descriptorData = { "\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" + - "o\032\nCell.proto\032\020Comparator.proto\"\037\n\016Autho" + - "rizations\022\r\n\005label\030\001 \003(\t\"$\n\016CellVisibili" + - "ty\022\022\n\nexpression\030\001 \002(\t\"+\n\006Column\022\016\n\006fami" + - "ly\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\324\002\n\003Get\022\013\n\003r" + - "ow\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!\n\tatt" + - "ribute\030\003 \003(\0132\016.NameBytesPair\022\027\n\006filter\030\004" + - " \001(\0132\007.Filter\022\036\n\ntime_range\030\005 \001(\0132\n.Time" + - "Range\022\027\n\014max_versions\030\006 \001(\r:\0011\022\032\n\014cache_" + - "blocks\030\007 \001(\010:\004true\022\023\n\013store_limit\030\010 \001(\r\022", - "\024\n\014store_offset\030\t \001(\r\022\035\n\016existence_only\030" + - "\n \001(\010:\005false\022!\n\022closest_row_before\030\013 \001(\010" + - ":\005false\022)\n\013consistency\030\014 \001(\0162\014.Consisten" + - "cy:\006STRONG\"z\n\006Result\022\023\n\004cell\030\001 \003(\0132\005.Cel" + - "l\022\035\n\025associated_cell_count\030\002 \001(\005\022\016\n\006exis" + - "ts\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010:\005false\022\026\n\007partia" + - "l\030\005 \001(\010:\005false\"A\n\nGetRequest\022 \n\006region\030\001" + - " \002(\0132\020.RegionSpecifier\022\021\n\003get\030\002 \002(\0132\004.Ge" + - "t\"&\n\013GetResponse\022\027\n\006result\030\001 \001(\0132\007.Resul" + - "t\"\200\001\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006family\030\002", - " \002(\014\022\021\n\tqualifier\030\003 \002(\014\022\"\n\014compare_type\030" + - "\004 \002(\0162\014.CompareType\022\037\n\ncomparator\030\005 \002(\0132" + - "\013.Comparator\"\265\006\n\rMutationProto\022\013\n\003row\030\001 " + - "\001(\014\0220\n\013mutate_type\030\002 \001(\0162\033.MutationProto" + - ".MutationType\0220\n\014column_value\030\003 \003(\0132\032.Mu" + - "tationProto.ColumnValue\022\021\n\ttimestamp\030\004 \001" + - "(\004\022!\n\tattribute\030\005 \003(\0132\016.NameBytesPair\022:\n" + - "\ndurability\030\006 \001(\0162\031.MutationProto.Durabi" + - "lity:\013USE_DEFAULT\022\036\n\ntime_range\030\007 \001(\0132\n." 
+ - "TimeRange\022\035\n\025associated_cell_count\030\010 \001(\005", - "\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013ColumnValue\022\016\n\006famil" + - "y\030\001 \002(\014\022B\n\017qualifier_value\030\002 \003(\0132).Mutat" + - "ionProto.ColumnValue.QualifierValue\032\203\001\n\016" + - "QualifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005val" + - "ue\030\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022.\n\013delete_ty" + - "pe\030\004 \001(\0162\031.MutationProto.DeleteType\022\014\n\004t" + - "ags\030\005 \001(\014\"W\n\nDurability\022\017\n\013USE_DEFAULT\020\000" + - "\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC_WAL\020\002\022\014\n\010SYNC_WA" + - "L\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014MutationType\022\n\n\006AP" + - "PEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DELETE", - "\020\003\"p\n\nDeleteType\022\026\n\022DELETE_ONE_VERSION\020\000" + - "\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDELETE" + - "_FAMILY\020\002\022\031\n\025DELETE_FAMILY_VERSION\020\003\"\207\001\n" + - "\rMutateRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" + - "pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" + - "to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" + - "ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" + - "lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" + - "\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" + - "ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003", - " \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" + - ".Filter\022\036\n\ntime_range\030\006 \001(\0132\n.TimeRange\022" + - "\027\n\014max_versions\030\007 \001(\r:\0011\022\032\n\014cache_blocks" + - "\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_" + - "result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024" + - "\n\014store_offset\030\014 \001(\r\022&\n\036load_column_fami" + - "lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" + - "eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" + - "\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" + - "\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio", - "nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" + - "er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" + - "lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" + - "\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" + - "ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" + - "sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" + - "ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" + - "l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" + - "le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" + - "\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea", - "rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" + + "o\032\nCell.proto\032\020Comparator.proto\032\017MapRedu" + + "ce.proto\"\037\n\016Authorizations\022\r\n\005label\030\001 \003(" + + "\t\"$\n\016CellVisibility\022\022\n\nexpression\030\001 \002(\t\"" + + "+\n\006Column\022\016\n\006family\030\001 
\002(\014\022\021\n\tqualifier\030\002" + + " \003(\014\"\324\002\n\003Get\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(" + + "\0132\007.Column\022!\n\tattribute\030\003 \003(\0132\016.NameByte" + + "sPair\022\027\n\006filter\030\004 \001(\0132\007.Filter\022\036\n\ntime_r" + + "ange\030\005 \001(\0132\n.TimeRange\022\027\n\014max_versions\030\006" + + " \001(\r:\0011\022\032\n\014cache_blocks\030\007 \001(\010:\004true\022\023\n\013s", + "tore_limit\030\010 \001(\r\022\024\n\014store_offset\030\t \001(\r\022\035" + + "\n\016existence_only\030\n \001(\010:\005false\022!\n\022closest" + + "_row_before\030\013 \001(\010:\005false\022)\n\013consistency\030" + + "\014 \001(\0162\014.Consistency:\006STRONG\"z\n\006Result\022\023\n" + + "\004cell\030\001 \003(\0132\005.Cell\022\035\n\025associated_cell_co" + + "unt\030\002 \001(\005\022\016\n\006exists\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010" + + ":\005false\022\026\n\007partial\030\005 \001(\010:\005false\"A\n\nGetRe" + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + - "5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" + - "est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" + - "\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" + - "(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " + - "\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" + - "(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" + - "\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" + - "viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai", - "r\"d\n\031CoprocessorServiceRequest\022 \n\006region" + - "\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" + - ".CoprocessorServiceCall\"]\n\032CoprocessorSe" + - "rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + - "ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" + - "\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" + - "\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" + - "service_call\030\004 \001(\0132\027.CoprocessorServiceC" + - "all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" + - "gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030", - "\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" + - "storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" + - "\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" + - "\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" + - "\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" + - "\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" + - "Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" + - "ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" + - ".ResultOrException\022!\n\texception\030\002 \001(\0132\016." 
+ - "NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA", - "ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" + - "\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" + - "\rMultiResponse\022/\n\022regionActionResult\030\001 \003" + - "(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" + - "(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" + - "E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" + - "t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" + - "t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" + - "\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" + - "oadHFileRequest\032\026.BulkLoadHFileResponse\022", - "F\n\013ExecService\022\032.CoprocessorServiceReque" + - "st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" + - "egionServerService\022\032.CoprocessorServiceR" + - "equest\032\033.CoprocessorServiceResponse\022&\n\005M" + - "ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" + - "rg.apache.hadoop.hbase.protobuf.generate" + - "dB\014ClientProtosH\001\210\001\001\240\001\001" + "\021\n\003get\030\002 \002(\0132\004.Get\"&\n\013GetResponse\022\027\n\006res" + + "ult\030\001 \001(\0132\007.Result\"\200\001\n\tCondition\022\013\n\003row\030", + "\001 \002(\014\022\016\n\006family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014" + + "\022\"\n\014compare_type\030\004 \002(\0162\014.CompareType\022\037\n\n" + + "comparator\030\005 \002(\0132\013.Comparator\"\265\006\n\rMutati" + + "onProto\022\013\n\003row\030\001 \001(\014\0220\n\013mutate_type\030\002 \001(" + + "\0162\033.MutationProto.MutationType\0220\n\014column" + + "_value\030\003 \003(\0132\032.MutationProto.ColumnValue" + + "\022\021\n\ttimestamp\030\004 \001(\004\022!\n\tattribute\030\005 \003(\0132\016" + + ".NameBytesPair\022:\n\ndurability\030\006 \001(\0162\031.Mut" + + "ationProto.Durability:\013USE_DEFAULT\022\036\n\nti" + + "me_range\030\007 \001(\0132\n.TimeRange\022\035\n\025associated", + "_cell_count\030\010 \001(\005\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013Col" + + "umnValue\022\016\n\006family\030\001 \002(\014\022B\n\017qualifier_va" + + "lue\030\002 \003(\0132).MutationProto.ColumnValue.Qu" + + "alifierValue\032\203\001\n\016QualifierValue\022\021\n\tquali" + + "fier\030\001 \001(\014\022\r\n\005value\030\002 \001(\014\022\021\n\ttimestamp\030\003" + + " \001(\004\022.\n\013delete_type\030\004 \001(\0162\031.MutationProt" + + "o.DeleteType\022\014\n\004tags\030\005 \001(\014\"W\n\nDurability" + + "\022\017\n\013USE_DEFAULT\020\000\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC" + + "_WAL\020\002\022\014\n\010SYNC_WAL\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014M" + + "utationType\022\n\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007", + "\n\003PUT\020\002\022\n\n\006DELETE\020\003\"p\n\nDeleteType\022\026\n\022DEL" + + "ETE_ONE_VERSION\020\000\022\034\n\030DELETE_MULTIPLE_VER" + + "SIONS\020\001\022\021\n\rDELETE_FAMILY\020\002\022\031\n\025DELETE_FAM" + + "ILY_VERSION\020\003\"\207\001\n\rMutateRequest\022 \n\006regio" + + "n\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation\030\002 " + + "\002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(\0132\n" + + ".Condition\022\023\n\013nonce_group\030\004 \001(\004\"<\n\016Mutat" + + "eResponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\021\n\tpr" + + "ocessed\030\002 \001(\010\"\271\003\n\004Scan\022\027\n\006column\030\001 \003(\0132\007" + + ".Column\022!\n\tattribute\030\002 
\003(\0132\016.NameBytesPa", + "ir\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_row\030\004 \001(\014\022" + + "\027\n\006filter\030\005 \001(\0132\007.Filter\022\036\n\ntime_range\030\006" + + " \001(\0132\n.TimeRange\022\027\n\014max_versions\030\007 \001(\r:\001" + + "1\022\032\n\014cache_blocks\030\010 \001(\010:\004true\022\022\n\nbatch_s" + + "ize\030\t \001(\r\022\027\n\017max_result_size\030\n \001(\004\022\023\n\013st" + + "ore_limit\030\013 \001(\r\022\024\n\014store_offset\030\014 \001(\r\022&\n" + + "\036load_column_families_on_demand\030\r \001(\010\022\r\n" + + "\005small\030\016 \001(\010\022\027\n\010reversed\030\017 \001(\010:\005false\022)\n" + + "\013consistency\030\020 \001(\0162\014.Consistency:\006STRONG" + + "\022\017\n\007caching\030\021 \001(\r\"\376\001\n\013ScanRequest\022 \n\006reg", + "ion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(" + + "\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_o" + + "f_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rn" + + "ext_call_seq\030\006 \001(\004\022\037\n\027client_handles_par" + + "tials\030\007 \001(\010\022!\n\031client_handles_heartbeats" + + "\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001(\010\"\210\002\n\014S" + + "canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" + + "\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" + + "\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" + + "\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result", + "\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031" + + "\n\021heartbeat_message\030\t \001(\010\022\"\n\014scan_metric" + + "s\030\n \001(\0132\014.ScanMetrics\"\263\001\n\024BulkLoadHFileR" + + "equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" + + "\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFileReq" + + "uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032" + + "*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 " + + "\002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001" + + " \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 " + + "\002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name", + "\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSe" + + "rviceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPa" + + "ir\"d\n\031CoprocessorServiceRequest\022 \n\006regio" + + "n\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132" + + "\027.CoprocessorServiceCall\"]\n\032CoprocessorS" + + "erviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionS" + + "pecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"" + + "{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001" + + "(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n" + + "\014service_call\030\004 \001(\0132\027.CoprocessorService", + "Call\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.R" + + "egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action" + + "\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014me" + + "mstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001" + + "(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + + "(\r\022\027\n\006result\030\002 
\001(\0132\007.Result\022!\n\texception" + + "\030\003 \001(\0132\016.NameBytesPair\0221\n\016service_result" + + "\030\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tloa" + + "dStats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022Region" + + "ActionResult\022-\n\021resultOrException\030\001 \003(\0132", + "\022.ResultOrException\022!\n\texception\030\002 \001(\0132\016" + + ".NameBytesPair\"f\n\014MultiRequest\022#\n\014region" + + "Action\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGrou" + + "p\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S" + + "\n\rMultiResponse\022/\n\022regionActionResult\030\001 " + + "\003(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 " + + "\001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" + + "NE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReque" + + "st\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReque" + + "st\032\017.MutateResponse\022#\n\004Scan\022\014.ScanReques", + "t\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.Bulk" + + "LoadHFileRequest\032\026.BulkLoadHFileResponse" + + "\022F\n\013ExecService\022\032.CoprocessorServiceRequ" + + "est\032\033.CoprocessorServiceResponse\022R\n\027Exec" + + "RegionServerService\022\032.CoprocessorService" + + "Request\032\033.CoprocessorServiceResponse\022&\n\005" + + "Multi\022\r.MultiRequest\032\016.MultiResponseBB\n*" + + "org.apache.hadoop.hbase.protobuf.generat" + + "edB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -33079,13 +33462,13 @@ public final class ClientProtos { internal_static_ScanRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanRequest_descriptor, - new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", }); + new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", }); internal_static_ScanResponse_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_ScanResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ScanResponse_descriptor, - new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", }); + new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", }); internal_static_BulkLoadHFileRequest_descriptor = getDescriptor().getMessageTypes().get(14); internal_static_BulkLoadHFileRequest_fieldAccessorTable = new @@ -33180,6 +33563,7 @@ public final class ClientProtos { org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(), + org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.getDescriptor(), }, assigner); } diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 3a48cc89ac3..c857c6310ce 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto 
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -28,6 +28,7 @@ import "HBase.proto";
 import "Filter.proto";
 import "Cell.proto";
 import "Comparator.proto";
+import "MapReduce.proto";
 
 /**
  * The protocol buffer version of Authorizations.
@@ -276,6 +277,7 @@ optional uint64 next_call_seq = 6;
   optional bool client_handles_partials = 7;
   optional bool client_handles_heartbeats = 8;
+  optional bool track_scan_metrics = 9;
 }
 
 /**
@@ -314,12 +316,17 @@
   // reasons such as the size in bytes or quantity of results accumulated. This field
   // will true when more results exist in the current region.
   optional bool more_results_in_region = 8;
-  
+
   // This field is filled in if the server is sending back a heartbeat message.
   // Heartbeat messages are sent back to the client to prevent the scanner from
   // timing out. Seeing a heartbeat message communicates to the Client that the
   // server would have continued to scan had the time limit not been reached.
   optional bool heartbeat_message = 9;
+
+  // This field is filled in if the client has requested that scan metrics be tracked.
+  // The metrics tracked here are sent back to the client to be tracked together with 
+  // the existing client side metrics.
+  optional ScanMetrics scan_metrics = 10;
 }
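To make the new wire field concrete, here is a small sketch that builds a ScanRequest for an already-open scanner with track_scan_metrics set, using only the generated setters added by this patch (the scanner id and row count are illustrative):

import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;

public class TrackMetricsRequestSketch {
  public static void main(String[] args) {
    // Mirrors RequestConverter.buildScanRequest(scannerId, numberOfRows,
    // closeScanner, trackMetrics) for a scanner that is already open.
    ScanRequest request = ScanRequest.newBuilder()
        .setScannerId(17L)           // illustrative
        .setNumberOfRows(100)        // illustrative
        .setCloseScanner(false)
        .setClientHandlesPartials(true)
        .setClientHandlesHeartbeats(true)
        .setTrackScanMetrics(true)   // new optional field 9
        .build();

    System.out.println(request.hasTrackScanMetrics()); // true
    System.out.println(request.getTrackScanMetrics()); // true
  }
}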
Thus, perform the necessary increment to the rows scanned metric here + incrementCountOfRowsScannedMetric(scannerContext); + boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5605,9 +5610,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) { + if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { + incrementCountOfRowsFilteredMetric(scannerContext); results.clear(); - boolean moreRows = nextRow(current); + boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } @@ -5650,14 +5656,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Double check to prevent empty rows from appearing in result. It could be // the case when SingleColumnValueExcludeFilter is used. if (results.isEmpty()) { - boolean moreRows = nextRow(current); + incrementCountOfRowsFilteredMetric(scannerContext); + boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } if (!stopRow) continue; } - // We are done. Return the result. if (stopRow) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } else { @@ -5666,6 +5672,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { + if (scannerContext == null || !scannerContext.isTrackingMetrics()) return; + + scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); + } + + protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) { + if (scannerContext == null || !scannerContext.isTrackingMetrics()) return; + + scannerContext.getMetrics().countOfRowsScanned.incrementAndGet(); + } + /** * @param currentRow * @param offset @@ -5711,7 +5729,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return filter != null && filter.filterRowKey(current); } - protected boolean nextRow(Cell curRowCell) throws IOException { + protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read."; Cell next; while ((next = this.storeHeap.peek()) != null && @@ -5719,6 +5737,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.storeHeap.next(MOCKED_LIST); } resetFilters(); + // Call the coprocessor hook, which allows it to fast forward to the next row return this.region.getCoprocessorHost() == null || this.region.getCoprocessorHost() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java index 66ed6c087b1..3e0d7e8ad10 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; public class NoLimitScannerContext extends ScannerContext { public NoLimitScannerContext() { - super(false, null,
false); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 9364162657a..7bdbed2390b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Set; import java.util.TreeSet; @@ -141,9 +142,11 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -2337,12 +2340,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final LimitScope timeScope = allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; + boolean trackMetrics = + request.hasTrackScanMetrics() && request.getTrackScanMetrics(); + // Configure with limits for this RPC. Set keep progress true since size progress // towards size limit should be kept between calls to nextRaw ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); contextBuilder.setSizeLimit(sizeScope, maxResultSize); contextBuilder.setBatchLimit(scanner.getBatch()); contextBuilder.setTimeLimit(timeScope, timeLimit); + contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); boolean limitReached = false; @@ -2396,6 +2403,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // We didn't get a single batch builder.setMoreResultsInRegion(false); } + + // Check to see if the client requested that we track metrics server side. If the + // client requested metrics, retrieve the metrics from the scanner context. 
+ if (trackMetrics) { + Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap(); + ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder(); + NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder(); + + for (Entry<String, Long> entry : metrics.entrySet()) { + pairBuilder.setName(entry.getKey()); + pairBuilder.setValue(entry.getValue()); + metricBuilder.addMetrics(pairBuilder.build()); + } + + builder.setScanMetrics(metricBuilder.build()); + } } region.updateReadRequestsCount(i); region.getMetrics().updateScanNext(totalCellSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java index ec107b9bedd..d7ef6e89e6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedRegionScannerImpl.java @@ -65,7 +65,7 @@ class ReversedRegionScannerImpl extends RegionScannerImpl { } @Override - protected boolean nextRow(Cell curRowCell) + protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; byte[] row = new byte[curRowCell.getRowLength()]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 8dfd0f42289..a9277892aa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; /** * ScannerContext instances encapsulate limit tracking AND progress towards those limits during @@ -96,7 +97,12 @@ public class ScannerContext { boolean keepProgress; private static boolean DEFAULT_KEEP_PROGRESS = false; - ScannerContext(boolean keepProgress, LimitFields limitsToCopy) { + /** + * Tracks the relevant server side metrics during scans. null when metrics should not be tracked + */ + final ServerSideScanMetrics metrics; + + ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) { this.limits = new LimitFields(); if (limitsToCopy != null) this.limits.copy(limitsToCopy); @@ -105,6 +111,21 @@ public class ScannerContext { this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; + this.metrics = trackMetrics ? new ServerSideScanMetrics() : null; + } + + boolean isTrackingMetrics() { + return this.metrics != null; + } + + /** + * Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()} + * has been made to confirm that metrics are indeed being tracked.
+ * @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan + */ + ServerSideScanMetrics getMetrics() { + assert isTrackingMetrics(); + return this.metrics; } /** @@ -331,6 +352,7 @@ public class ScannerContext { public static final class Builder { boolean keepProgress = DEFAULT_KEEP_PROGRESS; + boolean trackMetrics = false; LimitFields limits = new LimitFields(); private Builder() { @@ -345,6 +367,11 @@ public class ScannerContext { return this; } + public Builder setTrackMetrics(boolean trackMetrics) { + this.trackMetrics = trackMetrics; + return this; + } + public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) { limits.setSize(sizeLimit); limits.setSizeScope(sizeScope); @@ -363,7 +390,7 @@ public class ScannerContext { } public ScannerContext build() { - return new ScannerContext(keepProgress, limits); + return new ScannerContext(keepProgress, limits, trackMetrics); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java new file mode 100644 index 00000000000..6f9151518a8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestServerSideScanMetricsFromClientSide { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Table TABLE = null; + + /** + * Table configuration + */ + private static TableName TABLE_NAME = TableName.valueOf("testTable"); + + private static int NUM_ROWS = 10; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then + // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which + // breaks the simple generation of expected kv's + private static int NUM_FAMILIES = 1; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 1; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 10; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS; + + // Approximation of the heap size of a cell in our table. Should be accessed through + // getCellHeapSize().
+ private static long CELL_HEAP_SIZE = -1; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List<Put> puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Make puts to put the input value into each combination of row, family, and qualifier + * @param rows + * @param families + * @param qualifiers + * @param value + * @return + * @throws IOException + */ + static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList<Put> puts = new ArrayList<>(); + + for (int row = 0; row < rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + /** + * @return The approximate heap size of a cell in the test table. All cells should have + * approximately the same heap size, so the value is cached to avoid repeating the + * calculation + * @throws Exception + */ + private long getCellHeapSize() throws Exception { + if (CELL_HEAP_SIZE == -1) { + // Do a partial scan that will return a single result with a single cell + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + ResultScanner scanner = TABLE.getScanner(scan); + + Result result = scanner.next(); + + assertTrue(result != null); + assertTrue(result.rawCells() != null); + assertTrue(result.rawCells().length == 1); + + CELL_HEAP_SIZE = CellUtil.estimatedHeapSizeOf(result.rawCells()[0]); + scanner.close(); + } + + return CELL_HEAP_SIZE; + } + + @Test + public void testRowsSeenMetric() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + testRowsSeenMetric(baseScan); + + // Test case that only a single result will be returned per RPC to the server + baseScan.setCaching(1); + testRowsSeenMetric(baseScan); + + // Test case that partial results are returned from the server.
At most one cell will be + // contained in each response + baseScan.setMaxResultSize(1); + testRowsSeenMetric(baseScan); + + // Test case that size limit is set such that a few cells are returned per partial result from + // the server + baseScan.setCaching(NUM_ROWS); + baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1)); + testRowsSeenMetric(baseScan); + } + + public void testRowsSeenMetric(Scan baseScan) throws Exception { + Scan scan; + scan = new Scan(baseScan); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, NUM_ROWS); + + for (int i = 0; i < ROWS.length - 1; i++) { + scan = new Scan(baseScan); + scan.setStartRow(ROWS[0]); + scan.setStopRow(ROWS[i + 1]); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, i + 1); + } + + for (int i = ROWS.length - 1; i > 0; i--) { + scan = new Scan(baseScan); + scan.setStartRow(ROWS[i - 1]); + scan.setStopRow(ROWS[ROWS.length - 1]); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length - i); + } + + // The filter should filter out all rows, but the rows scanned metric should still count + // every row. + Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes())); + scan = new Scan(baseScan); + scan.setFilter(filter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length); + + // Filter should pass on all rows + SingleColumnValueFilter singleColumnValueFilter = + new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.EQUAL, VALUE); + scan = new Scan(baseScan); + scan.setFilter(singleColumnValueFilter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length); + + // Filter should filter out all rows + singleColumnValueFilter = + new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.NOT_EQUAL, VALUE); + scan = new Scan(baseScan); + scan.setFilter(singleColumnValueFilter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length); + } + + @Test + public void testRowsFilteredMetric() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + + // Test case where scan uses default values + testRowsFilteredMetric(baseScan); + + // Test case where at most one Result is retrieved per RPC + baseScan.setCaching(1); + testRowsFilteredMetric(baseScan); + + // Test case where size limit is very restrictive and partial results will be returned from + // server + baseScan.setMaxResultSize(1); + testRowsFilteredMetric(baseScan); + + // Test a case where max result size limits response from server to only a few cells (not all + // cells from the row) + baseScan.setCaching(NUM_ROWS); + baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1)); + testRowsFilteredMetric(baseScan); + } + + public void testRowsFilteredMetric(Scan baseScan) throws Exception { + testRowsFilteredMetric(baseScan, null, 0); + + // Row filter doesn't match any row key. All rows should be filtered + Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes())); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + // Filter will return results containing only the first key. Number of entire rows filtered + // should be 0. + filter = new FirstKeyOnlyFilter(); + testRowsFilteredMetric(baseScan, filter, 0); + + // Column prefix will find some matching qualifier on each row.
Number of entire rows filtered + // should be 0 + filter = new ColumnPrefixFilter(QUALIFIERS[0]); + testRowsFilteredMetric(baseScan, filter, 0); + + // Column prefix will NOT find any matching qualifier on any row. All rows should be filtered + filter = new ColumnPrefixFilter("xyz".getBytes()); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + // Matching column value should exist in each row. No rows should be filtered. + filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.EQUAL, VALUE); + testRowsFilteredMetric(baseScan, filter, 0); + + // No matching column value should exist in any row. Filter all rows + filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.NOT_EQUAL, VALUE); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + List<Filter> filters = new ArrayList<Filter>(); + filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(ROWS[0]))); + filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(ROWS[3]))); + int numberOfMatchingRowFilters = filters.size(); + filter = new FilterList(Operator.MUST_PASS_ONE, filters); + testRowsFilteredMetric(baseScan, filter, ROWS.length - numberOfMatchingRowFilters); + filters.clear(); + + // Add a single column value exclude filter for each column... The net effect is that all + // columns will be excluded when scanning on the server side. This will result in an empty cell + // array in RegionScanner#nextInternal which should be interpreted as a row being filtered. + for (int family = 0; family < FAMILIES.length; family++) { + for (int qualifier = 0; qualifier < QUALIFIERS.length; qualifier++) { + filters.add(new SingleColumnValueExcludeFilter(FAMILIES[family], QUALIFIERS[qualifier], + CompareOp.EQUAL, VALUE)); + } + } + filter = new FilterList(Operator.MUST_PASS_ONE, filters); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + } + + public void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered) + throws Exception { + Scan scan = new Scan(baseScan); + if (filter != null) scan.setFilter(filter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, expectedNumFiltered); + } + + /** + * Run the scan to completion and check the metric against the specified value + * @param scan + * @param metricKey + * @param expectedValue + * @throws Exception + */ + public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { + assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); + ResultScanner scanner = TABLE.getScanner(scan); + + // Iterate through all the results + for (Result r : scanner) { + } + scanner.close(); + ScanMetrics metrics = scan.getScanMetrics(); + assertTrue("Metrics are null", metrics != null); + assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); + final long actualMetricValue = metrics.getCounter(metricKey).get(); + assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + + actualMetricValue, expectedValue, actualMetricValue); + + } +} diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb index f181edabded..aca10065622 100644 --- a/hbase-shell/src/main/ruby/hbase.rb +++ b/hbase-shell/src/main/ruby/hbase.rb @@ -49,6 +49,8 @@ module HBaseConstants METHOD = "METHOD" MAXLENGTH = "MAXLENGTH" CACHE_BLOCKS = "CACHE_BLOCKS" + ALL_METRICS = "ALL_METRICS" + METRICS = "METRICS" REVERSED = "REVERSED" REPLICATION_SCOPE = "REPLICATION_SCOPE" INTERVAL = 'INTERVAL' diff --git
a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb index bba81d5f532..960eafc67fa 100644 --- a/hbase-shell/src/main/ruby/hbase/table.rb +++ b/hbase-shell/src/main/ruby/hbase/table.rb @@ -407,6 +407,8 @@ EOF def _hash_to_scan(args) if args.any? + enablemetrics = args["ALL_METRICS"].nil? ? false : args["ALL_METRICS"] + enablemetrics = enablemetrics || !args["METRICS"].nil? filter = args["FILTER"] startrow = args["STARTROW"] || '' stoprow = args["STOPROW"] @@ -454,6 +456,7 @@ EOF scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter)) end + scan.setScanMetricsEnabled(enablemetrics) if enablemetrics scan.setTimeStamp(timestamp) if timestamp scan.setCacheBlocks(cache_blocks) scan.setReversed(reversed) @@ -478,8 +481,10 @@ EOF #---------------------------------------------------------------------------------------------- # Scans whole table or a range of keys and returns rows matching specific criteria - def _scan_internal(args = {}) - raise(ArgumentError, "Arguments should be a Hash") unless args.kind_of?(Hash) + def _scan_internal(args = {}, scan = nil) + raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash) + raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \ + unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan) limit = args["LIMIT"] || -1 maxlength = args.delete("MAXLENGTH") || -1 @@ -489,7 +494,8 @@ EOF @converters.clear() # Start the scanner - scanner = @table.getScanner(_hash_to_scan(args)) + scan = scan == nil ? _hash_to_scan(args) : scan + scanner = @table.getScanner(scan) iter = scanner.iterator # Iterate results @@ -519,6 +525,7 @@ EOF break end end + scanner.close() return ((block_given?) ? count : res) end diff --git a/hbase-shell/src/main/ruby/shell/commands/scan.rb b/hbase-shell/src/main/ruby/shell/commands/scan.rb index c6aba9c9e0a..d2fe4e9e465 100644 --- a/hbase-shell/src/main/ruby/shell/commands/scan.rb +++ b/hbase-shell/src/main/ruby/shell/commands/scan.rb @@ -25,7 +25,7 @@ module Shell Scan a table; pass table name and optionally a dictionary of scanner specifications. Scanner specifications may include one or more of: TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, ROWPREFIXFILTER, TIMESTAMP, -MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS +MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS, ALL_METRICS or METRICS If no columns are specified, all columns will be scanned. To scan all members of a column family, leave the qualifier empty as in @@ -36,6 +36,11 @@ The filter can be specified in two ways: Filter Language document attached to the HBASE-4176 JIRA 2. Using the entire package name of the filter. +If you wish to see metrics regarding the execution of the scan, the +ALL_METRICS boolean should be set to true. Alternatively, if you would +prefer to see only a subset of the metrics, the METRICS array can be +defined to include the names of only the metrics you care about. 
+ Some examples: hbase> scan 'hbase:meta' @@ -44,6 +49,8 @@ Some examples: hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'} hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]} hbase> scan 't1', {REVERSED => true} + hbase> scan 't1', {ALL_METRICS => true} + hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']} hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => " (QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"} hbase> scan 't1', {FILTER => @@ -100,12 +107,18 @@ EOF now = Time.now formatter.header(["ROW", "COLUMN+CELL"]) + scan = table._hash_to_scan(args) #actually do the scanning - count = table._scan_internal(args) do |row, cells| + count = table._scan_internal(args, scan) do |row, cells| formatter.row([ row, cells ]) end formatter.footer(now, count) + + # if scan metrics were enabled, print them after the results + if (scan != nil && scan.isScanMetricsEnabled()) + formatter.scan_metrics(scan.getScanMetrics(), args["METRICS"]) + end end end end diff --git a/hbase-shell/src/main/ruby/shell/formatter.rb b/hbase-shell/src/main/ruby/shell/formatter.rb index 36aaf76aea7..47c9c8da448 100644 --- a/hbase-shell/src/main/ruby/shell/formatter.rb +++ b/hbase-shell/src/main/ruby/shell/formatter.rb @@ -112,6 +112,37 @@ module Shell @row_count += 1 end + # Output the scan metrics. Can be filtered to output only those metrics whose keys exist + # in the metric_filter + def scan_metrics(scan_metrics = nil, metric_filter = []) + return if scan_metrics == nil + raise(ArgumentError, \ + "Argument should be org.apache.hadoop.hbase.client.metrics.ScanMetrics") \ + unless scan_metrics.kind_of?(org.apache.hadoop.hbase.client.metrics.ScanMetrics) + # prefix output with empty line + @out.puts + # save row count to restore after printing metrics + # (metrics should not count towards row count) + saved_row_count = @row_count + iter = scan_metrics.getMetricsMap().entrySet().iterator() + metric_hash = Hash.new() + # put keys in hash so they can be sorted easily + while iter.hasNext + metric = iter.next + metric_hash[metric.getKey.to_s] = metric.getValue.to_s + end + # print in alphabetical order + row(["METRIC", "VALUE"], false) + metric_hash.sort.map do |key, value| + if (not metric_filter or metric_filter.length == 0 or metric_filter.include?(key)) + row([key, value]) + end + end + + @row_count = saved_row_count + return + end + def split(width, str) if width == 0 return [str]
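
Taken together, the changes above form a round trip: the client sets track_scan_metrics on each ScanRequest, HRegion counts rows scanned and rows filtered in the ScannerContext, RSRpcServices serializes those counters into the ScanResponse, and ScannerCallable#updateServerSideMetrics folds them back into the client-side ScanMetrics. The sketch below is a minimal, hypothetical usage example, not part of the patch; it uses only APIs exercised in TestServerSideScanMetricsFromClientSide and the shell code above, and the class name, method name, and Table handle are illustrative.

import java.util.Map;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

public class ScanMetricsUsageSketch {
  // 'table' is assumed to be an open Table handle; error handling is elided.
  public static void printScanMetrics(Table table) throws Exception {
    Scan scan = new Scan();
    // Opting in causes the client to set track_scan_metrics on every ScanRequest,
    // so the server attaches a ScanMetrics message to each ScanResponse.
    scan.setScanMetricsEnabled(true);

    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // Drain the scanner; the server-side counters are merged into the
        // client-side ScanMetrics as each response arrives.
      }
    } finally {
      scanner.close();
    }

    // After the scan completes, the merged metrics are readable from the Scan object,
    // mirroring how the test and the shell read them above.
    ScanMetrics metrics = scan.getScanMetrics();
    Map<String, Long> metricsMap = metrics.getMetricsMap();
    System.out.println("rows scanned = "
        + metricsMap.get(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY));
    System.out.println("rows filtered = "
        + metricsMap.get(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY));
  }
}

The shell path is equivalent: scan 't1', {ALL_METRICS => true} drives the same Scan#setScanMetricsEnabled call and prints the resulting metrics map through Formatter#scan_metrics.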