diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index ba702633238..662e10fa5ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -94,8 +94,6 @@ public class AggregationClient { public R max(final byte[] tableName, final ColumnInterpreter ci, final Scan scan) throws Throwable { validateParameters(scan); - HTable table = new HTable(conf, tableName); - class MaxCallBack implements Batch.Callback { R max = null; @@ -109,13 +107,21 @@ public class AggregationClient { } } MaxCallBack aMaxCallBack = new MaxCallBack(); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), new Batch.Call() { - @Override - public R call(AggregateProtocol instance) throws IOException { - return instance.getMax(ci, scan); + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), new Batch.Call() { + @Override + public R call(AggregateProtocol instance) throws IOException { + return instance.getMax(ci, scan); + } + }, aMaxCallBack); + } finally { + if (table != null) { + table.close(); } - }, aMaxCallBack); + } return aMaxCallBack.getMax(); } @@ -158,16 +164,23 @@ public class AggregationClient { min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min; } } - HTable table = new HTable(conf, tableName); MinCallBack minCallBack = new MinCallBack(); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), new Batch.Call() { + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), new Batch.Call() { - @Override - public R call(AggregateProtocol instance) throws IOException { - return instance.getMin(ci, scan); + @Override + public R call(AggregateProtocol instance) throws IOException { + return instance.getMin(ci, scan); + } + }, minCallBack); + } finally { + if (table != null) { + table.close(); } - }, minCallBack); + } log.debug("Min fom all regions is: " + minCallBack.getMinimum()); return minCallBack.getMinimum(); } @@ -201,14 +214,21 @@ public class AggregationClient { } } RowNumCallback rowNum = new RowNumCallback(); - HTable table = new HTable(conf, tableName); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), new Batch.Call() { - @Override - public Long call(AggregateProtocol instance) throws IOException { - return instance.getRowNum(ci, scan); + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), new Batch.Call() { + @Override + public Long call(AggregateProtocol instance) throws IOException { + return instance.getRowNum(ci, scan); + } + }, rowNum); + } finally { + if (table != null) { + table.close(); } - }, rowNum); + } return rowNum.getRowNumCount(); } @@ -237,14 +257,21 @@ public class AggregationClient { } } SumCallBack sumCallBack = new SumCallBack(); - HTable table = new HTable(conf, tableName); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), new Batch.Call() { - @Override - public S call(AggregateProtocol instance) throws IOException { - return instance.getSum(ci, scan); + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), new Batch.Call() { + @Override + public S call(AggregateProtocol instance) throws IOException { + return instance.getSum(ci, scan); + } + }, sumCallBack); + } finally { + if (table != null) { + table.close(); } - }, sumCallBack); + } return sumCallBack.getSumResult(); } @@ -274,14 +301,23 @@ public class AggregationClient { } } AvgCallBack avgCallBack = new AvgCallBack(); - HTable table = new HTable(conf, tableName); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), new Batch.Call>() { - @Override - public Pair call(AggregateProtocol instance) throws IOException { - return instance.getAvg(ci, scan); + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), + new Batch.Call>() { + @Override + public Pair call(AggregateProtocol instance) + throws IOException { + return instance.getAvg(ci, scan); + } + }, avgCallBack); + } finally { + if (table != null) { + table.close(); } - }, avgCallBack); + } return avgCallBack.getAvgArgs(); } @@ -337,17 +373,24 @@ public class AggregationClient { } } StdCallback stdCallback = new StdCallback(); - HTable table = new HTable(conf, tableName); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), - new Batch.Call, Long>>() { - @Override - public Pair, Long> call(AggregateProtocol instance) - throws IOException { - return instance.getStd(ci, scan); - } + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), + new Batch.Call, Long>>() { + @Override + public Pair, Long> call(AggregateProtocol instance) + throws IOException { + return instance.getStd(ci, scan); + } - }, stdCallback); + }, stdCallback); + } finally { + if (table != null) { + table.close(); + } + } return stdCallback.getStdParams(); } @@ -412,17 +455,22 @@ public class AggregationClient { } } StdCallback stdCallback = new StdCallback(); - HTable table = new HTable(conf, tableName); - table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan - .getStopRow(), - new Batch.Call>() { - @Override - public List call(AggregateProtocol instance) - throws IOException { - return instance.getMedian(ci, scan); - } + HTable table = null; + try { + table = new HTable(conf, tableName); + table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), + scan.getStopRow(), new Batch.Call>() { + @Override + public List call(AggregateProtocol instance) throws IOException { + return instance.getMedian(ci, scan); + } - }, stdCallback); + }, stdCallback); + } finally { + if (table != null) { + table.close(); + } + } return stdCallback.getMedianParams(); } @@ -464,20 +512,22 @@ public class AggregationClient { Scan scan2 = new Scan(scan); // inherit stop row from method parameter if (startRow != null) scan2.setStartRow(startRow); - HTable table = new HTable(conf, tableName); - int cacheSize = scan2.getCaching(); - if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { - scan2.setCacheBlocks(true); - cacheSize = 5; - scan2.setCaching(cacheSize); - } - ResultScanner scanner = table.getScanner(scan2); - Result[] results = null; - byte[] qualifier = quals.pollFirst(); - // qualifier for the weight column - byte[] weightQualifier = weighted ? quals.pollLast() : qualifier; - R value = null; + HTable table = null; + ResultScanner scanner = null; try { + table = new HTable(conf, tableName); + int cacheSize = scan2.getCaching(); + if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { + scan2.setCacheBlocks(true); + cacheSize = 5; + scan2.setCaching(cacheSize); + } + scanner = table.getScanner(scan2); + Result[] results = null; + byte[] qualifier = quals.pollFirst(); + // qualifier for the weight column + byte[] weightQualifier = weighted ? quals.pollLast() : qualifier; + R value = null; do { results = scanner.next(cacheSize); if (results != null && results.length > 0) { @@ -495,11 +545,16 @@ public class AggregationClient { movingSumVal = newSumVal; kv = r.getColumnLatest(colFamily, qualifier); value = ci.getValue(colFamily, qualifier, kv); + } } - } } while (results != null && results.length > 0); } finally { - scanner.close(); + if (scanner != null) { + scanner.close(); + } + if (table != null) { + table.close(); + } } return null; }