HBASE-5980 Scanner responses from RS should include metrics on rows/KVs filtered

This commit is contained in:
stack 2015-05-19 17:03:56 -07:00
parent 77d9719e2b
commit dc72dad7cd
17 changed files with 1158 additions and 193 deletions

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -202,7 +204,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
setHeartbeatMessage(false); setHeartbeatMessage(false);
try { try {
incRPCcallsMetrics(); incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); request =
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null);
ScanResponse response = null; ScanResponse response = null;
controller = controllerFactory.newController(); controller = controllerFactory.newController();
controller.setPriority(getTableName()); controller.setPriority(getTableName());
@ -232,6 +236,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
+ rows + " rows from scanner=" + scannerId); + rows + " rows from scanner=" + scannerId);
} }
} }
updateServerSideMetrics(response);
// moreResults is only used for the case where a filter exhausts all elements // moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults() && !response.getMoreResults()) { if (response.hasMoreResults() && !response.getMoreResults()) {
scannerId = -1L; scannerId = -1L;
@ -341,6 +346,21 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} }
} }
/**
* Use the scan metrics returned by the server to add to the identically named counters in the
* client side metrics. If a counter does not exist with the same name as the server side metric,
* the attempt to increase the counter will fail.
* @param response
*/
private void updateServerSideMetrics(ScanResponse response) {
if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
for (Entry<String, Long> entry : serverMetrics.entrySet()) {
this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
}
}
private void close() { private void close() {
if (this.scannerId == -1L) { if (this.scannerId == -1L) {
return; return;
@ -348,7 +368,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
try { try {
incRPCcallsMetrics(); incRPCcallsMetrics();
ScanRequest request = ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true); RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
try { try {
getStub().scan(null, request); getStub().scan(null, request);
} catch (ServiceException se) { } catch (ServiceException se) {

View File

@ -18,41 +18,32 @@
package org.apache.hadoop.hbase.client.metrics; package org.apache.hadoop.hbase.client.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.collect.ImmutableMap;
/** /**
* Provides client-side metrics related to scan operations. * Provides metrics related to scan operations (both server side and client side metrics).
* <p>
* The data can be passed to mapreduce framework or other systems. * The data can be passed to mapreduce framework or other systems.
* We use atomic longs so that one thread can increment, * We use atomic longs so that one thread can increment,
* while another atomically resets to zero after the values are reported * while another atomically resets to zero after the values are reported
* to hadoop's counters. * to hadoop's counters.
* * <p>
* Some of these metrics are general for any client operation such as put * Some of these metrics are general for any client operation such as put
* However, there is no need for this. So they are defined under scan operation * However, there is no need for this. So they are defined under scan operation
* for now. * for now.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ScanMetrics { public class ScanMetrics extends ServerSideScanMetrics {
/**
* Hash to hold the String -> Atomic Long mappings.
*/
private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
// AtomicLongs to hold the metrics values. These are all updated through ClientScanner and // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
// ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
// values after progress is passed to hadoop's counters. // values after progress is passed to hadoop's counters.
/** /**
* number of RPC calls * number of RPC calls
*/ */
@ -103,36 +94,4 @@ public class ScanMetrics {
*/ */
public ScanMetrics() { public ScanMetrics() {
} }
private AtomicLong createCounter(String counterName) {
AtomicLong c = new AtomicLong(0);
counters.put(counterName, c);
return c;
}
public void setCounter(String counterName, long value) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {
c.set(value);
}
}
/**
* Get all of the values since the last time this function was called.
*
* Calling this function will reset all AtomicLongs in the instance back to 0.
*
* @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
//Create a builder
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
//For every entry add the value and reset the AtomicLong back to zero
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
builder.put(e.getKey(), e.getValue().getAndSet(0));
}
//Build the immutable map so that people can't mess around with it.
return builder.build();
}
} }

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.collect.ImmutableMap;
/**
* Provides server side metrics related to scan operations.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerSideScanMetrics {
/**
* Hash to hold the String -> Atomic Long mappings for each metric
*/
private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
/**
* Create a new counter with the specified name
* @param counterName
* @return {@link AtomicLong} instance for the counter with counterName
*/
protected AtomicLong createCounter(String counterName) {
AtomicLong c = new AtomicLong(0);
counters.put(counterName, c);
return c;
}
public static final String COUNT_OF_ROWS_SCANNED_KEY = "ROWS_SCANNED";
public static final String COUNT_OF_ROWS_FILTERED_KEY = "ROWS_FILTERED";
/**
* number of rows filtered during scan RPC
*/
public final AtomicLong countOfRowsFiltered = createCounter(COUNT_OF_ROWS_FILTERED_KEY);
/**
* number of rows scanned during scan RPC. Not every row scanned will be returned to the client
* since rows may be filtered.
*/
public final AtomicLong countOfRowsScanned = createCounter(COUNT_OF_ROWS_SCANNED_KEY);
/**
* @param counterName
* @param value
*/
public void setCounter(String counterName, long value) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {
c.set(value);
}
}
/**
* @param counterName
* @return true if a counter exists with the counterName
*/
public boolean hasCounter(String counterName) {
return this.counters.containsKey(counterName);
}
/**
* @param counterName
* @return {@link AtomicLong} instance for this counter name, null if counter does not exist.
*/
public AtomicLong getCounter(String counterName) {
return this.counters.get(counterName);
}
/**
* @param counterName
* @param delta
*/
public void addToCounter(String counterName, long delta) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {
c.addAndGet(delta);
}
}
/**
* Get all of the values since the last time this function was called. Calling this function will
* reset all AtomicLongs in the instance back to 0.
* @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
// Create a builder
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
// For every entry add the value and reset the AtomicLong back to zero
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
builder.put(e.getKey(), e.getValue().getAndSet(0));
}
// Build the immutable map so that people can't mess around with it.
return builder.build();
}
}

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@ -478,9 +478,8 @@ public final class RequestConverter {
* @return a scan request * @return a scan request
* @throws IOException * @throws IOException
*/ */
public static ScanRequest buildScanRequest(final byte[] regionName, public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
final Scan scan, final int numberOfRows, final int numberOfRows, final boolean closeScanner) throws IOException {
final boolean closeScanner) throws IOException {
ScanRequest.Builder builder = ScanRequest.newBuilder(); ScanRequest.Builder builder = ScanRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier( RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName); RegionSpecifierType.REGION_NAME, regionName);
@ -490,6 +489,7 @@ public final class RequestConverter {
builder.setScan(ProtobufUtil.toScan(scan)); builder.setScan(ProtobufUtil.toScan(scan));
builder.setClientHandlesPartials(true); builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true); builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(scan != null && scan.isScanMetricsEnabled());
return builder.build(); return builder.build();
} }
@ -501,14 +501,15 @@ public final class RequestConverter {
* @param closeScanner * @param closeScanner
* @return a scan request * @return a scan request
*/ */
public static ScanRequest buildScanRequest(final long scannerId, public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
final int numberOfRows, final boolean closeScanner) { final boolean closeScanner, final boolean trackMetrics) {
ScanRequest.Builder builder = ScanRequest.newBuilder(); ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows); builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner); builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId); builder.setScannerId(scannerId);
builder.setClientHandlesPartials(true); builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true); builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
return builder.build(); return builder.build();
} }
@ -522,7 +523,7 @@ public final class RequestConverter {
* @return a scan request * @return a scan request
*/ */
public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
final boolean closeScanner, final long nextCallSeq) { final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) {
ScanRequest.Builder builder = ScanRequest.newBuilder(); ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows); builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner); builder.setCloseScanner(closeScanner);
@ -530,6 +531,7 @@ public final class RequestConverter {
builder.setNextCallSeq(nextCallSeq); builder.setNextCallSeq(nextCallSeq);
builder.setClientHandlesPartials(true); builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true); builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
return builder.build(); return builder.build();
} }

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.protobuf;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -48,6 +50,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@ -395,4 +399,26 @@ public final class ResponseConverter {
} }
return results; return results;
} }
public static Map<String, Long> getScanMetrics(ScanResponse response) {
Map<String, Long> metricMap = new HashMap<String, Long>();
if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
return metricMap;
}
ScanMetrics metrics = response.getScanMetrics();
int numberOfMetrics = metrics.getMetricsCount();
for (int i = 0; i < numberOfMetrics; i++) {
NameInt64Pair metricPair = metrics.getMetrics(i);
if (metricPair != null) {
String name = metricPair.getName();
Long value = metricPair.getValue();
if (name != null && value != null) {
metricMap.put(name, value);
}
}
}
return metricMap;
}
} }

View File

@ -16443,6 +16443,16 @@ public final class ClientProtos {
* <code>optional bool client_handles_heartbeats = 8;</code> * <code>optional bool client_handles_heartbeats = 8;</code>
*/ */
boolean getClientHandlesHeartbeats(); boolean getClientHandlesHeartbeats();
// optional bool track_scan_metrics = 9;
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
boolean hasTrackScanMetrics();
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
boolean getTrackScanMetrics();
} }
/** /**
* Protobuf type {@code ScanRequest} * Protobuf type {@code ScanRequest}
@ -16564,6 +16574,11 @@ public final class ClientProtos {
clientHandlesHeartbeats_ = input.readBool(); clientHandlesHeartbeats_ = input.readBool();
break; break;
} }
case 72: {
bitField0_ |= 0x00000100;
trackScanMetrics_ = input.readBool();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -16744,6 +16759,22 @@ public final class ClientProtos {
return clientHandlesHeartbeats_; return clientHandlesHeartbeats_;
} }
// optional bool track_scan_metrics = 9;
public static final int TRACK_SCAN_METRICS_FIELD_NUMBER = 9;
private boolean trackScanMetrics_;
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean hasTrackScanMetrics() {
return ((bitField0_ & 0x00000100) == 0x00000100);
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean getTrackScanMetrics() {
return trackScanMetrics_;
}
private void initFields() { private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@ -16753,6 +16784,7 @@ public final class ClientProtos {
nextCallSeq_ = 0L; nextCallSeq_ = 0L;
clientHandlesPartials_ = false; clientHandlesPartials_ = false;
clientHandlesHeartbeats_ = false; clientHandlesHeartbeats_ = false;
trackScanMetrics_ = false;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -16802,6 +16834,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000080) == 0x00000080)) { if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeBool(8, clientHandlesHeartbeats_); output.writeBool(8, clientHandlesHeartbeats_);
} }
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeBool(9, trackScanMetrics_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -16843,6 +16878,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, clientHandlesHeartbeats_); .computeBoolSize(8, clientHandlesHeartbeats_);
} }
if (((bitField0_ & 0x00000100) == 0x00000100)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(9, trackScanMetrics_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -16906,6 +16945,11 @@ public final class ClientProtos {
result = result && (getClientHandlesHeartbeats() result = result && (getClientHandlesHeartbeats()
== other.getClientHandlesHeartbeats()); == other.getClientHandlesHeartbeats());
} }
result = result && (hasTrackScanMetrics() == other.hasTrackScanMetrics());
if (hasTrackScanMetrics()) {
result = result && (getTrackScanMetrics()
== other.getTrackScanMetrics());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -16951,6 +16995,10 @@ public final class ClientProtos {
hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER; hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats()); hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats());
} }
if (hasTrackScanMetrics()) {
hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getTrackScanMetrics());
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -17099,6 +17147,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000040); bitField0_ = (bitField0_ & ~0x00000040);
clientHandlesHeartbeats_ = false; clientHandlesHeartbeats_ = false;
bitField0_ = (bitField0_ & ~0x00000080); bitField0_ = (bitField0_ & ~0x00000080);
trackScanMetrics_ = false;
bitField0_ = (bitField0_ & ~0x00000100);
return this; return this;
} }
@ -17167,6 +17217,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000080; to_bitField0_ |= 0x00000080;
} }
result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_; result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_;
if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
to_bitField0_ |= 0x00000100;
}
result.trackScanMetrics_ = trackScanMetrics_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -17207,6 +17261,9 @@ public final class ClientProtos {
if (other.hasClientHandlesHeartbeats()) { if (other.hasClientHandlesHeartbeats()) {
setClientHandlesHeartbeats(other.getClientHandlesHeartbeats()); setClientHandlesHeartbeats(other.getClientHandlesHeartbeats());
} }
if (other.hasTrackScanMetrics()) {
setTrackScanMetrics(other.getTrackScanMetrics());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -17678,6 +17735,39 @@ public final class ClientProtos {
return this; return this;
} }
// optional bool track_scan_metrics = 9;
private boolean trackScanMetrics_ ;
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean hasTrackScanMetrics() {
return ((bitField0_ & 0x00000100) == 0x00000100);
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean getTrackScanMetrics() {
return trackScanMetrics_;
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public Builder setTrackScanMetrics(boolean value) {
bitField0_ |= 0x00000100;
trackScanMetrics_ = value;
onChanged();
return this;
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public Builder clearTrackScanMetrics() {
bitField0_ = (bitField0_ & ~0x00000100);
trackScanMetrics_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ScanRequest) // @@protoc_insertion_point(builder_scope:ScanRequest)
} }
@ -17920,6 +18010,38 @@ public final class ClientProtos {
* </pre> * </pre>
*/ */
boolean getHeartbeatMessage(); boolean getHeartbeatMessage();
// optional .ScanMetrics scan_metrics = 10;
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
boolean hasScanMetrics();
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics();
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder();
} }
/** /**
* Protobuf type {@code ScanResponse} * Protobuf type {@code ScanResponse}
@ -18058,6 +18180,19 @@ public final class ClientProtos {
heartbeatMessage_ = input.readBool(); heartbeatMessage_ = input.readBool();
break; break;
} }
case 82: {
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder subBuilder = null;
if (((bitField0_ & 0x00000040) == 0x00000040)) {
subBuilder = scanMetrics_.toBuilder();
}
scanMetrics_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(scanMetrics_);
scanMetrics_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000040;
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -18401,6 +18536,46 @@ public final class ClientProtos {
return heartbeatMessage_; return heartbeatMessage_;
} }
// optional .ScanMetrics scan_metrics = 10;
public static final int SCAN_METRICS_FIELD_NUMBER = 10;
private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_;
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public boolean hasScanMetrics() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() {
return scanMetrics_;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() {
return scanMetrics_;
}
private void initFields() { private void initFields() {
cellsPerResult_ = java.util.Collections.emptyList(); cellsPerResult_ = java.util.Collections.emptyList();
scannerId_ = 0L; scannerId_ = 0L;
@ -18411,6 +18586,7 @@ public final class ClientProtos {
partialFlagPerResult_ = java.util.Collections.emptyList(); partialFlagPerResult_ = java.util.Collections.emptyList();
moreResultsInRegion_ = false; moreResultsInRegion_ = false;
heartbeatMessage_ = false; heartbeatMessage_ = false;
scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -18451,6 +18627,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000020) == 0x00000020)) { if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBool(9, heartbeatMessage_); output.writeBool(9, heartbeatMessage_);
} }
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeMessage(10, scanMetrics_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -18503,6 +18682,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(9, heartbeatMessage_); .computeBoolSize(9, heartbeatMessage_);
} }
if (((bitField0_ & 0x00000040) == 0x00000040)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, scanMetrics_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -18562,6 +18745,11 @@ public final class ClientProtos {
result = result && (getHeartbeatMessage() result = result && (getHeartbeatMessage()
== other.getHeartbeatMessage()); == other.getHeartbeatMessage());
} }
result = result && (hasScanMetrics() == other.hasScanMetrics());
if (hasScanMetrics()) {
result = result && getScanMetrics()
.equals(other.getScanMetrics());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -18611,6 +18799,10 @@ public final class ClientProtos {
hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); hash = (53 * hash) + hashBoolean(getHeartbeatMessage());
} }
if (hasScanMetrics()) {
hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER;
hash = (53 * hash) + getScanMetrics().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -18719,6 +18911,7 @@ public final class ClientProtos {
private void maybeForceBuilderInitialization() { private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getResultsFieldBuilder(); getResultsFieldBuilder();
getScanMetricsFieldBuilder();
} }
} }
private static Builder create() { private static Builder create() {
@ -18749,6 +18942,12 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000080); bitField0_ = (bitField0_ & ~0x00000080);
heartbeatMessage_ = false; heartbeatMessage_ = false;
bitField0_ = (bitField0_ & ~0x00000100); bitField0_ = (bitField0_ & ~0x00000100);
if (scanMetricsBuilder_ == null) {
scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
} else {
scanMetricsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
return this; return this;
} }
@ -18820,6 +19019,14 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000020; to_bitField0_ |= 0x00000020;
} }
result.heartbeatMessage_ = heartbeatMessage_; result.heartbeatMessage_ = heartbeatMessage_;
if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
to_bitField0_ |= 0x00000040;
}
if (scanMetricsBuilder_ == null) {
result.scanMetrics_ = scanMetrics_;
} else {
result.scanMetrics_ = scanMetricsBuilder_.build();
}
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -18900,6 +19107,9 @@ public final class ClientProtos {
if (other.hasHeartbeatMessage()) { if (other.hasHeartbeatMessage()) {
setHeartbeatMessage(other.getHeartbeatMessage()); setHeartbeatMessage(other.getHeartbeatMessage());
} }
if (other.hasScanMetrics()) {
mergeScanMetrics(other.getScanMetrics());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -19797,6 +20007,177 @@ public final class ClientProtos {
return this; return this;
} }
// optional .ScanMetrics scan_metrics = 10;
private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> scanMetricsBuilder_;
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public boolean hasScanMetrics() {
return ((bitField0_ & 0x00000200) == 0x00000200);
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() {
if (scanMetricsBuilder_ == null) {
return scanMetrics_;
} else {
return scanMetricsBuilder_.getMessage();
}
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder setScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) {
if (scanMetricsBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
scanMetrics_ = value;
onChanged();
} else {
scanMetricsBuilder_.setMessage(value);
}
bitField0_ |= 0x00000200;
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder setScanMetrics(
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder builderForValue) {
if (scanMetricsBuilder_ == null) {
scanMetrics_ = builderForValue.build();
onChanged();
} else {
scanMetricsBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000200;
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder mergeScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) {
if (scanMetricsBuilder_ == null) {
if (((bitField0_ & 0x00000200) == 0x00000200) &&
scanMetrics_ != org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance()) {
scanMetrics_ =
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.newBuilder(scanMetrics_).mergeFrom(value).buildPartial();
} else {
scanMetrics_ = value;
}
onChanged();
} else {
scanMetricsBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000200;
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder clearScanMetrics() {
if (scanMetricsBuilder_ == null) {
scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
onChanged();
} else {
scanMetricsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder getScanMetricsBuilder() {
bitField0_ |= 0x00000200;
onChanged();
return getScanMetricsFieldBuilder().getBuilder();
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() {
if (scanMetricsBuilder_ != null) {
return scanMetricsBuilder_.getMessageOrBuilder();
} else {
return scanMetrics_;
}
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>
getScanMetricsFieldBuilder() {
if (scanMetricsBuilder_ == null) {
scanMetricsBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>(
scanMetrics_,
getParentForChildren(),
isClean());
scanMetrics_ = null;
}
return scanMetricsBuilder_;
}
// @@protoc_insertion_point(builder_scope:ScanResponse) // @@protoc_insertion_point(builder_scope:ScanResponse)
} }
@ -32868,122 +33249,124 @@ public final class ClientProtos {
static { static {
java.lang.String[] descriptorData = { java.lang.String[] descriptorData = {
"\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" + "\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" +
"o\032\nCell.proto\032\020Comparator.proto\"\037\n\016Autho" + "o\032\nCell.proto\032\020Comparator.proto\032\017MapRedu" +
"rizations\022\r\n\005label\030\001 \003(\t\"$\n\016CellVisibili" + "ce.proto\"\037\n\016Authorizations\022\r\n\005label\030\001 \003(" +
"ty\022\022\n\nexpression\030\001 \002(\t\"+\n\006Column\022\016\n\006fami" + "\t\"$\n\016CellVisibility\022\022\n\nexpression\030\001 \002(\t\"" +
"ly\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\324\002\n\003Get\022\013\n\003r" + "+\n\006Column\022\016\n\006family\030\001 \002(\014\022\021\n\tqualifier\030\002" +
"ow\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!\n\tatt" + " \003(\014\"\324\002\n\003Get\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(" +
"ribute\030\003 \003(\0132\016.NameBytesPair\022\027\n\006filter\030\004" + "\0132\007.Column\022!\n\tattribute\030\003 \003(\0132\016.NameByte" +
" \001(\0132\007.Filter\022\036\n\ntime_range\030\005 \001(\0132\n.Time" + "sPair\022\027\n\006filter\030\004 \001(\0132\007.Filter\022\036\n\ntime_r" +
"Range\022\027\n\014max_versions\030\006 \001(\r:\0011\022\032\n\014cache_" + "ange\030\005 \001(\0132\n.TimeRange\022\027\n\014max_versions\030\006" +
"blocks\030\007 \001(\010:\004true\022\023\n\013store_limit\030\010 \001(\r\022", " \001(\r:\0011\022\032\n\014cache_blocks\030\007 \001(\010:\004true\022\023\n\013s",
"\024\n\014store_offset\030\t \001(\r\022\035\n\016existence_only\030" + "tore_limit\030\010 \001(\r\022\024\n\014store_offset\030\t \001(\r\022\035" +
"\n \001(\010:\005false\022!\n\022closest_row_before\030\013 \001(\010" + "\n\016existence_only\030\n \001(\010:\005false\022!\n\022closest" +
":\005false\022)\n\013consistency\030\014 \001(\0162\014.Consisten" + "_row_before\030\013 \001(\010:\005false\022)\n\013consistency\030" +
"cy:\006STRONG\"z\n\006Result\022\023\n\004cell\030\001 \003(\0132\005.Cel" + "\014 \001(\0162\014.Consistency:\006STRONG\"z\n\006Result\022\023\n" +
"l\022\035\n\025associated_cell_count\030\002 \001(\005\022\016\n\006exis" + "\004cell\030\001 \003(\0132\005.Cell\022\035\n\025associated_cell_co" +
"ts\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010:\005false\022\026\n\007partia" + "unt\030\002 \001(\005\022\016\n\006exists\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010" +
"l\030\005 \001(\010:\005false\"A\n\nGetRequest\022 \n\006region\030\001" + ":\005false\022\026\n\007partial\030\005 \001(\010:\005false\"A\n\nGetRe" +
" \002(\0132\020.RegionSpecifier\022\021\n\003get\030\002 \002(\0132\004.Ge" +
"t\"&\n\013GetResponse\022\027\n\006result\030\001 \001(\0132\007.Resul" +
"t\"\200\001\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006family\030\002",
" \002(\014\022\021\n\tqualifier\030\003 \002(\014\022\"\n\014compare_type\030" +
"\004 \002(\0162\014.CompareType\022\037\n\ncomparator\030\005 \002(\0132" +
"\013.Comparator\"\265\006\n\rMutationProto\022\013\n\003row\030\001 " +
"\001(\014\0220\n\013mutate_type\030\002 \001(\0162\033.MutationProto" +
".MutationType\0220\n\014column_value\030\003 \003(\0132\032.Mu" +
"tationProto.ColumnValue\022\021\n\ttimestamp\030\004 \001" +
"(\004\022!\n\tattribute\030\005 \003(\0132\016.NameBytesPair\022:\n" +
"\ndurability\030\006 \001(\0162\031.MutationProto.Durabi" +
"lity:\013USE_DEFAULT\022\036\n\ntime_range\030\007 \001(\0132\n." +
"TimeRange\022\035\n\025associated_cell_count\030\010 \001(\005",
"\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013ColumnValue\022\016\n\006famil" +
"y\030\001 \002(\014\022B\n\017qualifier_value\030\002 \003(\0132).Mutat" +
"ionProto.ColumnValue.QualifierValue\032\203\001\n\016" +
"QualifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005val" +
"ue\030\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022.\n\013delete_ty" +
"pe\030\004 \001(\0162\031.MutationProto.DeleteType\022\014\n\004t" +
"ags\030\005 \001(\014\"W\n\nDurability\022\017\n\013USE_DEFAULT\020\000" +
"\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC_WAL\020\002\022\014\n\010SYNC_WA" +
"L\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014MutationType\022\n\n\006AP" +
"PEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DELETE",
"\020\003\"p\n\nDeleteType\022\026\n\022DELETE_ONE_VERSION\020\000" +
"\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDELETE" +
"_FAMILY\020\002\022\031\n\025DELETE_FAMILY_VERSION\020\003\"\207\001\n" +
"\rMutateRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
"pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" +
"to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" +
"ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" +
"lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" +
"\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" +
"ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003",
" \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" +
".Filter\022\036\n\ntime_range\030\006 \001(\0132\n.TimeRange\022" +
"\027\n\014max_versions\030\007 \001(\r:\0011\022\032\n\014cache_blocks" +
"\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_" +
"result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024" +
"\n\014store_offset\030\014 \001(\r\022&\n\036load_column_fami" +
"lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
"eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
"\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
"\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
"nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
"er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
"lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
"\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" +
"ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" +
"sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" +
"ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" +
"l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" +
"le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
"\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea",
"rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" +
"quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
"5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" + "\021\n\003get\030\002 \002(\0132\004.Get\"&\n\013GetResponse\022\027\n\006res" +
"est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" + "ult\030\001 \001(\0132\007.Result\"\200\001\n\tCondition\022\013\n\003row\030",
"\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" + "\001 \002(\014\022\016\n\006family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014" +
"(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " + "\022\"\n\014compare_type\030\004 \002(\0162\014.CompareType\022\037\n\n" +
"\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" + "comparator\030\005 \002(\0132\013.Comparator\"\265\006\n\rMutati" +
"(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" + "onProto\022\013\n\003row\030\001 \001(\014\0220\n\013mutate_type\030\002 \001(" +
"\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" + "\0162\033.MutationProto.MutationType\0220\n\014column" +
"viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai", "_value\030\003 \003(\0132\032.MutationProto.ColumnValue" +
"r\"d\n\031CoprocessorServiceRequest\022 \n\006region" + "\022\021\n\ttimestamp\030\004 \001(\004\022!\n\tattribute\030\005 \003(\0132\016" +
"\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" + ".NameBytesPair\022:\n\ndurability\030\006 \001(\0162\031.Mut" +
".CoprocessorServiceCall\"]\n\032CoprocessorSe" + "ationProto.Durability:\013USE_DEFAULT\022\036\n\nti" +
"rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + "me_range\030\007 \001(\0132\n.TimeRange\022\035\n\025associated",
"ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" + "_cell_count\030\010 \001(\005\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013Col" +
"\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" + "umnValue\022\016\n\006family\030\001 \002(\014\022B\n\017qualifier_va" +
"\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" + "lue\030\002 \003(\0132).MutationProto.ColumnValue.Qu" +
"service_call\030\004 \001(\0132\027.CoprocessorServiceC" + "alifierValue\032\203\001\n\016QualifierValue\022\021\n\tquali" +
"all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" + "fier\030\001 \001(\014\022\r\n\005value\030\002 \001(\014\022\021\n\ttimestamp\030\003" +
"gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030", " \001(\004\022.\n\013delete_type\030\004 \001(\0162\031.MutationProt" +
"\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" + "o.DeleteType\022\014\n\004tags\030\005 \001(\014\"W\n\nDurability" +
"storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" + "\022\017\n\013USE_DEFAULT\020\000\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC" +
"\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" + "_WAL\020\002\022\014\n\010SYNC_WAL\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014M" +
"\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" + "utationType\022\n\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007",
"\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" + "\n\003PUT\020\002\022\n\n\006DELETE\020\003\"p\n\nDeleteType\022\026\n\022DEL" +
"\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" + "ETE_ONE_VERSION\020\000\022\034\n\030DELETE_MULTIPLE_VER" +
"Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" + "SIONS\020\001\022\021\n\rDELETE_FAMILY\020\002\022\031\n\025DELETE_FAM" +
"ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" + "ILY_VERSION\020\003\"\207\001\n\rMutateRequest\022 \n\006regio" +
".ResultOrException\022!\n\texception\030\002 \001(\0132\016." + "n\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation\030\002 " +
"NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA", "\002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(\0132\n" +
"ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" + ".Condition\022\023\n\013nonce_group\030\004 \001(\004\"<\n\016Mutat" +
"\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" + "eResponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\021\n\tpr" +
"\rMultiResponse\022/\n\022regionActionResult\030\001 \003" + "ocessed\030\002 \001(\010\"\271\003\n\004Scan\022\027\n\006column\030\001 \003(\0132\007" +
"(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" + ".Column\022!\n\tattribute\030\002 \003(\0132\016.NameBytesPa",
"(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" + "ir\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_row\030\004 \001(\014\022" +
"E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" + "\027\n\006filter\030\005 \001(\0132\007.Filter\022\036\n\ntime_range\030\006" +
"t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" + " \001(\0132\n.TimeRange\022\027\n\014max_versions\030\007 \001(\r:\001" +
"t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" + "1\022\032\n\014cache_blocks\030\010 \001(\010:\004true\022\022\n\nbatch_s" +
"\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" + "ize\030\t \001(\r\022\027\n\017max_result_size\030\n \001(\004\022\023\n\013st" +
"oadHFileRequest\032\026.BulkLoadHFileResponse\022", "ore_limit\030\013 \001(\r\022\024\n\014store_offset\030\014 \001(\r\022&\n" +
"F\n\013ExecService\022\032.CoprocessorServiceReque" + "\036load_column_families_on_demand\030\r \001(\010\022\r\n" +
"st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" + "\005small\030\016 \001(\010\022\027\n\010reversed\030\017 \001(\010:\005false\022)\n" +
"egionServerService\022\032.CoprocessorServiceR" + "\013consistency\030\020 \001(\0162\014.Consistency:\006STRONG" +
"equest\032\033.CoprocessorServiceResponse\022&\n\005M" + "\022\017\n\007caching\030\021 \001(\r\"\376\001\n\013ScanRequest\022 \n\006reg",
"ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" + "ion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(" +
"rg.apache.hadoop.hbase.protobuf.generate" + "\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_o" +
"dB\014ClientProtosH\001\210\001\001\240\001\001" "f_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rn" +
"ext_call_seq\030\006 \001(\004\022\037\n\027client_handles_par" +
"tials\030\007 \001(\010\022!\n\031client_handles_heartbeats" +
"\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001(\010\"\210\002\n\014S" +
"canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
"\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
"\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
"\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result",
"\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031" +
"\n\021heartbeat_message\030\t \001(\010\022\"\n\014scan_metric" +
"s\030\n \001(\0132\014.ScanMetrics\"\263\001\n\024BulkLoadHFileR" +
"equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
"\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFileReq" +
"uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032" +
"*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 " +
"\002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001" +
" \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 " +
"\002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name",
"\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSe" +
"rviceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPa" +
"ir\"d\n\031CoprocessorServiceRequest\022 \n\006regio" +
"n\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132" +
"\027.CoprocessorServiceCall\"]\n\032CoprocessorS" +
"erviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionS" +
"pecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"" +
"{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001" +
"(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n" +
"\014service_call\030\004 \001(\0132\027.CoprocessorService",
"Call\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.R" +
"egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action" +
"\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014me" +
"mstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001" +
"(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
"(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception" +
"\030\003 \001(\0132\016.NameBytesPair\0221\n\016service_result" +
"\030\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tloa" +
"dStats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022Region" +
"ActionResult\022-\n\021resultOrException\030\001 \003(\0132",
"\022.ResultOrException\022!\n\texception\030\002 \001(\0132\016" +
".NameBytesPair\"f\n\014MultiRequest\022#\n\014region" +
"Action\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGrou" +
"p\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S" +
"\n\rMultiResponse\022/\n\022regionActionResult\030\001 " +
"\003(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 " +
"\001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" +
"NE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReque" +
"st\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReque" +
"st\032\017.MutateResponse\022#\n\004Scan\022\014.ScanReques",
"t\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.Bulk" +
"LoadHFileRequest\032\026.BulkLoadHFileResponse" +
"\022F\n\013ExecService\022\032.CoprocessorServiceRequ" +
"est\032\033.CoprocessorServiceResponse\022R\n\027Exec" +
"RegionServerService\022\032.CoprocessorService" +
"Request\032\033.CoprocessorServiceResponse\022&\n\005" +
"Multi\022\r.MultiRequest\032\016.MultiResponseBB\n*" +
"org.apache.hadoop.hbase.protobuf.generat" +
"edB\014ClientProtosH\001\210\001\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -33079,13 +33462,13 @@ public final class ClientProtos {
internal_static_ScanRequest_fieldAccessorTable = new internal_static_ScanRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanRequest_descriptor, internal_static_ScanRequest_descriptor,
new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", }); new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", });
internal_static_ScanResponse_descriptor = internal_static_ScanResponse_descriptor =
getDescriptor().getMessageTypes().get(13); getDescriptor().getMessageTypes().get(13);
internal_static_ScanResponse_fieldAccessorTable = new internal_static_ScanResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanResponse_descriptor, internal_static_ScanResponse_descriptor,
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", }); new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", });
internal_static_BulkLoadHFileRequest_descriptor = internal_static_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14); getDescriptor().getMessageTypes().get(14);
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
@ -33180,6 +33563,7 @@ public final class ClientProtos {
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.getDescriptor(),
}, assigner); }, assigner);
} }

View File

@ -28,6 +28,7 @@ import "HBase.proto";
import "Filter.proto"; import "Filter.proto";
import "Cell.proto"; import "Cell.proto";
import "Comparator.proto"; import "Comparator.proto";
import "MapReduce.proto";
/** /**
* The protocol buffer version of Authorizations. * The protocol buffer version of Authorizations.
@ -276,6 +277,7 @@ message ScanRequest {
optional uint64 next_call_seq = 6; optional uint64 next_call_seq = 6;
optional bool client_handles_partials = 7; optional bool client_handles_partials = 7;
optional bool client_handles_heartbeats = 8; optional bool client_handles_heartbeats = 8;
optional bool track_scan_metrics = 9;
} }
/** /**
@ -320,6 +322,11 @@ message ScanResponse {
// timing out. Seeing a heartbeat message communicates to the Client that the // timing out. Seeing a heartbeat message communicates to the Client that the
// server would have continued to scan had the time limit not been reached. // server would have continued to scan had the time limit not been reached.
optional bool heartbeat_message = 9; optional bool heartbeat_message = 9;
// This field is filled in if the client has requested that scan metrics be tracked.
// The metrics tracked here are sent back to the client to be tracked together with
// the existing client side metrics.
optional ScanMetrics scan_metrics = 10;
} }
/** /**

View File

@ -5417,6 +5417,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
nextKv = heap.peek(); nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
if (scannerContext.checkBatchLimit(limitScope)) { if (scannerContext.checkBatchLimit(limitScope)) {
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
@ -5490,8 +5491,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// progress should be kept. // progress should be kept.
if (scannerContext.getKeepProgress()) { if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation. // Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
.setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress); initialTimeProgress);
} else { } else {
scannerContext.clearProgress(); scannerContext.clearProgress();
} }
@ -5556,7 +5557,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Check if rowkey filter wants to exclude this row. If so, loop to next. // Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call. // Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) { if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length); incrementCountOfRowsFilteredMetric(scannerContext);
// Typically the count of rows scanned is incremented inside #populateResult. However,
// here we are filtering a row based purely on its row key, preventing us from calling
// #populateResult. Thus, perform the necessary increment here to rows scanned metric
incrementCountOfRowsScannedMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) { if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
@ -5605,9 +5611,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) { if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear(); results.clear();
boolean moreRows = nextRow(currentRow, offset, length); boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) { if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
@ -5650,14 +5657,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Double check to prevent empty rows from appearing in result. It could be // Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used. // the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) { if (results.isEmpty()) {
boolean moreRows = nextRow(currentRow, offset, length); incrementCountOfRowsFilteredMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) { if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
if (!stopRow) continue; if (!stopRow) continue;
} }
// We are done. Return the result.
if (stopRow) { if (stopRow) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else { } else {
@ -5666,6 +5673,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
}
protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
}
/** /**
* @param currentRow * @param currentRow
* @param offset * @param offset
@ -5712,7 +5731,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
&& filter.filterRowKey(row, offset, length); && filter.filterRowKey(row, offset, length);
} }
protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException { protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
short length) throws IOException {
assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read."; assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
Cell next; Cell next;
while ((next = this.storeHeap.peek()) != null && while ((next = this.storeHeap.peek()) != null &&
@ -5720,6 +5740,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.storeHeap.next(MOCKED_LIST); this.storeHeap.next(MOCKED_LIST);
} }
resetFilters(); resetFilters();
// Calling the hook in CP which allows it to do a fast forward // Calling the hook in CP which allows it to do a fast forward
return this.region.getCoprocessorHost() == null return this.region.getCoprocessorHost() == null
|| this.region.getCoprocessorHost() || this.region.getCoprocessorHost()

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
public class NoLimitScannerContext extends ScannerContext { public class NoLimitScannerContext extends ScannerContext {
public NoLimitScannerContext() { public NoLimitScannerContext() {
super(false, null); super(false, null, false);
} }
/** /**

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@ -141,9 +142,11 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -2334,12 +2337,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final LimitScope timeScope = final LimitScope timeScope =
allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
boolean trackMetrics =
request.hasTrackScanMetrics() && request.getTrackScanMetrics();
// Configure with limits for this RPC. Set keep progress true since size progress // Configure with limits for this RPC. Set keep progress true since size progress
// towards size limit should be kept between calls to nextRaw // towards size limit should be kept between calls to nextRaw
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
contextBuilder.setSizeLimit(sizeScope, maxResultSize); contextBuilder.setSizeLimit(sizeScope, maxResultSize);
contextBuilder.setBatchLimit(scanner.getBatch()); contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build(); ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false; boolean limitReached = false;
@ -2393,6 +2400,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// We didn't get a single batch // We didn't get a single batch
builder.setMoreResultsInRegion(false); builder.setMoreResultsInRegion(false);
} }
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
if (trackMetrics) {
Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
for (Entry<String, Long> entry : metrics.entrySet()) {
pairBuilder.setName(entry.getKey());
pairBuilder.setValue(entry.getValue());
metricBuilder.addMetrics(pairBuilder.build());
}
builder.setScanMetrics(metricBuilder.build());
}
} }
region.updateReadRequestsCount(i); region.updateReadRequestsCount(i);
region.getMetrics().updateScanNext(totalCellSize); region.getMetrics().updateScanNext(totalCellSize);

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@ -63,13 +63,14 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
} }
@Override @Override
protected boolean nextRow(byte[] currentRow, int offset, short length) protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
throws IOException { short length) throws IOException {
assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
byte row[] = new byte[length]; byte row[] = new byte[length];
System.arraycopy(currentRow, offset, row, 0, length); System.arraycopy(currentRow, offset, row, 0, length);
this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row)); this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
resetFilters(); resetFilters();
// Calling the hook in CP which allows it to do a fast forward // Calling the hook in CP which allows it to do a fast forward
if (this.region.getCoprocessorHost() != null) { if (this.region.getCoprocessorHost() != null) {
return this.region.getCoprocessorHost().postScannerFilterRow(this, return this.region.getCoprocessorHost().postScannerFilterRow(this,

View File

@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
/** /**
* ScannerContext instances encapsulate limit tracking AND progress towards those limits during * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
@ -96,7 +97,12 @@ public class ScannerContext {
boolean keepProgress; boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false; private static boolean DEFAULT_KEEP_PROGRESS = false;
ScannerContext(boolean keepProgress, LimitFields limitsToCopy) { /**
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
*/
final ServerSideScanMetrics metrics;
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
this.limits = new LimitFields(); this.limits = new LimitFields();
if (limitsToCopy != null) this.limits.copy(limitsToCopy); if (limitsToCopy != null) this.limits.copy(limitsToCopy);
@ -105,6 +111,21 @@ public class ScannerContext {
this.keepProgress = keepProgress; this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE; this.scannerState = DEFAULT_STATE;
this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
}
boolean isTrackingMetrics() {
return this.metrics != null;
}
/**
* Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()}
* has been made to confirm that metrics are indeed being tracked.
* @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan
*/
ServerSideScanMetrics getMetrics() {
assert isTrackingMetrics();
return this.metrics;
} }
/** /**
@ -331,6 +352,7 @@ public class ScannerContext {
public static final class Builder { public static final class Builder {
boolean keepProgress = DEFAULT_KEEP_PROGRESS; boolean keepProgress = DEFAULT_KEEP_PROGRESS;
boolean trackMetrics = false;
LimitFields limits = new LimitFields(); LimitFields limits = new LimitFields();
private Builder() { private Builder() {
@ -345,6 +367,11 @@ public class ScannerContext {
return this; return this;
} }
public Builder setTrackMetrics(boolean trackMetrics) {
this.trackMetrics = trackMetrics;
return this;
}
public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) { public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
limits.setSize(sizeLimit); limits.setSize(sizeLimit);
limits.setSizeScope(sizeScope); limits.setSizeScope(sizeScope);
@ -363,7 +390,7 @@ public class ScannerContext {
} }
public ScannerContext build() { public ScannerContext build() {
return new ScannerContext(keepProgress, limits); return new ScannerContext(keepProgress, limits, trackMetrics);
} }
} }

View File

@ -0,0 +1,324 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestServerSideScanMetricsFromClientSide {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Table TABLE = null;
/**
* Table configuration
*/
private static TableName TABLE_NAME = TableName.valueOf("testTable");
private static int NUM_ROWS = 10;
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
// Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then
// table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which
// breaks the simple generation of expected kv's
private static int NUM_FAMILIES = 1;
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
private static int NUM_QUALIFIERS = 1;
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
private static int VALUE_SIZE = 10;
private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS;
// Approximation of how large the heap size of cells in our table. Should be accessed through
// getCellHeapSize().
private static long CELL_HEAP_SIZE = -1;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster(3);
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
}
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
byte[][] qualifiers, byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
return ht;
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Make puts to put the input value into each combination of row, family, and qualifier
* @param rows
* @param families
* @param qualifiers
* @param value
* @return
* @throws IOException
*/
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException {
Put put;
ArrayList<Put> puts = new ArrayList<>();
for (int row = 0; row < rows.length; row++) {
put = new Put(rows[row]);
for (int fam = 0; fam < families.length; fam++) {
for (int qual = 0; qual < qualifiers.length; qual++) {
KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
put.add(kv);
}
}
puts.add(put);
}
return puts;
}
/**
* @return The approximate heap size of a cell in the test table. All cells should have
* approximately the same heap size, so the value is cached to avoid repeating the
* calculation
* @throws Exception
*/
private long getCellHeapSize() throws Exception {
if (CELL_HEAP_SIZE == -1) {
// Do a partial scan that will return a single result with a single cell
Scan scan = new Scan();
scan.setMaxResultSize(1);
scan.setAllowPartialResults(true);
ResultScanner scanner = TABLE.getScanner(scan);
Result result = scanner.next();
assertTrue(result != null);
assertTrue(result.rawCells() != null);
assertTrue(result.rawCells().length == 1);
CELL_HEAP_SIZE = CellUtil.estimatedHeapSizeOf(result.rawCells()[0]);
scanner.close();
}
return CELL_HEAP_SIZE;
}
@Test
public void testRowsSeenMetric() throws Exception {
// Base scan configuration
Scan baseScan;
baseScan = new Scan();
baseScan.setScanMetricsEnabled(true);
testRowsSeenMetric(baseScan);
// Test case that only a single result will be returned per RPC to the serer
baseScan.setCaching(1);
testRowsSeenMetric(baseScan);
// Test case that partial results are returned from the server. At most one cell will be
// contained in each response
baseScan.setMaxResultSize(1);
testRowsSeenMetric(baseScan);
// Test case that size limit is set such that a few cells are returned per partial result from
// the server
baseScan.setCaching(NUM_ROWS);
baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
testRowsSeenMetric(baseScan);
}
public void testRowsSeenMetric(Scan baseScan) throws Exception {
Scan scan;
scan = new Scan(baseScan);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, NUM_ROWS);
for (int i = 0; i < ROWS.length - 1; i++) {
scan = new Scan(baseScan);
scan.setStartRow(ROWS[0]);
scan.setStopRow(ROWS[i + 1]);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, i + 1);
}
for (int i = ROWS.length - 1; i > 0; i--) {
scan = new Scan(baseScan);
scan.setStartRow(ROWS[i - 1]);
scan.setStopRow(ROWS[ROWS.length - 1]);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length - i);
}
// The filter should filter out all rows, but we still expect to see every row.
Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes()));
scan = new Scan(baseScan);
scan.setFilter(filter);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length);
// Filter should pass on all rows
SingleColumnValueFilter singleColumnValueFilter =
new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.EQUAL, VALUE);
scan = new Scan(baseScan);
scan.setFilter(singleColumnValueFilter);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length);
// Filter should filter out all rows
singleColumnValueFilter =
new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.NOT_EQUAL, VALUE);
scan = new Scan(baseScan);
scan.setFilter(singleColumnValueFilter);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length);
}
@Test
public void testRowsFilteredMetric() throws Exception {
// Base scan configuration
Scan baseScan;
baseScan = new Scan();
baseScan.setScanMetricsEnabled(true);
// Test case where scan uses default values
testRowsFilteredMetric(baseScan);
// Test case where at most one Result is retrieved per RPC
baseScan.setCaching(1);
testRowsFilteredMetric(baseScan);
// Test case where size limit is very restrictive and partial results will be returned from
// server
baseScan.setMaxResultSize(1);
testRowsFilteredMetric(baseScan);
// Test a case where max result size limits response from server to only a few cells (not all
// cells from the row)
baseScan.setCaching(NUM_ROWS);
baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
testRowsSeenMetric(baseScan);
}
public void testRowsFilteredMetric(Scan baseScan) throws Exception {
testRowsFilteredMetric(baseScan, null, 0);
// Row filter doesn't match any row key. All rows should be filtered
Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes()));
testRowsFilteredMetric(baseScan, filter, ROWS.length);
// Filter will return results containing only the first key. Number of entire rows filtered
// should be 0.
filter = new FirstKeyOnlyFilter();
testRowsFilteredMetric(baseScan, filter, 0);
// Column prefix will find some matching qualifier on each row. Number of entire rows filtered
// should be 0
filter = new ColumnPrefixFilter(QUALIFIERS[0]);
testRowsFilteredMetric(baseScan, filter, 0);
// Column prefix will NOT find any matching qualifier on any row. All rows should be filtered
filter = new ColumnPrefixFilter("xyz".getBytes());
testRowsFilteredMetric(baseScan, filter, ROWS.length);
// Matching column value should exist in each row. No rows should be filtered.
filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.EQUAL, VALUE);
testRowsFilteredMetric(baseScan, filter, 0);
// No matching column value should exist in any row. Filter all rows
filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.NOT_EQUAL, VALUE);
testRowsFilteredMetric(baseScan, filter, ROWS.length);
List<Filter> filters = new ArrayList<Filter>();
filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(ROWS[0])));
filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(ROWS[3])));
int numberOfMatchingRowFilters = filters.size();
filter = new FilterList(Operator.MUST_PASS_ONE, filters);
testRowsFilteredMetric(baseScan, filter, ROWS.length - numberOfMatchingRowFilters);
filters.clear();
// Add a single column value exclude filter for each column... The net effect is that all
// columns will be excluded when scanning on the server side. This will result in an empty cell
// array in RegionScanner#nextInternal which should be interpreted as a row being filtered.
for (int family = 0; family < FAMILIES.length; family++) {
for (int qualifier = 0; qualifier < QUALIFIERS.length; qualifier++) {
filters.add(new SingleColumnValueExcludeFilter(FAMILIES[family], QUALIFIERS[qualifier],
CompareOp.EQUAL, VALUE));
}
}
filter = new FilterList(Operator.MUST_PASS_ONE, filters);
testRowsFilteredMetric(baseScan, filter, ROWS.length);
}
public void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered)
throws Exception {
Scan scan = new Scan(baseScan);
if (filter != null) scan.setFilter(filter);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, expectedNumFiltered);
}
/**
* Run the scan to completetion and check the metric against the specified value
* @param scan
* @param metricKey
* @param expectedValue
* @throws Exception
*/
public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
ResultScanner scanner = TABLE.getScanner(scan);
// Iterate through all the results
for (Result r : scanner) {
}
scanner.close();
ScanMetrics metrics = scan.getScanMetrics();
assertTrue("Metrics are null", metrics != null);
assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
final long actualMetricValue = metrics.getCounter(metricKey).get();
assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: "
+ actualMetricValue, expectedValue, actualMetricValue);
}
}

View File

@ -49,6 +49,8 @@ module HBaseConstants
METHOD = "METHOD" METHOD = "METHOD"
MAXLENGTH = "MAXLENGTH" MAXLENGTH = "MAXLENGTH"
CACHE_BLOCKS = "CACHE_BLOCKS" CACHE_BLOCKS = "CACHE_BLOCKS"
ALL_METRICS = "ALL_METRICS"
METRICS = "METRICS"
REVERSED = "REVERSED" REVERSED = "REVERSED"
REPLICATION_SCOPE = "REPLICATION_SCOPE" REPLICATION_SCOPE = "REPLICATION_SCOPE"
INTERVAL = 'INTERVAL' INTERVAL = 'INTERVAL'

View File

@ -407,6 +407,8 @@ EOF
def _hash_to_scan(args) def _hash_to_scan(args)
if args.any? if args.any?
enablemetrics = args["ALL_METRICS"].nil? ? false : args["ALL_METRICS"]
enablemetrics = enablemetrics || !args["METRICS"].nil?
filter = args["FILTER"] filter = args["FILTER"]
startrow = args["STARTROW"] || '' startrow = args["STARTROW"] || ''
stoprow = args["STOPROW"] stoprow = args["STOPROW"]
@ -454,6 +456,7 @@ EOF
scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter)) scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter))
end end
scan.setScanMetricsEnabled(enablemetrics) if enablemetrics
scan.setTimeStamp(timestamp) if timestamp scan.setTimeStamp(timestamp) if timestamp
scan.setCacheBlocks(cache_blocks) scan.setCacheBlocks(cache_blocks)
scan.setReversed(reversed) scan.setReversed(reversed)
@ -478,8 +481,10 @@ EOF
#---------------------------------------------------------------------------------------------- #----------------------------------------------------------------------------------------------
# Scans whole table or a range of keys and returns rows matching specific criteria # Scans whole table or a range of keys and returns rows matching specific criteria
def _scan_internal(args = {}) def _scan_internal(args = {}, scan = nil)
raise(ArgumentError, "Arguments should be a Hash") unless args.kind_of?(Hash) raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash)
raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \
unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan)
limit = args["LIMIT"] || -1 limit = args["LIMIT"] || -1
maxlength = args.delete("MAXLENGTH") || -1 maxlength = args.delete("MAXLENGTH") || -1
@ -489,7 +494,8 @@ EOF
@converters.clear() @converters.clear()
# Start the scanner # Start the scanner
scanner = @table.getScanner(_hash_to_scan(args)) scan = scan == nil ? _hash_to_scan(args) : scan
scanner = @table.getScanner(scan)
iter = scanner.iterator iter = scanner.iterator
# Iterate results # Iterate results
@ -519,6 +525,7 @@ EOF
break break
end end
end end
scanner.close()
return ((block_given?) ? count : res) return ((block_given?) ? count : res)
end end

View File

@ -25,7 +25,7 @@ module Shell
Scan a table; pass table name and optionally a dictionary of scanner Scan a table; pass table name and optionally a dictionary of scanner
specifications. Scanner specifications may include one or more of: specifications. Scanner specifications may include one or more of:
TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, ROWPREFIXFILTER, TIMESTAMP, TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, ROWPREFIXFILTER, TIMESTAMP,
MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS, ALL_METRICS or METRICS
If no columns are specified, all columns will be scanned. If no columns are specified, all columns will be scanned.
To scan all members of a column family, leave the qualifier empty as in To scan all members of a column family, leave the qualifier empty as in
@ -36,6 +36,11 @@ The filter can be specified in two ways:
Filter Language document attached to the HBASE-4176 JIRA Filter Language document attached to the HBASE-4176 JIRA
2. Using the entire package name of the filter. 2. Using the entire package name of the filter.
If you wish to see metrics regarding the execution of the scan, the
ALL_METRICS boolean should be set to true. Alternatively, if you would
prefer to see only a subset of the metrics, the METRICS array can be
defined to include the names of only the metrics you care about.
Some examples: Some examples:
hbase> scan 'hbase:meta' hbase> scan 'hbase:meta'
@ -44,6 +49,8 @@ Some examples:
hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'} hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]} hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
hbase> scan 't1', {REVERSED => true} hbase> scan 't1', {REVERSED => true}
hbase> scan 't1', {ALL_METRICS => true}
hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']}
hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => " hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => "
(QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"} (QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"}
hbase> scan 't1', {FILTER => hbase> scan 't1', {FILTER =>
@ -100,12 +107,18 @@ EOF
now = Time.now now = Time.now
formatter.header(["ROW", "COLUMN+CELL"]) formatter.header(["ROW", "COLUMN+CELL"])
scan = table._hash_to_scan(args)
#actually do the scanning #actually do the scanning
count = table._scan_internal(args) do |row, cells| count = table._scan_internal(args, scan) do |row, cells|
formatter.row([ row, cells ]) formatter.row([ row, cells ])
end end
formatter.footer(now, count) formatter.footer(now, count)
# if scan metrics were enabled, print them after the results
if (scan != nil && scan.isScanMetricsEnabled())
formatter.scan_metrics(scan.getScanMetrics(), args["METRICS"])
end
end end
end end
end end

View File

@ -112,6 +112,37 @@ module Shell
@row_count += 1 @row_count += 1
end end
# Output the scan metrics. Can be filtered to output only those metrics whose keys exists
# in the metric_filter
def scan_metrics(scan_metrics = nil, metric_filter = [])
return if scan_metrics == nil
raise(ArgumentError, \
"Argument should be org.apache.hadoop.hbase.client.metrics.ScanMetrics") \
unless scan_metrics.kind_of?(org.apache.hadoop.hbase.client.metrics.ScanMetrics)
# prefix output with empty line
@out.puts
# save row count to restore after printing metrics
# (metrics should not count towards row count)
saved_row_count = @row_count
iter = scan_metrics.getMetricsMap().entrySet().iterator()
metric_hash = Hash.new()
# put keys in hash so they can be sorted easily
while iter.hasNext
metric = iter.next
metric_hash[metric.getKey.to_s] = metric.getValue.to_s
end
# print in alphabetical order
row(["METRIC", "VALUE"], false)
metric_hash.sort.map do |key, value|
if (not metric_filter or metric_filter.length == 0 or metric_filter.include?(key))
row([key, value])
end
end
@row_count = saved_row_count
return
end
def split(width, str) def split(width, str)
if width == 0 if width == 0
return [str] return [str]