HBASE-6369 HTable is not closed in AggregationClient (binlijin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1360171 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-07-11 13:39:29 +00:00
parent c53b1c6a62
commit bbe0d87f8f
1 changed files with 126 additions and 71 deletions

View File

@ -94,8 +94,6 @@ public class AggregationClient {
public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci, public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable { final Scan scan) throws Throwable {
validateParameters(scan); validateParameters(scan);
HTable table = new HTable(conf, tableName);
class MaxCallBack implements Batch.Callback<R> { class MaxCallBack implements Batch.Callback<R> {
R max = null; R max = null;
@ -109,13 +107,21 @@ public class AggregationClient {
} }
} }
MaxCallBack aMaxCallBack = new MaxCallBack(); MaxCallBack aMaxCallBack = new MaxCallBack();
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan HTable table = null;
.getStopRow(), new Batch.Call<AggregateProtocol, R>() { try {
@Override table = new HTable(conf, tableName);
public R call(AggregateProtocol instance) throws IOException { table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
return instance.getMax(ci, scan); scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
@Override
public R call(AggregateProtocol instance) throws IOException {
return instance.getMax(ci, scan);
}
}, aMaxCallBack);
} finally {
if (table != null) {
table.close();
} }
}, aMaxCallBack); }
return aMaxCallBack.getMax(); return aMaxCallBack.getMax();
} }
@ -158,16 +164,23 @@ public class AggregationClient {
min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min; min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min;
} }
} }
HTable table = new HTable(conf, tableName);
MinCallBack minCallBack = new MinCallBack(); MinCallBack minCallBack = new MinCallBack();
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan HTable table = null;
.getStopRow(), new Batch.Call<AggregateProtocol, R>() { try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
@Override @Override
public R call(AggregateProtocol instance) throws IOException { public R call(AggregateProtocol instance) throws IOException {
return instance.getMin(ci, scan); return instance.getMin(ci, scan);
}
}, minCallBack);
} finally {
if (table != null) {
table.close();
} }
}, minCallBack); }
log.debug("Min fom all regions is: " + minCallBack.getMinimum()); log.debug("Min fom all regions is: " + minCallBack.getMinimum());
return minCallBack.getMinimum(); return minCallBack.getMinimum();
} }
@ -201,14 +214,21 @@ public class AggregationClient {
} }
} }
RowNumCallback rowNum = new RowNumCallback(); RowNumCallback rowNum = new RowNumCallback();
HTable table = new HTable(conf, tableName); HTable table = null;
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan try {
.getStopRow(), new Batch.Call<AggregateProtocol, Long>() { table = new HTable(conf, tableName);
@Override table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
public Long call(AggregateProtocol instance) throws IOException { scan.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
return instance.getRowNum(ci, scan); @Override
public Long call(AggregateProtocol instance) throws IOException {
return instance.getRowNum(ci, scan);
}
}, rowNum);
} finally {
if (table != null) {
table.close();
} }
}, rowNum); }
return rowNum.getRowNumCount(); return rowNum.getRowNumCount();
} }
@ -237,14 +257,21 @@ public class AggregationClient {
} }
} }
SumCallBack sumCallBack = new SumCallBack(); SumCallBack sumCallBack = new SumCallBack();
HTable table = new HTable(conf, tableName); HTable table = null;
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan try {
.getStopRow(), new Batch.Call<AggregateProtocol, S>() { table = new HTable(conf, tableName);
@Override table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
public S call(AggregateProtocol instance) throws IOException { scan.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
return instance.getSum(ci, scan); @Override
public S call(AggregateProtocol instance) throws IOException {
return instance.getSum(ci, scan);
}
}, sumCallBack);
} finally {
if (table != null) {
table.close();
} }
}, sumCallBack); }
return sumCallBack.getSumResult(); return sumCallBack.getSumResult();
} }
@ -274,14 +301,23 @@ public class AggregationClient {
} }
} }
AvgCallBack avgCallBack = new AvgCallBack(); AvgCallBack avgCallBack = new AvgCallBack();
HTable table = new HTable(conf, tableName); HTable table = null;
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan try {
.getStopRow(), new Batch.Call<AggregateProtocol, Pair<S, Long>>() { table = new HTable(conf, tableName);
@Override table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
public Pair<S, Long> call(AggregateProtocol instance) throws IOException { scan.getStopRow(),
return instance.getAvg(ci, scan); new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateProtocol instance)
throws IOException {
return instance.getAvg(ci, scan);
}
}, avgCallBack);
} finally {
if (table != null) {
table.close();
} }
}, avgCallBack); }
return avgCallBack.getAvgArgs(); return avgCallBack.getAvgArgs();
} }
@ -337,17 +373,24 @@ public class AggregationClient {
} }
} }
StdCallback stdCallback = new StdCallback(); StdCallback stdCallback = new StdCallback();
HTable table = new HTable(conf, tableName); HTable table = null;
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan try {
.getStopRow(), table = new HTable(conf, tableName);
new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() { table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
@Override scan.getStopRow(),
public Pair<List<S>, Long> call(AggregateProtocol instance) new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
throws IOException { @Override
return instance.getStd(ci, scan); public Pair<List<S>, Long> call(AggregateProtocol instance)
} throws IOException {
return instance.getStd(ci, scan);
}
}, stdCallback); }, stdCallback);
} finally {
if (table != null) {
table.close();
}
}
return stdCallback.getStdParams(); return stdCallback.getStdParams();
} }
@ -412,17 +455,22 @@ public class AggregationClient {
} }
} }
StdCallback stdCallback = new StdCallback(); StdCallback stdCallback = new StdCallback();
HTable table = new HTable(conf, tableName); HTable table = null;
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan try {
.getStopRow(), table = new HTable(conf, tableName);
new Batch.Call<AggregateProtocol, List<S>>() { table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
@Override scan.getStopRow(), new Batch.Call<AggregateProtocol, List<S>>() {
public List<S> call(AggregateProtocol instance) @Override
throws IOException { public List<S> call(AggregateProtocol instance) throws IOException {
return instance.getMedian(ci, scan); return instance.getMedian(ci, scan);
} }
}, stdCallback); }, stdCallback);
} finally {
if (table != null) {
table.close();
}
}
return stdCallback.getMedianParams(); return stdCallback.getMedianParams();
} }
@ -464,20 +512,22 @@ public class AggregationClient {
Scan scan2 = new Scan(scan); Scan scan2 = new Scan(scan);
// inherit stop row from method parameter // inherit stop row from method parameter
if (startRow != null) scan2.setStartRow(startRow); if (startRow != null) scan2.setStartRow(startRow);
HTable table = new HTable(conf, tableName); HTable table = null;
int cacheSize = scan2.getCaching(); ResultScanner scanner = null;
if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
scan2.setCacheBlocks(true);
cacheSize = 5;
scan2.setCaching(cacheSize);
}
ResultScanner scanner = table.getScanner(scan2);
Result[] results = null;
byte[] qualifier = quals.pollFirst();
// qualifier for the weight column
byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
R value = null;
try { try {
table = new HTable(conf, tableName);
int cacheSize = scan2.getCaching();
if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
scan2.setCacheBlocks(true);
cacheSize = 5;
scan2.setCaching(cacheSize);
}
scanner = table.getScanner(scan2);
Result[] results = null;
byte[] qualifier = quals.pollFirst();
// qualifier for the weight column
byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
R value = null;
do { do {
results = scanner.next(cacheSize); results = scanner.next(cacheSize);
if (results != null && results.length > 0) { if (results != null && results.length > 0) {
@ -495,11 +545,16 @@ public class AggregationClient {
movingSumVal = newSumVal; movingSumVal = newSumVal;
kv = r.getColumnLatest(colFamily, qualifier); kv = r.getColumnLatest(colFamily, qualifier);
value = ci.getValue(colFamily, qualifier, kv); value = ci.getValue(colFamily, qualifier, kv);
}
} }
}
} while (results != null && results.length > 0); } while (results != null && results.length > 0);
} finally { } finally {
scanner.close(); if (scanner != null) {
scanner.close();
}
if (table != null) {
table.close();
}
} }
return null; return null;
} }