HBASE-5980 Scanner responses from RS should include metrics on rows/KVs filtered

This commit is contained in:
stack 2015-05-21 11:09:14 -07:00
parent 109f138ca3
commit 078a9a97c3
15 changed files with 716 additions and 193 deletions

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -205,7 +207,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
setHeartbeatMessage(false); setHeartbeatMessage(false);
try { try {
incRPCcallsMetrics(); incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); request =
RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
this.scanMetrics != null);
ScanResponse response = null; ScanResponse response = null;
controller = controllerFactory.newController(); controller = controllerFactory.newController();
controller.setPriority(getTableName()); controller.setPriority(getTableName());
@ -235,6 +239,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
+ rows + " rows from scanner=" + scannerId); + rows + " rows from scanner=" + scannerId);
} }
} }
updateServerSideMetrics(response);
// moreResults is only used for the case where a filter exhausts all elements // moreResults is only used for the case where a filter exhausts all elements
if (response.hasMoreResults() && !response.getMoreResults()) { if (response.hasMoreResults() && !response.getMoreResults()) {
scannerId = -1L; scannerId = -1L;
@ -344,6 +349,21 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} }
} }
/**
* Use the scan metrics returned by the server to add to the identically named counters in the
* client side metrics. If a counter does not exist with the same name as the server side metric,
* the attempt to increase the counter will fail.
* @param response
*/
private void updateServerSideMetrics(ScanResponse response) {
if (this.scanMetrics == null || response == null || !response.hasScanMetrics()) return;
Map<String, Long> serverMetrics = ResponseConverter.getScanMetrics(response);
for (Entry<String, Long> entry : serverMetrics.entrySet()) {
this.scanMetrics.addToCounter(entry.getKey(), entry.getValue());
}
}
private void close() { private void close() {
if (this.scannerId == -1L) { if (this.scannerId == -1L) {
return; return;
@ -351,7 +371,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
try { try {
incRPCcallsMetrics(); incRPCcallsMetrics();
ScanRequest request = ScanRequest request =
RequestConverter.buildScanRequest(this.scannerId, 0, true); RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
try { try {
getStub().scan(null, request); getStub().scan(null, request);
} catch (ServiceException se) { } catch (ServiceException se) {

View File

@ -18,41 +18,32 @@
package org.apache.hadoop.hbase.client.metrics; package org.apache.hadoop.hbase.client.metrics;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.collect.ImmutableMap;
/** /**
* Provides client-side metrics related to scan operations. * Provides metrics related to scan operations (both server side and client side metrics).
* <p>
* The data can be passed to mapreduce framework or other systems. * The data can be passed to mapreduce framework or other systems.
* We use atomic longs so that one thread can increment, * We use atomic longs so that one thread can increment,
* while another atomically resets to zero after the values are reported * while another atomically resets to zero after the values are reported
* to hadoop's counters. * to hadoop's counters.
* * <p>
* Some of these metrics are general for any client operation such as put * Some of these metrics are general for any client operation such as put
* However, there is no need for this. So they are defined under scan operation * However, there is no need for this. So they are defined under scan operation
* for now. * for now.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ScanMetrics { public class ScanMetrics extends ServerSideScanMetrics {
/** // AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
* Hash to hold the String -> Atomic Long mappings. // ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
*/
private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();
// AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
// ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
// values after progress is passed to hadoop's counters. // values after progress is passed to hadoop's counters.
/** /**
* number of RPC calls * number of RPC calls
*/ */
@ -103,36 +94,4 @@ public class ScanMetrics {
*/ */
public ScanMetrics() { public ScanMetrics() {
} }
private AtomicLong createCounter(String counterName) {
AtomicLong c = new AtomicLong(0);
counters.put(counterName, c);
return c;
}
public void setCounter(String counterName, long value) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {
c.set(value);
}
}
/**
* Get all of the values since the last time this function was called.
*
* Calling this function will reset all AtomicLongs in the instance back to 0.
*
* @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
//Create a builder
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
//For every entry add the value and reset the AtomicLong back to zero
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
builder.put(e.getKey(), e.getValue().getAndSet(0));
}
//Build the immutable map so that people can't mess around with it.
return builder.build();
}
} }

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -63,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@ -481,9 +481,8 @@ public final class RequestConverter {
* @return a scan request * @return a scan request
* @throws IOException * @throws IOException
*/ */
public static ScanRequest buildScanRequest(final byte[] regionName, public static ScanRequest buildScanRequest(final byte[] regionName, final Scan scan,
final Scan scan, final int numberOfRows, final int numberOfRows, final boolean closeScanner) throws IOException {
final boolean closeScanner) throws IOException {
ScanRequest.Builder builder = ScanRequest.newBuilder(); ScanRequest.Builder builder = ScanRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier( RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName); RegionSpecifierType.REGION_NAME, regionName);
@ -493,6 +492,7 @@ public final class RequestConverter {
builder.setScan(ProtobufUtil.toScan(scan)); builder.setScan(ProtobufUtil.toScan(scan));
builder.setClientHandlesPartials(true); builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true); builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(scan != null && scan.isScanMetricsEnabled());
return builder.build(); return builder.build();
} }
@ -504,14 +504,15 @@ public final class RequestConverter {
* @param closeScanner * @param closeScanner
* @return a scan request * @return a scan request
*/ */
public static ScanRequest buildScanRequest(final long scannerId, public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
final int numberOfRows, final boolean closeScanner) { final boolean closeScanner, final boolean trackMetrics) {
ScanRequest.Builder builder = ScanRequest.newBuilder(); ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows); builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner); builder.setCloseScanner(closeScanner);
builder.setScannerId(scannerId); builder.setScannerId(scannerId);
builder.setClientHandlesPartials(true); builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true); builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
return builder.build(); return builder.build();
} }
@ -525,7 +526,7 @@ public final class RequestConverter {
* @return a scan request * @return a scan request
*/ */
public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows, public static ScanRequest buildScanRequest(final long scannerId, final int numberOfRows,
final boolean closeScanner, final long nextCallSeq) { final boolean closeScanner, final long nextCallSeq, final boolean trackMetrics) {
ScanRequest.Builder builder = ScanRequest.newBuilder(); ScanRequest.Builder builder = ScanRequest.newBuilder();
builder.setNumberOfRows(numberOfRows); builder.setNumberOfRows(numberOfRows);
builder.setCloseScanner(closeScanner); builder.setCloseScanner(closeScanner);
@ -533,6 +534,7 @@ public final class RequestConverter {
builder.setNextCallSeq(nextCallSeq); builder.setNextCallSeq(nextCallSeq);
builder.setClientHandlesPartials(true); builder.setClientHandlesPartials(true);
builder.setClientHandlesHeartbeats(true); builder.setClientHandlesHeartbeats(true);
builder.setTrackScanMetrics(trackMetrics);
return builder.build(); return builder.build();
} }

View File

@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.protobuf;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
@ -375,4 +379,26 @@ public final class ResponseConverter {
} }
return results; return results;
} }
public static Map<String, Long> getScanMetrics(ScanResponse response) {
Map<String, Long> metricMap = new HashMap<String, Long>();
if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
return metricMap;
}
ScanMetrics metrics = response.getScanMetrics();
int numberOfMetrics = metrics.getMetricsCount();
for (int i = 0; i < numberOfMetrics; i++) {
NameInt64Pair metricPair = metrics.getMetrics(i);
if (metricPair != null) {
String name = metricPair.getName();
Long value = metricPair.getValue();
if (name != null && value != null) {
metricMap.put(name, value);
}
}
}
return metricMap;
}
} }

View File

@ -16443,6 +16443,16 @@ public final class ClientProtos {
* <code>optional bool client_handles_heartbeats = 8;</code> * <code>optional bool client_handles_heartbeats = 8;</code>
*/ */
boolean getClientHandlesHeartbeats(); boolean getClientHandlesHeartbeats();
// optional bool track_scan_metrics = 9;
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
boolean hasTrackScanMetrics();
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
boolean getTrackScanMetrics();
} }
/** /**
* Protobuf type {@code ScanRequest} * Protobuf type {@code ScanRequest}
@ -16564,6 +16574,11 @@ public final class ClientProtos {
clientHandlesHeartbeats_ = input.readBool(); clientHandlesHeartbeats_ = input.readBool();
break; break;
} }
case 72: {
bitField0_ |= 0x00000100;
trackScanMetrics_ = input.readBool();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -16744,6 +16759,22 @@ public final class ClientProtos {
return clientHandlesHeartbeats_; return clientHandlesHeartbeats_;
} }
// optional bool track_scan_metrics = 9;
public static final int TRACK_SCAN_METRICS_FIELD_NUMBER = 9;
private boolean trackScanMetrics_;
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean hasTrackScanMetrics() {
return ((bitField0_ & 0x00000100) == 0x00000100);
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean getTrackScanMetrics() {
return trackScanMetrics_;
}
private void initFields() { private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance(); scan_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan.getDefaultInstance();
@ -16753,6 +16784,7 @@ public final class ClientProtos {
nextCallSeq_ = 0L; nextCallSeq_ = 0L;
clientHandlesPartials_ = false; clientHandlesPartials_ = false;
clientHandlesHeartbeats_ = false; clientHandlesHeartbeats_ = false;
trackScanMetrics_ = false;
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -16802,6 +16834,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000080) == 0x00000080)) { if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeBool(8, clientHandlesHeartbeats_); output.writeBool(8, clientHandlesHeartbeats_);
} }
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeBool(9, trackScanMetrics_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -16843,6 +16878,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, clientHandlesHeartbeats_); .computeBoolSize(8, clientHandlesHeartbeats_);
} }
if (((bitField0_ & 0x00000100) == 0x00000100)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(9, trackScanMetrics_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -16906,6 +16945,11 @@ public final class ClientProtos {
result = result && (getClientHandlesHeartbeats() result = result && (getClientHandlesHeartbeats()
== other.getClientHandlesHeartbeats()); == other.getClientHandlesHeartbeats());
} }
result = result && (hasTrackScanMetrics() == other.hasTrackScanMetrics());
if (hasTrackScanMetrics()) {
result = result && (getTrackScanMetrics()
== other.getTrackScanMetrics());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -16951,6 +16995,10 @@ public final class ClientProtos {
hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER; hash = (37 * hash) + CLIENT_HANDLES_HEARTBEATS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats()); hash = (53 * hash) + hashBoolean(getClientHandlesHeartbeats());
} }
if (hasTrackScanMetrics()) {
hash = (37 * hash) + TRACK_SCAN_METRICS_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getTrackScanMetrics());
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -17099,6 +17147,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000040); bitField0_ = (bitField0_ & ~0x00000040);
clientHandlesHeartbeats_ = false; clientHandlesHeartbeats_ = false;
bitField0_ = (bitField0_ & ~0x00000080); bitField0_ = (bitField0_ & ~0x00000080);
trackScanMetrics_ = false;
bitField0_ = (bitField0_ & ~0x00000100);
return this; return this;
} }
@ -17167,6 +17217,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000080; to_bitField0_ |= 0x00000080;
} }
result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_; result.clientHandlesHeartbeats_ = clientHandlesHeartbeats_;
if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
to_bitField0_ |= 0x00000100;
}
result.trackScanMetrics_ = trackScanMetrics_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -17207,6 +17261,9 @@ public final class ClientProtos {
if (other.hasClientHandlesHeartbeats()) { if (other.hasClientHandlesHeartbeats()) {
setClientHandlesHeartbeats(other.getClientHandlesHeartbeats()); setClientHandlesHeartbeats(other.getClientHandlesHeartbeats());
} }
if (other.hasTrackScanMetrics()) {
setTrackScanMetrics(other.getTrackScanMetrics());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -17678,6 +17735,39 @@ public final class ClientProtos {
return this; return this;
} }
// optional bool track_scan_metrics = 9;
private boolean trackScanMetrics_ ;
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean hasTrackScanMetrics() {
return ((bitField0_ & 0x00000100) == 0x00000100);
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public boolean getTrackScanMetrics() {
return trackScanMetrics_;
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public Builder setTrackScanMetrics(boolean value) {
bitField0_ |= 0x00000100;
trackScanMetrics_ = value;
onChanged();
return this;
}
/**
* <code>optional bool track_scan_metrics = 9;</code>
*/
public Builder clearTrackScanMetrics() {
bitField0_ = (bitField0_ & ~0x00000100);
trackScanMetrics_ = false;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ScanRequest) // @@protoc_insertion_point(builder_scope:ScanRequest)
} }
@ -17920,6 +18010,38 @@ public final class ClientProtos {
* </pre> * </pre>
*/ */
boolean getHeartbeatMessage(); boolean getHeartbeatMessage();
// optional .ScanMetrics scan_metrics = 10;
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
boolean hasScanMetrics();
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics();
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder();
} }
/** /**
* Protobuf type {@code ScanResponse} * Protobuf type {@code ScanResponse}
@ -18058,6 +18180,19 @@ public final class ClientProtos {
heartbeatMessage_ = input.readBool(); heartbeatMessage_ = input.readBool();
break; break;
} }
case 82: {
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder subBuilder = null;
if (((bitField0_ & 0x00000040) == 0x00000040)) {
subBuilder = scanMetrics_.toBuilder();
}
scanMetrics_ = input.readMessage(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(scanMetrics_);
scanMetrics_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000040;
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -18401,6 +18536,46 @@ public final class ClientProtos {
return heartbeatMessage_; return heartbeatMessage_;
} }
// optional .ScanMetrics scan_metrics = 10;
public static final int SCAN_METRICS_FIELD_NUMBER = 10;
private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_;
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public boolean hasScanMetrics() {
return ((bitField0_ & 0x00000040) == 0x00000040);
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() {
return scanMetrics_;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() {
return scanMetrics_;
}
private void initFields() { private void initFields() {
cellsPerResult_ = java.util.Collections.emptyList(); cellsPerResult_ = java.util.Collections.emptyList();
scannerId_ = 0L; scannerId_ = 0L;
@ -18411,6 +18586,7 @@ public final class ClientProtos {
partialFlagPerResult_ = java.util.Collections.emptyList(); partialFlagPerResult_ = java.util.Collections.emptyList();
moreResultsInRegion_ = false; moreResultsInRegion_ = false;
heartbeatMessage_ = false; heartbeatMessage_ = false;
scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -18451,6 +18627,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000020) == 0x00000020)) { if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBool(9, heartbeatMessage_); output.writeBool(9, heartbeatMessage_);
} }
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeMessage(10, scanMetrics_);
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -18503,6 +18682,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(9, heartbeatMessage_); .computeBoolSize(9, heartbeatMessage_);
} }
if (((bitField0_ & 0x00000040) == 0x00000040)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, scanMetrics_);
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -18562,6 +18745,11 @@ public final class ClientProtos {
result = result && (getHeartbeatMessage() result = result && (getHeartbeatMessage()
== other.getHeartbeatMessage()); == other.getHeartbeatMessage());
} }
result = result && (hasScanMetrics() == other.hasScanMetrics());
if (hasScanMetrics()) {
result = result && getScanMetrics()
.equals(other.getScanMetrics());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -18611,6 +18799,10 @@ public final class ClientProtos {
hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER; hash = (37 * hash) + HEARTBEAT_MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getHeartbeatMessage()); hash = (53 * hash) + hashBoolean(getHeartbeatMessage());
} }
if (hasScanMetrics()) {
hash = (37 * hash) + SCAN_METRICS_FIELD_NUMBER;
hash = (53 * hash) + getScanMetrics().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -18719,6 +18911,7 @@ public final class ClientProtos {
private void maybeForceBuilderInitialization() { private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getResultsFieldBuilder(); getResultsFieldBuilder();
getScanMetricsFieldBuilder();
} }
} }
private static Builder create() { private static Builder create() {
@ -18749,6 +18942,12 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000080); bitField0_ = (bitField0_ & ~0x00000080);
heartbeatMessage_ = false; heartbeatMessage_ = false;
bitField0_ = (bitField0_ & ~0x00000100); bitField0_ = (bitField0_ & ~0x00000100);
if (scanMetricsBuilder_ == null) {
scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
} else {
scanMetricsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
return this; return this;
} }
@ -18820,6 +19019,14 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000020; to_bitField0_ |= 0x00000020;
} }
result.heartbeatMessage_ = heartbeatMessage_; result.heartbeatMessage_ = heartbeatMessage_;
if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
to_bitField0_ |= 0x00000040;
}
if (scanMetricsBuilder_ == null) {
result.scanMetrics_ = scanMetrics_;
} else {
result.scanMetrics_ = scanMetricsBuilder_.build();
}
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -18900,6 +19107,9 @@ public final class ClientProtos {
if (other.hasHeartbeatMessage()) { if (other.hasHeartbeatMessage()) {
setHeartbeatMessage(other.getHeartbeatMessage()); setHeartbeatMessage(other.getHeartbeatMessage());
} }
if (other.hasScanMetrics()) {
mergeScanMetrics(other.getScanMetrics());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -19797,6 +20007,177 @@ public final class ClientProtos {
return this; return this;
} }
// optional .ScanMetrics scan_metrics = 10;
private org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder> scanMetricsBuilder_;
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public boolean hasScanMetrics() {
return ((bitField0_ & 0x00000200) == 0x00000200);
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics getScanMetrics() {
if (scanMetricsBuilder_ == null) {
return scanMetrics_;
} else {
return scanMetricsBuilder_.getMessage();
}
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder setScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) {
if (scanMetricsBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
scanMetrics_ = value;
onChanged();
} else {
scanMetricsBuilder_.setMessage(value);
}
bitField0_ |= 0x00000200;
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder setScanMetrics(
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder builderForValue) {
if (scanMetricsBuilder_ == null) {
scanMetrics_ = builderForValue.build();
onChanged();
} else {
scanMetricsBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000200;
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder mergeScanMetrics(org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics value) {
if (scanMetricsBuilder_ == null) {
if (((bitField0_ & 0x00000200) == 0x00000200) &&
scanMetrics_ != org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance()) {
scanMetrics_ =
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.newBuilder(scanMetrics_).mergeFrom(value).buildPartial();
} else {
scanMetrics_ = value;
}
onChanged();
} else {
scanMetricsBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000200;
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public Builder clearScanMetrics() {
if (scanMetricsBuilder_ == null) {
scanMetrics_ = org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.getDefaultInstance();
onChanged();
} else {
scanMetricsBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder getScanMetricsBuilder() {
bitField0_ |= 0x00000200;
onChanged();
return getScanMetricsFieldBuilder().getBuilder();
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
public org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder getScanMetricsOrBuilder() {
if (scanMetricsBuilder_ != null) {
return scanMetricsBuilder_.getMessageOrBuilder();
} else {
return scanMetrics_;
}
}
/**
* <code>optional .ScanMetrics scan_metrics = 10;</code>
*
* <pre>
* This field is filled in if the client has requested that scan metrics be tracked.
* The metrics tracked here are sent back to the client to be tracked together with
* the existing client side metrics.
* </pre>
*/
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>
getScanMetricsFieldBuilder() {
if (scanMetricsBuilder_ == null) {
scanMetricsBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics.Builder, org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetricsOrBuilder>(
scanMetrics_,
getParentForChildren(),
isClean());
scanMetrics_ = null;
}
return scanMetricsBuilder_;
}
// @@protoc_insertion_point(builder_scope:ScanResponse) // @@protoc_insertion_point(builder_scope:ScanResponse)
} }
@ -32868,122 +33249,124 @@ public final class ClientProtos {
static { static {
java.lang.String[] descriptorData = { java.lang.String[] descriptorData = {
"\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" + "\n\014Client.proto\032\013HBase.proto\032\014Filter.prot" +
"o\032\nCell.proto\032\020Comparator.proto\"\037\n\016Autho" + "o\032\nCell.proto\032\020Comparator.proto\032\017MapRedu" +
"rizations\022\r\n\005label\030\001 \003(\t\"$\n\016CellVisibili" + "ce.proto\"\037\n\016Authorizations\022\r\n\005label\030\001 \003(" +
"ty\022\022\n\nexpression\030\001 \002(\t\"+\n\006Column\022\016\n\006fami" + "\t\"$\n\016CellVisibility\022\022\n\nexpression\030\001 \002(\t\"" +
"ly\030\001 \002(\014\022\021\n\tqualifier\030\002 \003(\014\"\324\002\n\003Get\022\013\n\003r" + "+\n\006Column\022\016\n\006family\030\001 \002(\014\022\021\n\tqualifier\030\002" +
"ow\030\001 \002(\014\022\027\n\006column\030\002 \003(\0132\007.Column\022!\n\tatt" + " \003(\014\"\324\002\n\003Get\022\013\n\003row\030\001 \002(\014\022\027\n\006column\030\002 \003(" +
"ribute\030\003 \003(\0132\016.NameBytesPair\022\027\n\006filter\030\004" + "\0132\007.Column\022!\n\tattribute\030\003 \003(\0132\016.NameByte" +
" \001(\0132\007.Filter\022\036\n\ntime_range\030\005 \001(\0132\n.Time" + "sPair\022\027\n\006filter\030\004 \001(\0132\007.Filter\022\036\n\ntime_r" +
"Range\022\027\n\014max_versions\030\006 \001(\r:\0011\022\032\n\014cache_" + "ange\030\005 \001(\0132\n.TimeRange\022\027\n\014max_versions\030\006" +
"blocks\030\007 \001(\010:\004true\022\023\n\013store_limit\030\010 \001(\r\022", " \001(\r:\0011\022\032\n\014cache_blocks\030\007 \001(\010:\004true\022\023\n\013s",
"\024\n\014store_offset\030\t \001(\r\022\035\n\016existence_only\030" + "tore_limit\030\010 \001(\r\022\024\n\014store_offset\030\t \001(\r\022\035" +
"\n \001(\010:\005false\022!\n\022closest_row_before\030\013 \001(\010" + "\n\016existence_only\030\n \001(\010:\005false\022!\n\022closest" +
":\005false\022)\n\013consistency\030\014 \001(\0162\014.Consisten" + "_row_before\030\013 \001(\010:\005false\022)\n\013consistency\030" +
"cy:\006STRONG\"z\n\006Result\022\023\n\004cell\030\001 \003(\0132\005.Cel" + "\014 \001(\0162\014.Consistency:\006STRONG\"z\n\006Result\022\023\n" +
"l\022\035\n\025associated_cell_count\030\002 \001(\005\022\016\n\006exis" + "\004cell\030\001 \003(\0132\005.Cell\022\035\n\025associated_cell_co" +
"ts\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010:\005false\022\026\n\007partia" + "unt\030\002 \001(\005\022\016\n\006exists\030\003 \001(\010\022\024\n\005stale\030\004 \001(\010" +
"l\030\005 \001(\010:\005false\"A\n\nGetRequest\022 \n\006region\030\001" + ":\005false\022\026\n\007partial\030\005 \001(\010:\005false\"A\n\nGetRe" +
" \002(\0132\020.RegionSpecifier\022\021\n\003get\030\002 \002(\0132\004.Ge" +
"t\"&\n\013GetResponse\022\027\n\006result\030\001 \001(\0132\007.Resul" +
"t\"\200\001\n\tCondition\022\013\n\003row\030\001 \002(\014\022\016\n\006family\030\002",
" \002(\014\022\021\n\tqualifier\030\003 \002(\014\022\"\n\014compare_type\030" +
"\004 \002(\0162\014.CompareType\022\037\n\ncomparator\030\005 \002(\0132" +
"\013.Comparator\"\265\006\n\rMutationProto\022\013\n\003row\030\001 " +
"\001(\014\0220\n\013mutate_type\030\002 \001(\0162\033.MutationProto" +
".MutationType\0220\n\014column_value\030\003 \003(\0132\032.Mu" +
"tationProto.ColumnValue\022\021\n\ttimestamp\030\004 \001" +
"(\004\022!\n\tattribute\030\005 \003(\0132\016.NameBytesPair\022:\n" +
"\ndurability\030\006 \001(\0162\031.MutationProto.Durabi" +
"lity:\013USE_DEFAULT\022\036\n\ntime_range\030\007 \001(\0132\n." +
"TimeRange\022\035\n\025associated_cell_count\030\010 \001(\005",
"\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013ColumnValue\022\016\n\006famil" +
"y\030\001 \002(\014\022B\n\017qualifier_value\030\002 \003(\0132).Mutat" +
"ionProto.ColumnValue.QualifierValue\032\203\001\n\016" +
"QualifierValue\022\021\n\tqualifier\030\001 \001(\014\022\r\n\005val" +
"ue\030\002 \001(\014\022\021\n\ttimestamp\030\003 \001(\004\022.\n\013delete_ty" +
"pe\030\004 \001(\0162\031.MutationProto.DeleteType\022\014\n\004t" +
"ags\030\005 \001(\014\"W\n\nDurability\022\017\n\013USE_DEFAULT\020\000" +
"\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC_WAL\020\002\022\014\n\010SYNC_WA" +
"L\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014MutationType\022\n\n\006AP" +
"PEND\020\000\022\r\n\tINCREMENT\020\001\022\007\n\003PUT\020\002\022\n\n\006DELETE",
"\020\003\"p\n\nDeleteType\022\026\n\022DELETE_ONE_VERSION\020\000" +
"\022\034\n\030DELETE_MULTIPLE_VERSIONS\020\001\022\021\n\rDELETE" +
"_FAMILY\020\002\022\031\n\025DELETE_FAMILY_VERSION\020\003\"\207\001\n" +
"\rMutateRequest\022 \n\006region\030\001 \002(\0132\020.RegionS" +
"pecifier\022 \n\010mutation\030\002 \002(\0132\016.MutationPro" +
"to\022\035\n\tcondition\030\003 \001(\0132\n.Condition\022\023\n\013non" +
"ce_group\030\004 \001(\004\"<\n\016MutateResponse\022\027\n\006resu" +
"lt\030\001 \001(\0132\007.Result\022\021\n\tprocessed\030\002 \001(\010\"\271\003\n" +
"\004Scan\022\027\n\006column\030\001 \003(\0132\007.Column\022!\n\tattrib" +
"ute\030\002 \003(\0132\016.NameBytesPair\022\021\n\tstart_row\030\003",
" \001(\014\022\020\n\010stop_row\030\004 \001(\014\022\027\n\006filter\030\005 \001(\0132\007" +
".Filter\022\036\n\ntime_range\030\006 \001(\0132\n.TimeRange\022" +
"\027\n\014max_versions\030\007 \001(\r:\0011\022\032\n\014cache_blocks" +
"\030\010 \001(\010:\004true\022\022\n\nbatch_size\030\t \001(\r\022\027\n\017max_" +
"result_size\030\n \001(\004\022\023\n\013store_limit\030\013 \001(\r\022\024" +
"\n\014store_offset\030\014 \001(\r\022&\n\036load_column_fami" +
"lies_on_demand\030\r \001(\010\022\r\n\005small\030\016 \001(\010\022\027\n\010r" +
"eversed\030\017 \001(\010:\005false\022)\n\013consistency\030\020 \001(" +
"\0162\014.Consistency:\006STRONG\022\017\n\007caching\030\021 \001(\r" +
"\"\342\001\n\013ScanRequest\022 \n\006region\030\001 \001(\0132\020.Regio",
"nSpecifier\022\023\n\004scan\030\002 \001(\0132\005.Scan\022\022\n\nscann" +
"er_id\030\003 \001(\004\022\026\n\016number_of_rows\030\004 \001(\r\022\025\n\rc" +
"lose_scanner\030\005 \001(\010\022\025\n\rnext_call_seq\030\006 \001(" +
"\004\022\037\n\027client_handles_partials\030\007 \001(\010\022!\n\031cl" +
"ient_handles_heartbeats\030\010 \001(\010\"\344\001\n\014ScanRe" +
"sponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n\nscan" +
"ner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022\013\n\003tt" +
"l\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r\n\005sta" +
"le\030\006 \001(\010\022\037\n\027partial_flag_per_result\030\007 \003(" +
"\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031\n\021hea",
"rtbeat_message\030\t \001(\010\"\263\001\n\024BulkLoadHFileRe" +
"quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" + "quest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier\022" +
"5\n\013family_path\030\002 \003(\0132 .BulkLoadHFileRequ" + "\021\n\003get\030\002 \002(\0132\004.Get\"&\n\013GetResponse\022\027\n\006res" +
"est.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032*" + "ult\030\001 \001(\0132\007.Result\"\200\001\n\tCondition\022\013\n\003row\030",
"\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002" + "\001 \002(\014\022\016\n\006family\030\002 \002(\014\022\021\n\tqualifier\030\003 \002(\014" +
"(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 " + "\022\"\n\014compare_type\030\004 \002(\0162\014.CompareType\022\037\n\n" +
"\002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002" + "comparator\030\005 \002(\0132\013.Comparator\"\265\006\n\rMutati" +
"(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name\030" + "onProto\022\013\n\003row\030\001 \001(\014\0220\n\013mutate_type\030\002 \001(" +
"\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSer" + "\0162\033.MutationProto.MutationType\0220\n\014column" +
"viceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPai", "_value\030\003 \003(\0132\032.MutationProto.ColumnValue" +
"r\"d\n\031CoprocessorServiceRequest\022 \n\006region" + "\022\021\n\ttimestamp\030\004 \001(\004\022!\n\tattribute\030\005 \003(\0132\016" +
"\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132\027" + ".NameBytesPair\022:\n\ndurability\030\006 \001(\0162\031.Mut" +
".CoprocessorServiceCall\"]\n\032CoprocessorSe" + "ationProto.Durability:\013USE_DEFAULT\022\036\n\nti" +
"rviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionSp" + "me_range\030\007 \001(\0132\n.TimeRange\022\035\n\025associated",
"ecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"{" + "_cell_count\030\010 \001(\005\022\r\n\005nonce\030\t \001(\004\032\347\001\n\013Col" +
"\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001(" + "umnValue\022\016\n\006family\030\001 \002(\014\022B\n\017qualifier_va" +
"\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n\014" + "lue\030\002 \003(\0132).MutationProto.ColumnValue.Qu" +
"service_call\030\004 \001(\0132\027.CoprocessorServiceC" + "alifierValue\032\203\001\n\016QualifierValue\022\021\n\tquali" +
"all\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.Re" + "fier\030\001 \001(\014\022\r\n\005value\030\002 \001(\014\022\021\n\ttimestamp\030\003" +
"gionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action\030", " \001(\004\022.\n\013delete_type\030\004 \001(\0162\031.MutationProt" +
"\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014mem" + "o.DeleteType\022\014\n\004tags\030\005 \001(\014\"W\n\nDurability" +
"storeLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(" + "\022\017\n\013USE_DEFAULT\020\000\022\014\n\010SKIP_WAL\020\001\022\r\n\tASYNC" +
"\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001(" + "_WAL\020\002\022\014\n\010SYNC_WAL\020\003\022\r\n\tFSYNC_WAL\020\004\">\n\014M" +
"\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception\030" + "utationType\022\n\n\006APPEND\020\000\022\r\n\tINCREMENT\020\001\022\007",
"\003 \001(\0132\016.NameBytesPair\0221\n\016service_result\030" + "\n\003PUT\020\002\022\n\n\006DELETE\020\003\"p\n\nDeleteType\022\026\n\022DEL" +
"\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tload" + "ETE_ONE_VERSION\020\000\022\034\n\030DELETE_MULTIPLE_VER" +
"Stats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022RegionA" + "SIONS\020\001\022\021\n\rDELETE_FAMILY\020\002\022\031\n\025DELETE_FAM" +
"ctionResult\022-\n\021resultOrException\030\001 \003(\0132\022" + "ILY_VERSION\020\003\"\207\001\n\rMutateRequest\022 \n\006regio" +
".ResultOrException\022!\n\texception\030\002 \001(\0132\016." + "n\030\001 \002(\0132\020.RegionSpecifier\022 \n\010mutation\030\002 " +
"NameBytesPair\"f\n\014MultiRequest\022#\n\014regionA", "\002(\0132\016.MutationProto\022\035\n\tcondition\030\003 \001(\0132\n" +
"ction\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGroup" + ".Condition\022\023\n\013nonce_group\030\004 \001(\004\"<\n\016Mutat" +
"\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S\n" + "eResponse\022\027\n\006result\030\001 \001(\0132\007.Result\022\021\n\tpr" +
"\rMultiResponse\022/\n\022regionActionResult\030\001 \003" + "ocessed\030\002 \001(\010\"\271\003\n\004Scan\022\027\n\006column\030\001 \003(\0132\007" +
"(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 \001" + ".Column\022!\n\tattribute\030\002 \003(\0132\016.NameBytesPa",
"(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELIN" + "ir\022\021\n\tstart_row\030\003 \001(\014\022\020\n\010stop_row\030\004 \001(\014\022" +
"E\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReques" + "\027\n\006filter\030\005 \001(\0132\007.Filter\022\036\n\ntime_range\030\006" +
"t\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReques" + " \001(\0132\n.TimeRange\022\027\n\014max_versions\030\007 \001(\r:\001" +
"t\032\017.MutateResponse\022#\n\004Scan\022\014.ScanRequest" + "1\022\032\n\014cache_blocks\030\010 \001(\010:\004true\022\022\n\nbatch_s" +
"\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.BulkL" + "ize\030\t \001(\r\022\027\n\017max_result_size\030\n \001(\004\022\023\n\013st" +
"oadHFileRequest\032\026.BulkLoadHFileResponse\022", "ore_limit\030\013 \001(\r\022\024\n\014store_offset\030\014 \001(\r\022&\n" +
"F\n\013ExecService\022\032.CoprocessorServiceReque" + "\036load_column_families_on_demand\030\r \001(\010\022\r\n" +
"st\032\033.CoprocessorServiceResponse\022R\n\027ExecR" + "\005small\030\016 \001(\010\022\027\n\010reversed\030\017 \001(\010:\005false\022)\n" +
"egionServerService\022\032.CoprocessorServiceR" + "\013consistency\030\020 \001(\0162\014.Consistency:\006STRONG" +
"equest\032\033.CoprocessorServiceResponse\022&\n\005M" + "\022\017\n\007caching\030\021 \001(\r\"\376\001\n\013ScanRequest\022 \n\006reg",
"ulti\022\r.MultiRequest\032\016.MultiResponseBB\n*o" + "ion\030\001 \001(\0132\020.RegionSpecifier\022\023\n\004scan\030\002 \001(" +
"rg.apache.hadoop.hbase.protobuf.generate" + "\0132\005.Scan\022\022\n\nscanner_id\030\003 \001(\004\022\026\n\016number_o" +
"dB\014ClientProtosH\001\210\001\001\240\001\001" "f_rows\030\004 \001(\r\022\025\n\rclose_scanner\030\005 \001(\010\022\025\n\rn" +
"ext_call_seq\030\006 \001(\004\022\037\n\027client_handles_par" +
"tials\030\007 \001(\010\022!\n\031client_handles_heartbeats" +
"\030\010 \001(\010\022\032\n\022track_scan_metrics\030\t \001(\010\"\210\002\n\014S" +
"canResponse\022\030\n\020cells_per_result\030\001 \003(\r\022\022\n" +
"\nscanner_id\030\002 \001(\004\022\024\n\014more_results\030\003 \001(\010\022" +
"\013\n\003ttl\030\004 \001(\r\022\030\n\007results\030\005 \003(\0132\007.Result\022\r" +
"\n\005stale\030\006 \001(\010\022\037\n\027partial_flag_per_result",
"\030\007 \003(\010\022\036\n\026more_results_in_region\030\010 \001(\010\022\031" +
"\n\021heartbeat_message\030\t \001(\010\022\"\n\014scan_metric" +
"s\030\n \001(\0132\014.ScanMetrics\"\263\001\n\024BulkLoadHFileR" +
"equest\022 \n\006region\030\001 \002(\0132\020.RegionSpecifier" +
"\0225\n\013family_path\030\002 \003(\0132 .BulkLoadHFileReq" +
"uest.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\032" +
"*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 " +
"\002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n\006loaded\030\001" +
" \002(\010\"a\n\026CoprocessorServiceCall\022\013\n\003row\030\001 " +
"\002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013method_name",
"\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"9\n\030CoprocessorSe" +
"rviceResult\022\035\n\005value\030\001 \001(\0132\016.NameBytesPa" +
"ir\"d\n\031CoprocessorServiceRequest\022 \n\006regio" +
"n\030\001 \002(\0132\020.RegionSpecifier\022%\n\004call\030\002 \002(\0132" +
"\027.CoprocessorServiceCall\"]\n\032CoprocessorS" +
"erviceResponse\022 \n\006region\030\001 \002(\0132\020.RegionS" +
"pecifier\022\035\n\005value\030\002 \002(\0132\016.NameBytesPair\"" +
"{\n\006Action\022\r\n\005index\030\001 \001(\r\022 \n\010mutation\030\002 \001" +
"(\0132\016.MutationProto\022\021\n\003get\030\003 \001(\0132\004.Get\022-\n" +
"\014service_call\030\004 \001(\0132\027.CoprocessorService",
"Call\"Y\n\014RegionAction\022 \n\006region\030\001 \002(\0132\020.R" +
"egionSpecifier\022\016\n\006atomic\030\002 \001(\010\022\027\n\006action" +
"\030\003 \003(\0132\007.Action\"D\n\017RegionLoadStats\022\027\n\014me" +
"mstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001" +
"(\005:\0010\"\266\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
"(\r\022\027\n\006result\030\002 \001(\0132\007.Result\022!\n\texception" +
"\030\003 \001(\0132\016.NameBytesPair\0221\n\016service_result" +
"\030\004 \001(\0132\031.CoprocessorServiceResult\022#\n\tloa" +
"dStats\030\005 \001(\0132\020.RegionLoadStats\"f\n\022Region" +
"ActionResult\022-\n\021resultOrException\030\001 \003(\0132",
"\022.ResultOrException\022!\n\texception\030\002 \001(\0132\016" +
".NameBytesPair\"f\n\014MultiRequest\022#\n\014region" +
"Action\030\001 \003(\0132\r.RegionAction\022\022\n\nnonceGrou" +
"p\030\002 \001(\004\022\035\n\tcondition\030\003 \001(\0132\n.Condition\"S" +
"\n\rMultiResponse\022/\n\022regionActionResult\030\001 " +
"\003(\0132\023.RegionActionResult\022\021\n\tprocessed\030\002 " +
"\001(\010*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELI" +
"NE\020\0012\205\003\n\rClientService\022 \n\003Get\022\013.GetReque" +
"st\032\014.GetResponse\022)\n\006Mutate\022\016.MutateReque" +
"st\032\017.MutateResponse\022#\n\004Scan\022\014.ScanReques",
"t\032\r.ScanResponse\022>\n\rBulkLoadHFile\022\025.Bulk" +
"LoadHFileRequest\032\026.BulkLoadHFileResponse" +
"\022F\n\013ExecService\022\032.CoprocessorServiceRequ" +
"est\032\033.CoprocessorServiceResponse\022R\n\027Exec" +
"RegionServerService\022\032.CoprocessorService" +
"Request\032\033.CoprocessorServiceResponse\022&\n\005" +
"Multi\022\r.MultiRequest\032\016.MultiResponseBB\n*" +
"org.apache.hadoop.hbase.protobuf.generat" +
"edB\014ClientProtosH\001\210\001\001\240\001\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -33079,13 +33462,13 @@ public final class ClientProtos {
internal_static_ScanRequest_fieldAccessorTable = new internal_static_ScanRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanRequest_descriptor, internal_static_ScanRequest_descriptor,
new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", }); new java.lang.String[] { "Region", "Scan", "ScannerId", "NumberOfRows", "CloseScanner", "NextCallSeq", "ClientHandlesPartials", "ClientHandlesHeartbeats", "TrackScanMetrics", });
internal_static_ScanResponse_descriptor = internal_static_ScanResponse_descriptor =
getDescriptor().getMessageTypes().get(13); getDescriptor().getMessageTypes().get(13);
internal_static_ScanResponse_fieldAccessorTable = new internal_static_ScanResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ScanResponse_descriptor, internal_static_ScanResponse_descriptor,
new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", }); new java.lang.String[] { "CellsPerResult", "ScannerId", "MoreResults", "Ttl", "Results", "Stale", "PartialFlagPerResult", "MoreResultsInRegion", "HeartbeatMessage", "ScanMetrics", });
internal_static_BulkLoadHFileRequest_descriptor = internal_static_BulkLoadHFileRequest_descriptor =
getDescriptor().getMessageTypes().get(14); getDescriptor().getMessageTypes().get(14);
internal_static_BulkLoadHFileRequest_fieldAccessorTable = new internal_static_BulkLoadHFileRequest_fieldAccessorTable = new
@ -33180,6 +33563,7 @@ public final class ClientProtos {
org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.FilterProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.CellProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(), org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos.getDescriptor(),
org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.getDescriptor(),
}, assigner); }, assigner);
} }

View File

@ -28,6 +28,7 @@ import "HBase.proto";
import "Filter.proto"; import "Filter.proto";
import "Cell.proto"; import "Cell.proto";
import "Comparator.proto"; import "Comparator.proto";
import "MapReduce.proto";
/** /**
* The protocol buffer version of Authorizations. * The protocol buffer version of Authorizations.
@ -276,6 +277,7 @@ message ScanRequest {
optional uint64 next_call_seq = 6; optional uint64 next_call_seq = 6;
optional bool client_handles_partials = 7; optional bool client_handles_partials = 7;
optional bool client_handles_heartbeats = 8; optional bool client_handles_heartbeats = 8;
optional bool track_scan_metrics = 9;
} }
/** /**
@ -320,6 +322,11 @@ message ScanResponse {
// timing out. Seeing a heartbeat message communicates to the Client that the // timing out. Seeing a heartbeat message communicates to the Client that the
// server would have continued to scan had the time limit not been reached. // server would have continued to scan had the time limit not been reached.
optional bool heartbeat_message = 9; optional bool heartbeat_message = 9;
// This field is filled in if the client has requested that scan metrics be tracked.
// The metrics tracked here are sent back to the client to be tracked together with
// the existing client side metrics.
optional ScanMetrics scan_metrics = 10;
} }
/** /**

View File

@ -5423,6 +5423,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
nextKv = heap.peek(); nextKv = heap.peek();
moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
if (scannerContext.checkBatchLimit(limitScope)) { if (scannerContext.checkBatchLimit(limitScope)) {
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
@ -5496,8 +5497,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// progress should be kept. // progress should be kept.
if (scannerContext.getKeepProgress()) { if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation. // Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
.setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress); initialTimeProgress);
} else { } else {
scannerContext.clearProgress(); scannerContext.clearProgress();
} }
@ -5562,7 +5563,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Check if rowkey filter wants to exclude this row. If so, loop to next. // Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call. // Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(currentRow, offset, length)) { if (filterRowKey(currentRow, offset, length)) {
boolean moreRows = nextRow(currentRow, offset, length); incrementCountOfRowsFilteredMetric(scannerContext);
// Typically the count of rows scanned is incremented inside #populateResult. However,
// here we are filtering a row based purely on its row key, preventing us from calling
// #populateResult. Thus, perform the necessary increment here to rows scanned metric
incrementCountOfRowsScannedMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) { if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
@ -5611,9 +5617,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) { if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear(); results.clear();
boolean moreRows = nextRow(currentRow, offset, length); boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) { if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
@ -5656,14 +5663,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Double check to prevent empty rows from appearing in result. It could be // Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used. // the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) { if (results.isEmpty()) {
boolean moreRows = nextRow(currentRow, offset, length); incrementCountOfRowsFilteredMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);
if (!moreRows) { if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} }
if (!stopRow) continue; if (!stopRow) continue;
} }
// We are done. Return the result.
if (stopRow) { if (stopRow) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else { } else {
@ -5672,6 +5679,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
}
protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
}
/** /**
* @param currentRow * @param currentRow
* @param offset * @param offset
@ -5718,7 +5737,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
&& filter.filterRowKey(row, offset, length); && filter.filterRowKey(row, offset, length);
} }
protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException { protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
short length) throws IOException {
assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read."; assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
Cell next; Cell next;
while ((next = this.storeHeap.peek()) != null && while ((next = this.storeHeap.peek()) != null &&
@ -5726,6 +5746,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.storeHeap.next(MOCKED_LIST); this.storeHeap.next(MOCKED_LIST);
} }
resetFilters(); resetFilters();
// Calling the hook in CP which allows it to do a fast forward // Calling the hook in CP which allows it to do a fast forward
return this.region.getCoprocessorHost() == null return this.region.getCoprocessorHost() == null
|| this.region.getCoprocessorHost() || this.region.getCoprocessorHost()

View File

@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
public class NoLimitScannerContext extends ScannerContext { public class NoLimitScannerContext extends ScannerContext {
public NoLimitScannerContext() { public NoLimitScannerContext() {
super(false, null); super(false, null, false);
} }
/** /**

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
@ -144,9 +145,11 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -2364,12 +2367,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
final LimitScope timeScope = final LimitScope timeScope =
allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
boolean trackMetrics =
request.hasTrackScanMetrics() && request.getTrackScanMetrics();
// Configure with limits for this RPC. Set keep progress true since size progress // Configure with limits for this RPC. Set keep progress true since size progress
// towards size limit should be kept between calls to nextRaw // towards size limit should be kept between calls to nextRaw
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
contextBuilder.setSizeLimit(sizeScope, maxResultSize); contextBuilder.setSizeLimit(sizeScope, maxResultSize);
contextBuilder.setBatchLimit(scanner.getBatch()); contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
ScannerContext scannerContext = contextBuilder.build(); ScannerContext scannerContext = contextBuilder.build();
boolean limitReached = false; boolean limitReached = false;
@ -2423,6 +2430,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// We didn't get a single batch // We didn't get a single batch
builder.setMoreResultsInRegion(false); builder.setMoreResultsInRegion(false);
} }
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
if (trackMetrics) {
Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
for (Entry<String, Long> entry : metrics.entrySet()) {
pairBuilder.setName(entry.getKey());
pairBuilder.setValue(entry.getValue());
metricBuilder.addMetrics(pairBuilder.build());
}
builder.setScanMetrics(metricBuilder.build());
}
} }
region.updateReadRequestsCount(i); region.updateReadRequestsCount(i);
region.getMetrics().updateScanNext(totalCellSize); region.getMetrics().updateScanNext(totalCellSize);

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@ -63,13 +63,14 @@ class ReversedRegionScannerImpl extends RegionScannerImpl {
} }
@Override @Override
protected boolean nextRow(byte[] currentRow, int offset, short length) protected boolean nextRow(ScannerContext scannerContext, byte[] currentRow, int offset,
throws IOException { short length) throws IOException {
assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; assert super.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
byte row[] = new byte[length]; byte row[] = new byte[length];
System.arraycopy(currentRow, offset, row, 0, length); System.arraycopy(currentRow, offset, row, 0, length);
this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row)); this.storeHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(row));
resetFilters(); resetFilters();
// Calling the hook in CP which allows it to do a fast forward // Calling the hook in CP which allows it to do a fast forward
if (this.region.getCoprocessorHost() != null) { if (this.region.getCoprocessorHost() != null) {
return this.region.getCoprocessorHost().postScannerFilterRow(this, return this.region.getCoprocessorHost().postScannerFilterRow(this,

View File

@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
/** /**
* ScannerContext instances encapsulate limit tracking AND progress towards those limits during * ScannerContext instances encapsulate limit tracking AND progress towards those limits during
@ -96,7 +97,12 @@ public class ScannerContext {
boolean keepProgress; boolean keepProgress;
private static boolean DEFAULT_KEEP_PROGRESS = false; private static boolean DEFAULT_KEEP_PROGRESS = false;
ScannerContext(boolean keepProgress, LimitFields limitsToCopy) { /**
* Tracks the relevant server side metrics during scans. null when metrics should not be tracked
*/
final ServerSideScanMetrics metrics;
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
this.limits = new LimitFields(); this.limits = new LimitFields();
if (limitsToCopy != null) this.limits.copy(limitsToCopy); if (limitsToCopy != null) this.limits.copy(limitsToCopy);
@ -105,6 +111,21 @@ public class ScannerContext {
this.keepProgress = keepProgress; this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE; this.scannerState = DEFAULT_STATE;
this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
}
boolean isTrackingMetrics() {
return this.metrics != null;
}
/**
* Get the metrics instance. Should only be called after a call to {@link #isTrackingMetrics()}
* has been made to confirm that metrics are indeed being tracked.
* @return {@link ServerSideScanMetrics} instance that is tracking metrics for this scan
*/
ServerSideScanMetrics getMetrics() {
assert isTrackingMetrics();
return this.metrics;
} }
/** /**
@ -331,6 +352,7 @@ public class ScannerContext {
public static final class Builder { public static final class Builder {
boolean keepProgress = DEFAULT_KEEP_PROGRESS; boolean keepProgress = DEFAULT_KEEP_PROGRESS;
boolean trackMetrics = false;
LimitFields limits = new LimitFields(); LimitFields limits = new LimitFields();
private Builder() { private Builder() {
@ -345,6 +367,11 @@ public class ScannerContext {
return this; return this;
} }
public Builder setTrackMetrics(boolean trackMetrics) {
this.trackMetrics = trackMetrics;
return this;
}
public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) { public Builder setSizeLimit(LimitScope sizeScope, long sizeLimit) {
limits.setSize(sizeLimit); limits.setSize(sizeLimit);
limits.setSizeScope(sizeScope); limits.setSizeScope(sizeScope);
@ -363,7 +390,7 @@ public class ScannerContext {
} }
public ScannerContext build() { public ScannerContext build() {
return new ScannerContext(keepProgress, limits); return new ScannerContext(keepProgress, limits, trackMetrics);
} }
} }

View File

@ -49,6 +49,8 @@ module HBaseConstants
METHOD = "METHOD" METHOD = "METHOD"
MAXLENGTH = "MAXLENGTH" MAXLENGTH = "MAXLENGTH"
CACHE_BLOCKS = "CACHE_BLOCKS" CACHE_BLOCKS = "CACHE_BLOCKS"
ALL_METRICS = "ALL_METRICS"
METRICS = "METRICS"
REVERSED = "REVERSED" REVERSED = "REVERSED"
REPLICATION_SCOPE = "REPLICATION_SCOPE" REPLICATION_SCOPE = "REPLICATION_SCOPE"
INTERVAL = 'INTERVAL' INTERVAL = 'INTERVAL'

View File

@ -407,6 +407,8 @@ EOF
def _hash_to_scan(args) def _hash_to_scan(args)
if args.any? if args.any?
enablemetrics = args["ALL_METRICS"].nil? ? false : args["ALL_METRICS"]
enablemetrics = enablemetrics || !args["METRICS"].nil?
filter = args["FILTER"] filter = args["FILTER"]
startrow = args["STARTROW"] || '' startrow = args["STARTROW"] || ''
stoprow = args["STOPROW"] stoprow = args["STOPROW"]
@ -454,6 +456,7 @@ EOF
scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter)) scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter))
end end
scan.setScanMetricsEnabled(enablemetrics) if enablemetrics
scan.setTimeStamp(timestamp) if timestamp scan.setTimeStamp(timestamp) if timestamp
scan.setCacheBlocks(cache_blocks) scan.setCacheBlocks(cache_blocks)
scan.setReversed(reversed) scan.setReversed(reversed)
@ -478,8 +481,10 @@ EOF
#---------------------------------------------------------------------------------------------- #----------------------------------------------------------------------------------------------
# Scans whole table or a range of keys and returns rows matching specific criteria # Scans whole table or a range of keys and returns rows matching specific criteria
def _scan_internal(args = {}) def _scan_internal(args = {}, scan = nil)
raise(ArgumentError, "Arguments should be a Hash") unless args.kind_of?(Hash) raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash)
raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \
unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan)
limit = args["LIMIT"] || -1 limit = args["LIMIT"] || -1
maxlength = args.delete("MAXLENGTH") || -1 maxlength = args.delete("MAXLENGTH") || -1
@ -489,7 +494,8 @@ EOF
@converters.clear() @converters.clear()
# Start the scanner # Start the scanner
scanner = @table.getScanner(_hash_to_scan(args)) scan = scan == nil ? _hash_to_scan(args) : scan
scanner = @table.getScanner(scan)
iter = scanner.iterator iter = scanner.iterator
# Iterate results # Iterate results
@ -519,6 +525,7 @@ EOF
break break
end end
end end
scanner.close()
return ((block_given?) ? count : res) return ((block_given?) ? count : res)
end end

View File

@ -25,7 +25,7 @@ module Shell
Scan a table; pass table name and optionally a dictionary of scanner Scan a table; pass table name and optionally a dictionary of scanner
specifications. Scanner specifications may include one or more of: specifications. Scanner specifications may include one or more of:
TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, ROWPREFIXFILTER, TIMESTAMP, TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, ROWPREFIXFILTER, TIMESTAMP,
MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS, ALL_METRICS or METRICS
If no columns are specified, all columns will be scanned. If no columns are specified, all columns will be scanned.
To scan all members of a column family, leave the qualifier empty as in To scan all members of a column family, leave the qualifier empty as in
@ -36,6 +36,11 @@ The filter can be specified in two ways:
Filter Language document attached to the HBASE-4176 JIRA Filter Language document attached to the HBASE-4176 JIRA
2. Using the entire package name of the filter. 2. Using the entire package name of the filter.
If you wish to see metrics regarding the execution of the scan, the
ALL_METRICS boolean should be set to true. Alternatively, if you would
prefer to see only a subset of the metrics, the METRICS array can be
defined to include the names of only the metrics you care about.
Some examples: Some examples:
hbase> scan 'hbase:meta' hbase> scan 'hbase:meta'
@ -44,6 +49,8 @@ Some examples:
hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'} hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]} hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}
hbase> scan 't1', {REVERSED => true} hbase> scan 't1', {REVERSED => true}
hbase> scan 't1', {ALL_METRICS => true}
hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']}
hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => " hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => "
(QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"} (QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"}
hbase> scan 't1', {FILTER => hbase> scan 't1', {FILTER =>
@ -100,12 +107,18 @@ EOF
now = Time.now now = Time.now
formatter.header(["ROW", "COLUMN+CELL"]) formatter.header(["ROW", "COLUMN+CELL"])
scan = table._hash_to_scan(args)
#actually do the scanning #actually do the scanning
count = table._scan_internal(args) do |row, cells| count = table._scan_internal(args, scan) do |row, cells|
formatter.row([ row, cells ]) formatter.row([ row, cells ])
end end
formatter.footer(now, count) formatter.footer(now, count)
# if scan metrics were enabled, print them after the results
if (scan != nil && scan.isScanMetricsEnabled())
formatter.scan_metrics(scan.getScanMetrics(), args["METRICS"])
end
end end
end end
end end

View File

@ -112,6 +112,37 @@ module Shell
@row_count += 1 @row_count += 1
end end
# Output the scan metrics. Can be filtered to output only those metrics whose keys exists
# in the metric_filter
def scan_metrics(scan_metrics = nil, metric_filter = [])
return if scan_metrics == nil
raise(ArgumentError, \
"Argument should be org.apache.hadoop.hbase.client.metrics.ScanMetrics") \
unless scan_metrics.kind_of?(org.apache.hadoop.hbase.client.metrics.ScanMetrics)
# prefix output with empty line
@out.puts
# save row count to restore after printing metrics
# (metrics should not count towards row count)
saved_row_count = @row_count
iter = scan_metrics.getMetricsMap().entrySet().iterator()
metric_hash = Hash.new()
# put keys in hash so they can be sorted easily
while iter.hasNext
metric = iter.next
metric_hash[metric.getKey.to_s] = metric.getValue.to_s
end
# print in alphabetical order
row(["METRIC", "VALUE"], false)
metric_hash.sort.map do |key, value|
if (not metric_filter or metric_filter.length == 0 or metric_filter.include?(key))
row([key, value])
end
end
@row_count = saved_row_count
return
end
def split(width, str) def split(width, str)
if width == 0 if width == 0
return [str] return [str]