diff --git a/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 97293aa4aa1..3167f233cf8 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -228,20 +228,17 @@ public class ClientScanner extends AbstractClientScanner { } /** - * publish the scan metrics - * For now, we use scan.setAttribute to pass the metrics for application - * or TableInputFormat to consume - * Later, we could push it to other systems - * We don't use metrics framework because it doesn't support - * multi instances of the same metrics on the same machine; for scan/map - * reduce scenarios, we will have multiple scans running at the same time + * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the + * application or TableInputFormat. Later, we could push it to other systems. We don't use metrics + * framework because it doesn't support multi-instances of the same metrics on the same machine; + * for scan/map reduce scenarios, we will have multiple scans running at the same time. 
+ * + * By default, scan metrics are disabled; if the application wants to collect them, this behavior + * can be turned on by calling: + * + * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)) */ - private void writeScanMetrics() throws IOException - { - // by default, scanMetrics is null - // if application wants to collect scanMetrics, it can turn it on by - // calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, - // Bytes.toBytes(Boolean.TRUE)) + private void writeScanMetrics() throws IOException { if (this.scanMetrics == null) { return; } @@ -251,10 +248,8 @@ public class ClientScanner extends AbstractClientScanner { } public Result next() throws IOException { - // If the scanner is closed but there is some rows left in the cache, - // it will first empty it before returning null + // If the scanner is closed and there's nothing left in the cache, next is a no-op. if (cache.size() == 0 && this.closed) { - writeScanMetrics(); return null; } if (cache.size() == 0) { @@ -316,8 +311,7 @@ public class ClientScanner extends AbstractClientScanner { } long currentTime = System.currentTimeMillis(); if (this.scanMetrics != null ) { - this.scanMetrics.sumOfMillisSecBetweenNexts.inc( - currentTime-lastNext); + this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext); } lastNext = currentTime; if (values != null && values.length > 0) { @@ -337,6 +331,8 @@ public class ClientScanner extends AbstractClientScanner { if (cache.size() > 0) { return cache.poll(); } + + // if we exhausted this scanner before calling close, write out the scan metrics writeScanMetrics(); return null; } @@ -374,6 +370,13 @@ public class ClientScanner extends AbstractClientScanner { // have since decided that it's not nice for a scanner's close to // throw exceptions. Chances are it was just an UnknownScanner // exception due to lease time out. 
+ } finally { + // we want to output the scan metrics even if an error occurred on close + try { + writeScanMetrics(); + } catch (IOException e) { + // As above, we still don't want the scanner close() method to throw. + } } callable = null; } diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index f7430ee1728..a7f3a1ac244 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -4485,30 +4485,74 @@ public class TestFromClientSide { // Create multiple regions for this table int numOfRegions = TEST_UTIL.createMultiRegions(ht, FAMILY); + // Create 3 rows in the table, with rowkeys starting with "z*" so that + // scan are forced to hit all the regions. + Put put1 = new Put(Bytes.toBytes("z1")); + put1.add(FAMILY, QUALIFIER, VALUE); + Put put2 = new Put(Bytes.toBytes("z2")); + put2.add(FAMILY, QUALIFIER, VALUE); + Put put3 = new Put(Bytes.toBytes("z3")); + put3.add(FAMILY, QUALIFIER, VALUE); + ht.put(Arrays.asList(put1, put2, put3)); Scan scan1 = new Scan(); + int numRecords = 0; for(Result result : ht.getScanner(scan1)) { + numRecords++; } + LOG.info("test data has " + numRecords + " records."); // by default, scan metrics collection is turned off - assertEquals(null, scan1.getAttribute( - Scan.SCAN_ATTRIBUTES_METRICS_DATA)); + assertEquals(null, scan1.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA)); // turn on scan metrics Scan scan = new Scan(); - scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, - Bytes.toBytes(Boolean.TRUE)); - for(Result result : ht.getScanner(scan)) { + scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); + ResultScanner scanner = ht.getScanner(scan); + // per HBASE-5717, this should still collect even if you don't run all the way to + // the end of the scanner. 
So this is asking for 2 of the 3 rows we inserted. + for (Result result : scanner.next(numRecords - 1)) { } + scanner.close(); - byte[] serializedMetrics = scan.getAttribute( - Scan.SCAN_ATTRIBUTES_METRICS_DATA); + ScanMetrics scanMetrics = getScanMetrics(scan); + assertEquals("Did not access all the regions in the table", numOfRegions, + scanMetrics.countOfRegions.getCurrentIntervalValue()); + + // now, test that the metrics are still collected even if you don't call close, but do + // run past the end of all the records + Scan scanWithoutClose = new Scan(); + scanWithoutClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); + ResultScanner scannerWithoutClose = ht.getScanner(scanWithoutClose); + for (Result result : scannerWithoutClose.next(numRecords + 1)) { + } + ScanMetrics scanMetricsWithoutClose = getScanMetrics(scanWithoutClose); + assertEquals("Did not access all the regions in the table", numOfRegions, + scanMetricsWithoutClose.countOfRegions.getCurrentIntervalValue()); + + // finally, test that the metrics are collected correctly if you both run past all the records, + // AND close the scanner + Scan scanWithClose = new Scan(); + scanWithClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE)); + ResultScanner scannerWithClose = ht.getScanner(scanWithClose); + for (Result result : scannerWithClose.next(numRecords + 1)) { + } + scannerWithClose.close(); + ScanMetrics scanMetricsWithClose = getScanMetrics(scanWithClose); + assertEquals("Did not access all the regions in the table", numOfRegions, + scanMetricsWithClose.countOfRegions.getCurrentIntervalValue()); + + } + + private ScanMetrics getScanMetrics(Scan scan) throws Exception { + byte[] serializedMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); + assertTrue("Serialized metrics were not found.", serializedMetrics != null); DataInputBuffer in = new DataInputBuffer(); in.reset(serializedMetrics, 0, serializedMetrics.length); 
ScanMetrics scanMetrics = new ScanMetrics(); scanMetrics.readFields(in); - assertEquals(numOfRegions, scanMetrics.countOfRegions.getCurrentIntervalValue()); + return scanMetrics; } /**