HBASE-5717 Scanner metrics are only reported if you get to the end of a scanner (Ian Varley)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1325344 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8575edd908
commit
51ec5b2b23
|
@ -228,20 +228,17 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* publish the scan metrics
|
* Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
|
||||||
* For now, we use scan.setAttribute to pass the metrics for application
|
* application or TableInputFormat.Later, we could push it to other systems. We don't use metrics
|
||||||
* or TableInputFormat to consume
|
* framework because it doesn't support multi-instances of the same metrics on the same machine;
|
||||||
* Later, we could push it to other systems
|
* for scan/map reduce scenarios, we will have multiple scans running at the same time.
|
||||||
* We don't use metrics framework because it doesn't support
|
*
|
||||||
* multi instances of the same metrics on the same machine; for scan/map
|
* By default, scan metrics are disabled; if the application wants to collect them, this behavior
|
||||||
* reduce scenarios, we will have multiple scans running at the same time
|
* can be turned on by calling calling:
|
||||||
|
*
|
||||||
|
* scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
|
||||||
*/
|
*/
|
||||||
private void writeScanMetrics() throws IOException
|
private void writeScanMetrics() throws IOException {
|
||||||
{
|
|
||||||
// by default, scanMetrics is null
|
|
||||||
// if application wants to collect scanMetrics, it can turn it on by
|
|
||||||
// calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,
|
|
||||||
// Bytes.toBytes(Boolean.TRUE))
|
|
||||||
if (this.scanMetrics == null) {
|
if (this.scanMetrics == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -251,10 +248,8 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Result next() throws IOException {
|
public Result next() throws IOException {
|
||||||
// If the scanner is closed but there is some rows left in the cache,
|
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
|
||||||
// it will first empty it before returning null
|
|
||||||
if (cache.size() == 0 && this.closed) {
|
if (cache.size() == 0 && this.closed) {
|
||||||
writeScanMetrics();
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (cache.size() == 0) {
|
if (cache.size() == 0) {
|
||||||
|
@ -316,8 +311,7 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
long currentTime = System.currentTimeMillis();
|
long currentTime = System.currentTimeMillis();
|
||||||
if (this.scanMetrics != null ) {
|
if (this.scanMetrics != null ) {
|
||||||
this.scanMetrics.sumOfMillisSecBetweenNexts.inc(
|
this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext);
|
||||||
currentTime-lastNext);
|
|
||||||
}
|
}
|
||||||
lastNext = currentTime;
|
lastNext = currentTime;
|
||||||
if (values != null && values.length > 0) {
|
if (values != null && values.length > 0) {
|
||||||
|
@ -337,6 +331,8 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
if (cache.size() > 0) {
|
if (cache.size() > 0) {
|
||||||
return cache.poll();
|
return cache.poll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we exhausted this scanner before calling close, write out the scan metrics
|
||||||
writeScanMetrics();
|
writeScanMetrics();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -374,6 +370,13 @@ public class ClientScanner extends AbstractClientScanner {
|
||||||
// have since decided that it's not nice for a scanner's close to
|
// have since decided that it's not nice for a scanner's close to
|
||||||
// throw exceptions. Chances are it was just an UnknownScanner
|
// throw exceptions. Chances are it was just an UnknownScanner
|
||||||
// exception due to lease time out.
|
// exception due to lease time out.
|
||||||
|
} finally {
|
||||||
|
// we want to output the scan metrics even if an error occurred on close
|
||||||
|
try {
|
||||||
|
writeScanMetrics();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// As above, we still don't want the scanner close() method to throw.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
callable = null;
|
callable = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4485,30 +4485,74 @@ public class TestFromClientSide {
|
||||||
|
|
||||||
// Create multiple regions for this table
|
// Create multiple regions for this table
|
||||||
int numOfRegions = TEST_UTIL.createMultiRegions(ht, FAMILY);
|
int numOfRegions = TEST_UTIL.createMultiRegions(ht, FAMILY);
|
||||||
|
// Create 3 rows in the table, with rowkeys starting with "z*" so that
|
||||||
|
// scan are forced to hit all the regions.
|
||||||
|
Put put1 = new Put(Bytes.toBytes("z1"));
|
||||||
|
put1.add(FAMILY, QUALIFIER, VALUE);
|
||||||
|
Put put2 = new Put(Bytes.toBytes("z2"));
|
||||||
|
put2.add(FAMILY, QUALIFIER, VALUE);
|
||||||
|
Put put3 = new Put(Bytes.toBytes("z3"));
|
||||||
|
put3.add(FAMILY, QUALIFIER, VALUE);
|
||||||
|
ht.put(Arrays.asList(put1, put2, put3));
|
||||||
|
|
||||||
Scan scan1 = new Scan();
|
Scan scan1 = new Scan();
|
||||||
|
int numRecords = 0;
|
||||||
for(Result result : ht.getScanner(scan1)) {
|
for(Result result : ht.getScanner(scan1)) {
|
||||||
|
numRecords++;
|
||||||
}
|
}
|
||||||
|
LOG.info("test data has " + numRecords + " records.");
|
||||||
|
|
||||||
// by default, scan metrics collection is turned off
|
// by default, scan metrics collection is turned off
|
||||||
assertEquals(null, scan1.getAttribute(
|
assertEquals(null, scan1.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA));
|
||||||
Scan.SCAN_ATTRIBUTES_METRICS_DATA));
|
|
||||||
|
|
||||||
// turn on scan metrics
|
// turn on scan metrics
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE,
|
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
Bytes.toBytes(Boolean.TRUE));
|
ResultScanner scanner = ht.getScanner(scan);
|
||||||
for(Result result : ht.getScanner(scan)) {
|
// per HBASE-5717, this should still collect even if you don't run all the way to
|
||||||
|
// the end of the scanner. So this is asking for 2 of the 3 rows we inserted.
|
||||||
|
for (Result result : scanner.next(numRecords - 1)) {
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
|
||||||
|
ScanMetrics scanMetrics = getScanMetrics(scan);
|
||||||
|
assertEquals("Did not access all the regions in the table", numOfRegions,
|
||||||
|
scanMetrics.countOfRegions.getCurrentIntervalValue());
|
||||||
|
|
||||||
|
// now, test that the metrics are still collected even if you don't call close, but do
|
||||||
|
// run past the end of all the records
|
||||||
|
Scan scanWithoutClose = new Scan();
|
||||||
|
scanWithoutClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
|
ResultScanner scannerWithoutClose = ht.getScanner(scanWithoutClose);
|
||||||
|
for (Result result : scannerWithoutClose.next(numRecords + 1)) {
|
||||||
|
}
|
||||||
|
ScanMetrics scanMetricsWithoutClose = getScanMetrics(scanWithoutClose);
|
||||||
|
assertEquals("Did not access all the regions in the table", numOfRegions,
|
||||||
|
scanMetricsWithoutClose.countOfRegions.getCurrentIntervalValue());
|
||||||
|
|
||||||
|
// finally, test that the metrics are collected correctly if you both run past all the records,
|
||||||
|
// AND close the scanner
|
||||||
|
Scan scanWithClose = new Scan();
|
||||||
|
scanWithClose.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
|
||||||
|
ResultScanner scannerWithClose = ht.getScanner(scanWithClose);
|
||||||
|
for (Result result : scannerWithClose.next(numRecords + 1)) {
|
||||||
|
}
|
||||||
|
scannerWithClose.close();
|
||||||
|
ScanMetrics scanMetricsWithClose = getScanMetrics(scanWithClose);
|
||||||
|
assertEquals("Did not access all the regions in the table", numOfRegions,
|
||||||
|
scanMetricsWithClose.countOfRegions.getCurrentIntervalValue());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] serializedMetrics = scan.getAttribute(
|
private ScanMetrics getScanMetrics(Scan scan) throws Exception {
|
||||||
Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
byte[] serializedMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
|
||||||
|
assertTrue("Serialized metrics were not found.", serializedMetrics != null);
|
||||||
|
|
||||||
DataInputBuffer in = new DataInputBuffer();
|
DataInputBuffer in = new DataInputBuffer();
|
||||||
in.reset(serializedMetrics, 0, serializedMetrics.length);
|
in.reset(serializedMetrics, 0, serializedMetrics.length);
|
||||||
ScanMetrics scanMetrics = new ScanMetrics();
|
ScanMetrics scanMetrics = new ScanMetrics();
|
||||||
scanMetrics.readFields(in);
|
scanMetrics.readFields(in);
|
||||||
assertEquals(numOfRegions, scanMetrics.countOfRegions.getCurrentIntervalValue());
|
return scanMetrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue