HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan
This commit is contained in:
parent
02d9bf0c57
commit
b973d3fd46
@ -42,13 +42,11 @@ public abstract class AbstractClientScanner implements ResultScanner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used internally accumulating metrics on scan. To
|
* Used internally accumulating metrics on scan. To enable collection of metrics on a Scanner,
|
||||||
* enable collection of metrics on a Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}.
|
* call {@link Scan#setScanMetricsEnabled(boolean)}.
|
||||||
* These metrics are cleared at key transition points. Metrics are accumulated in the
|
|
||||||
* {@link Scan} object itself.
|
|
||||||
* @see Scan#getScanMetrics()
|
|
||||||
* @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled.
|
* @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public ScanMetrics getScanMetrics() {
|
public ScanMetrics getScanMetrics() {
|
||||||
return scanMetrics;
|
return scanMetrics;
|
||||||
}
|
}
|
||||||
@ -63,7 +61,7 @@ public abstract class AbstractClientScanner implements ResultScanner {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Result [] next(int nbRows) throws IOException {
|
public Result[] next(int nbRows) throws IOException {
|
||||||
// Collect values to be returned here
|
// Collect values to be returned here
|
||||||
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
|
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
|
||||||
for(int i = 0; i < nbRows; i++) {
|
for(int i = 0; i < nbRows; i++) {
|
||||||
@ -124,11 +122,4 @@ public abstract class AbstractClientScanner implements ResultScanner {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Allow the client to renew the scanner's lease on the server.
|
|
||||||
* @return true if the lease was successfully renewed, false otherwise.
|
|
||||||
*/
|
|
||||||
// Note that this method should be on ResultScanner, but that is marked stable.
|
|
||||||
// Callers have to cast their instance of ResultScanner to AbstractClientScanner to use this.
|
|
||||||
public abstract boolean renewLease();
|
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
|
|||||||
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
@ -291,15 +290,17 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
|||||||
* for scan/map reduce scenarios, we will have multiple scans running at the same time. By
|
* for scan/map reduce scenarios, we will have multiple scans running at the same time. By
|
||||||
* default, scan metrics are disabled; if the application wants to collect them, this behavior can
|
* default, scan metrics are disabled; if the application wants to collect them, this behavior can
|
||||||
* be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
|
* be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
|
||||||
* <p>
|
|
||||||
* This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
|
|
||||||
*/
|
*/
|
||||||
protected void writeScanMetrics() {
|
protected void writeScanMetrics() {
|
||||||
if (this.scanMetrics == null || scanMetricsPublished) {
|
if (this.scanMetrics == null || scanMetricsPublished) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
|
// Publish ScanMetrics to the Scan Object.
|
||||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
|
// As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
|
||||||
|
// call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
|
||||||
|
// to Scan will be messed up.
|
||||||
|
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
|
||||||
|
ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
|
||||||
scanMetricsPublished = true;
|
scanMetricsPublished = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
|
|||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -45,11 +46,22 @@ public interface ResultScanner extends Closeable, Iterable<Result> {
|
|||||||
* @return Between zero and nbRows results
|
* @return Between zero and nbRows results
|
||||||
* @throws IOException e
|
* @throws IOException e
|
||||||
*/
|
*/
|
||||||
Result [] next(int nbRows) throws IOException;
|
Result[] next(int nbRows) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the scanner and releases any resources it has allocated
|
* Closes the scanner and releases any resources it has allocated
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allow the client to renew the scanner's lease on the server.
|
||||||
|
* @return true if the lease was successfully renewed, false otherwise.
|
||||||
|
*/
|
||||||
|
boolean renewLease();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the scan metrics, or {@code null} if we do not enable metrics.
|
||||||
|
*/
|
||||||
|
ScanMetrics getScanMetrics();
|
||||||
}
|
}
|
||||||
|
@ -1126,9 +1126,13 @@ public class Scan extends Query {
|
|||||||
/**
|
/**
|
||||||
* @return Metrics on this Scan, if metrics were enabled.
|
* @return Metrics on this Scan, if metrics were enabled.
|
||||||
* @see #setScanMetricsEnabled(boolean)
|
* @see #setScanMetricsEnabled(boolean)
|
||||||
|
* @deprecated Use {@link ResultScanner#getScanMetrics()} instead. And notice that, please do not
|
||||||
|
* use this method and {@link ResultScanner#getScanMetrics()} together, the metrics
|
||||||
|
* will be messed up.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public ScanMetrics getScanMetrics() {
|
public ScanMetrics getScanMetrics() {
|
||||||
byte [] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
byte[] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
||||||
if (bytes == null) return null;
|
if (bytes == null) return null;
|
||||||
return ProtobufUtil.toScanMetrics(bytes);
|
return ProtobufUtil.toScanMetrics(bytes);
|
||||||
}
|
}
|
||||||
|
@ -106,11 +106,20 @@ public class ServerSideScanMetrics {
|
|||||||
* @return A Map of String -> Long for metrics
|
* @return A Map of String -> Long for metrics
|
||||||
*/
|
*/
|
||||||
public Map<String, Long> getMetricsMap() {
|
public Map<String, Long> getMetricsMap() {
|
||||||
|
return getMetricsMap(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all of the values. If reset is true, we will reset the all AtomicLongs back to 0.
|
||||||
|
* @param reset whether to reset the AtomicLongs to 0.
|
||||||
|
* @return A Map of String -> Long for metrics
|
||||||
|
*/
|
||||||
|
public Map<String, Long> getMetricsMap(boolean reset) {
|
||||||
// Create a builder
|
// Create a builder
|
||||||
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
|
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
|
||||||
// For every entry add the value and reset the AtomicLong back to zero
|
|
||||||
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
|
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
|
||||||
builder.put(e.getKey(), e.getValue().getAndSet(0));
|
long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
|
||||||
|
builder.put(e.getKey(), value);
|
||||||
}
|
}
|
||||||
// Build the immutable map so that people can't mess around with it.
|
// Build the immutable map so that people can't mess around with it.
|
||||||
return builder.build();
|
return builder.build();
|
||||||
|
@ -2669,9 +2669,9 @@ public final class ProtobufUtil {
|
|||||||
return scanMetrics;
|
return scanMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
|
public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) {
|
||||||
MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
|
MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
|
||||||
Map<String, Long> metrics = scanMetrics.getMetricsMap();
|
Map<String, Long> metrics = scanMetrics.getMetricsMap(reset);
|
||||||
for (Entry<String, Long> e : metrics.entrySet()) {
|
for (Entry<String, Long> e : metrics.entrySet()) {
|
||||||
HBaseProtos.NameInt64Pair nameInt64Pair =
|
HBaseProtos.NameInt64Pair nameInt64Pair =
|
||||||
HBaseProtos.NameInt64Pair.newBuilder()
|
HBaseProtos.NameInt64Pair.newBuilder()
|
||||||
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||||
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
import org.apache.hadoop.hbase.io.TimeRange;
|
import org.apache.hadoop.hbase.io.TimeRange;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
|
||||||
@ -637,6 +638,16 @@ public class RemoteHTable implements Table {
|
|||||||
LOG.warn(StringUtils.stringifyException(e));
|
LOG.warn(StringUtils.stringifyException(e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean renewLease() {
|
||||||
|
throw new RuntimeException("renewLease() not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ScanMetrics getScanMetrics() {
|
||||||
|
throw new RuntimeException("getScanMetrics() not supported");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -52,7 +52,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
|
|||||||
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
|
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
|
||||||
Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
|
Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
// region is immutable, set isolation level
|
// region is immutable, set isolation level
|
||||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ public class TableRecordReaderImpl {
|
|||||||
*/
|
*/
|
||||||
public void restart(byte[] firstRow) throws IOException {
|
public void restart(byte[] firstRow) throws IOException {
|
||||||
currentScan = new Scan(scan);
|
currentScan = new Scan(scan);
|
||||||
currentScan.setStartRow(firstRow);
|
currentScan.withStartRow(firstRow);
|
||||||
currentScan.setScanMetricsEnabled(true);
|
currentScan.setScanMetricsEnabled(true);
|
||||||
if (this.scanner != null) {
|
if (this.scanner != null) {
|
||||||
if (logScannerActivity) {
|
if (logScannerActivity) {
|
||||||
@ -273,7 +273,7 @@ public class TableRecordReaderImpl {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void updateCounters() throws IOException {
|
private void updateCounters() throws IOException {
|
||||||
ScanMetrics scanMetrics = currentScan.getScanMetrics();
|
ScanMetrics scanMetrics = scanner.getScanMetrics();
|
||||||
if (scanMetrics == null) {
|
if (scanMetrics == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -182,15 +182,15 @@ public class TestServerSideScanMetricsFromClientSide {
|
|||||||
|
|
||||||
for (int i = 0; i < ROWS.length - 1; i++) {
|
for (int i = 0; i < ROWS.length - 1; i++) {
|
||||||
scan = new Scan(baseScan);
|
scan = new Scan(baseScan);
|
||||||
scan.setStartRow(ROWS[0]);
|
scan.withStartRow(ROWS[0]);
|
||||||
scan.setStopRow(ROWS[i + 1]);
|
scan.withStopRow(ROWS[i + 1]);
|
||||||
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1);
|
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = ROWS.length - 1; i > 0; i--) {
|
for (int i = ROWS.length - 1; i > 0; i--) {
|
||||||
scan = new Scan(baseScan);
|
scan = new Scan(baseScan);
|
||||||
scan.setStartRow(ROWS[i - 1]);
|
scan.withStartRow(ROWS[i - 1]);
|
||||||
scan.setStopRow(ROWS[ROWS.length - 1]);
|
scan.withStopRow(ROWS[ROWS.length - 1]);
|
||||||
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length - i);
|
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length - i);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -308,12 +308,12 @@ public class TestServerSideScanMetricsFromClientSide {
|
|||||||
public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
|
public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
|
||||||
assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
|
assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
|
||||||
ResultScanner scanner = TABLE.getScanner(scan);
|
ResultScanner scanner = TABLE.getScanner(scan);
|
||||||
|
|
||||||
// Iterate through all the results
|
// Iterate through all the results
|
||||||
for (Result r : scanner) {
|
while (scanner.next() != null) {
|
||||||
|
|
||||||
}
|
}
|
||||||
scanner.close();
|
scanner.close();
|
||||||
ScanMetrics metrics = scan.getScanMetrics();
|
ScanMetrics metrics = scanner.getScanMetrics();
|
||||||
assertTrue("Metrics are null", metrics != null);
|
assertTrue("Metrics are null", metrics != null);
|
||||||
assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
|
assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
|
||||||
final long actualMetricValue = metrics.getCounter(metricKey).get();
|
final long actualMetricValue = metrics.getCounter(metricKey).get();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user