From b973d3fd467b4382198aefc35d2c4e9b4c41ee6d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Fri, 24 Feb 2017 14:08:10 +0800 Subject: [PATCH] HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan --- .../hbase/client/AbstractClientScanner.java | 17 ++++------------- .../hadoop/hbase/client/ClientScanner.java | 11 ++++++----- .../hadoop/hbase/client/ResultScanner.java | 14 +++++++++++++- .../org/apache/hadoop/hbase/client/Scan.java | 6 +++++- .../client/metrics/ServerSideScanMetrics.java | 13 +++++++++++-- .../hadoop/hbase/protobuf/ProtobufUtil.java | 4 ++-- .../hadoop/hbase/rest/client/RemoteHTable.java | 11 +++++++++++ .../hbase/client/ClientSideRegionScanner.java | 1 - .../hbase/mapreduce/TableRecordReaderImpl.java | 4 ++-- ...TestServerSideScanMetricsFromClientSide.java | 14 +++++++------- 10 files changed, 61 insertions(+), 34 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java index f926fa97f70..f9ab7e9fd4a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java @@ -42,13 +42,11 @@ public abstract class AbstractClientScanner implements ResultScanner { } /** - * Used internally accumulating metrics on scan. To - * enable collection of metrics on a Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}. - * These metrics are cleared at key transition points. Metrics are accumulated in the - * {@link Scan} object itself. - * @see Scan#getScanMetrics() + * Used internally accumulating metrics on scan. To enable collection of metrics on a Scanner, + * call {@link Scan#setScanMetricsEnabled(boolean)}. * @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled. */ + @Override public ScanMetrics getScanMetrics() { return scanMetrics; } @@ -63,7 +61,7 @@ public abstract class AbstractClientScanner implements ResultScanner { * @throws IOException */ @Override - public Result [] next(int nbRows) throws IOException { + public Result[] next(int nbRows) throws IOException { // Collect values to be returned here ArrayList resultSets = new ArrayList(nbRows); for(int i = 0; i < nbRows; i++) { @@ -124,11 +122,4 @@ public abstract class AbstractClientScanner implements ResultScanner { } }; } - /** - * Allow the client to renew the scanner's lease on the server. - * @return true if the lease was successfully renewed, false otherwise. - */ - // Note that this method should be on ResultScanner, but that is marked stable. - // Callers have to cast their instance of ResultScanner to AbstractClientScanner to use this. - public abstract boolean renewLease(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 57586d89b27..abcb67ea070 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.util.Bytes; @@ -291,15 +290,17 @@ public abstract class ClientScanner extends AbstractClientScanner { * for scan/map reduce scenarios, we will have multiple scans running at the same time. By * default, scan metrics are disabled; if the application wants to collect them, this behavior can * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)} - *

- * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance. */ protected void writeScanMetrics() { if (this.scanMetrics == null || scanMetricsPublished) { return; } - MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray()); + // Publish ScanMetrics to the Scan Object. + // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not + // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published + // to Scan will be messed up. + scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, + ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray()); scanMetricsPublished = true; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java index f3f437a490c..4bc91e695dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import java.io.Closeable; import java.io.IOException; @@ -45,11 +46,22 @@ public interface ResultScanner extends Closeable, Iterable { * @return Between zero and nbRows results * @throws IOException e */ - Result [] next(int nbRows) throws IOException; + Result[] next(int nbRows) throws IOException; /** * Closes the scanner and releases any resources it has allocated */ @Override void close(); + + /** + * Allow the client to renew the scanner's lease on the server. + * @return true if the lease was successfully renewed, false otherwise. + */ + boolean renewLease(); + + /** + * @return the scan metrics, or {@code null} if we do not enable metrics. + */ + ScanMetrics getScanMetrics(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 1481e5f5582..9ce40e85844 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -1126,9 +1126,13 @@ public class Scan extends Query { /** * @return Metrics on this Scan, if metrics were enabled. * @see #setScanMetricsEnabled(boolean) + * @deprecated Use {@link ResultScanner#getScanMetrics()} instead. And notice that, please do not + * use this method and {@link ResultScanner#getScanMetrics()} together, the metrics + * will be messed up. */ + @Deprecated public ScanMetrics getScanMetrics() { - byte [] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); + byte[] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); if (bytes == null) return null; return ProtobufUtil.toScanMetrics(bytes); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java index 4b3e0ce5fd1..6e4e9b81fe5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java @@ -106,11 +106,20 @@ public class ServerSideScanMetrics { * @return A Map of String -> Long for metrics */ public Map getMetricsMap() { + return getMetricsMap(true); + } + + /** + * Get all of the values. If reset is true, we will reset the all AtomicLongs back to 0. + * @param reset whether to reset the AtomicLongs to 0. + * @return A Map of String -> Long for metrics + */ + public Map getMetricsMap(boolean reset) { // Create a builder ImmutableMap.Builder builder = ImmutableMap.builder(); - // For every entry add the value and reset the AtomicLong back to zero for (Map.Entry e : this.counters.entrySet()) { - builder.put(e.getKey(), e.getValue().getAndSet(0)); + long value = reset ? e.getValue().getAndSet(0) : e.getValue().get(); + builder.put(e.getKey(), value); } // Build the immutable map so that people can't mess around with it. return builder.build(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index bfdcfbd65b6..a006370a368 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2669,9 +2669,9 @@ public final class ProtobufUtil { return scanMetrics; } - public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) { + public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) { MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder(); - Map metrics = scanMetrics.getMetricsMap(); + Map metrics = scanMetrics.getMetricsMap(reset); for (Entry e : metrics.entrySet()) { HBaseProtos.NameInt64Pair nameInt64Pair = HBaseProtos.NameInt64Pair.newBuilder() diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 146cc72d3cf..463b232df33 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -637,6 +638,16 @@ public class RemoteHTable implements Table { LOG.warn(StringUtils.stringifyException(e)); } } + + @Override + public boolean renewLease() { + throw new RuntimeException("renewLease() not supported"); + } + + @Override + public ScanMetrics getScanMetrics() { + throw new RuntimeException("getScanMetrics() not supported"); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index dde2f100120..4fab6a22b3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -52,7 +52,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner { public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { - // region is immutable, set isolation level scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java index 6f1d140e4d9..a8ed5f14de6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java @@ -81,7 +81,7 @@ public class TableRecordReaderImpl { */ public void restart(byte[] firstRow) throws IOException { currentScan = new Scan(scan); - currentScan.setStartRow(firstRow); + currentScan.withStartRow(firstRow); currentScan.setScanMetricsEnabled(true); if (this.scanner != null) { if (logScannerActivity) { @@ -273,7 +273,7 @@ public class TableRecordReaderImpl { * @throws IOException */ private void updateCounters() throws IOException { - ScanMetrics scanMetrics = currentScan.getScanMetrics(); + ScanMetrics scanMetrics = scanner.getScanMetrics(); if (scanMetrics == null) { return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java index da294b128db..2af03236b5a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -182,15 +182,15 @@ public class TestServerSideScanMetricsFromClientSide { for (int i = 0; i < ROWS.length - 1; i++) { scan = new Scan(baseScan); - scan.setStartRow(ROWS[0]); - scan.setStopRow(ROWS[i + 1]); + scan.withStartRow(ROWS[0]); + scan.withStopRow(ROWS[i + 1]); testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1); } for (int i = ROWS.length - 1; i > 0; i--) { scan = new Scan(baseScan); - scan.setStartRow(ROWS[i - 1]); - scan.setStopRow(ROWS[ROWS.length - 1]); + scan.withStartRow(ROWS[i - 1]); + scan.withStopRow(ROWS[ROWS.length - 1]); testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length - i); } @@ -308,12 +308,12 @@ public class TestServerSideScanMetricsFromClientSide { public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); ResultScanner scanner = TABLE.getScanner(scan); - // Iterate through all the results - for (Result r : scanner) { + while (scanner.next() != null) { + } scanner.close(); - ScanMetrics metrics = scan.getScanMetrics(); + ScanMetrics metrics = scanner.getScanMetrics(); assertTrue("Metrics are null", metrics != null); assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); final long actualMetricValue = metrics.getCounter(metricKey).get();