HBASE-8218 Pass HTable as parameter to methods of AggregationClient

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1482478 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
anoopsamjohn 2013-05-14 17:29:31 +00:00
parent 90bd4de7cf
commit 487f1d437f
1 changed files with 351 additions and 228 deletions

View File

@ -98,8 +98,34 @@ public class AggregationClient {
* The caller is supposed to handle the exception as they are thrown
* & propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message> R max(
final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
HTable table = null;
try {
table = new HTable(conf, tableName);
return max(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* It gives the maximum value of a column for a given column family for the
* given range. In case qualifier is null, a max of all values for the given
* family is returned.
* @param table
* @param ci
* @param scan
* @return max val <R>
* @throws Throwable
* The caller is supposed to handle the exception as they are thrown
* & propagated to it.
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R max(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
R max(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class MaxCallBack implements Batch.Callback<R> {
@ -115,34 +141,26 @@ public class AggregationClient {
}
}
MaxCallBack aMaxCallBack = new MaxCallBack();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMax(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0);
Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
return ci.getCellValueFromProto(q);
}
return null;
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMax(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
}, aMaxCallBack);
} finally {
if (table != null) {
table.close();
}
}
if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0);
Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
return ci.getCellValueFromProto(q);
}
return null;
}
}, aMaxCallBack);
return aMaxCallBack.getMax();
}
@ -169,8 +187,32 @@ public class AggregationClient {
* @return min val <R>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message> R min(
final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
HTable table = null;
try {
table = new HTable(conf, tableName);
return min(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* It gives the minimum value of a column for a given column family for the
* given range. In case qualifier is null, a min of all values for the given
* family is returned.
* @param table
* @param ci
* @param scan
* @return min val <R>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R min(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
R min(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class MinCallBack implements Batch.Callback<R> {
@ -187,35 +229,27 @@ public class AggregationClient {
}
}
MinCallBack minCallBack = new MinCallBack();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, R>() {
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMin(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0);
Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
return ci.getCellValueFromProto(q);
}
return null;
@Override
public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMin(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
}, minCallBack);
} finally {
if (table != null) {
table.close();
}
}
if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0);
Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
return ci.getCellValueFromProto(q);
}
return null;
}
}, minCallBack);
log.debug("Min fom all regions is: " + minCallBack.getMinimum());
return minCallBack.getMinimum();
}
@ -233,8 +267,35 @@ public class AggregationClient {
* @return <R, S>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount(
final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
HTable table = null;
try {
table = new HTable(conf, tableName);
return rowCount(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* It gives the row count, by summing up the individual results obtained from
* regions. In case the qualifier is null, FirstKeyValueFilter is used to
* optimised the operation. In case qualifier is provided, I can't use the
* filter as it may set the flag to skip to next row, but the value read is
* not of the given filter: in this case, this particular row will not be
* counted ==> an error.
* @param table
* @param ci
* @param scan
* @return <R, S>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
long rowCount(final byte[] tableName,
long rowCount(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class RowNumCallback implements Batch.Callback<Long> {
@ -250,32 +311,24 @@ public class AggregationClient {
}
}
RowNumCallback rowNum = new RowNumCallback();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, Long>() {
@Override
public Long call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getRowNum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
bb.rewind();
return bb.getLong();
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, Long>() {
@Override
public Long call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getRowNum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
}, rowNum);
} finally {
if (table != null) {
table.close();
}
}
byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
bb.rewind();
return bb.getLong();
}
}, rowNum);
return rowNum.getRowNumCount();
}
@ -288,8 +341,31 @@ public class AggregationClient {
* @return sum <S>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message> S sum(
final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
HTable table = null;
try {
table = new HTable(conf, tableName);
return sum(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* It sums up the value returned from various regions. In case qualifier is
* null, summation of all the column qualifiers in the given family is done.
* @param table
* @param ci
* @param scan
* @return sum <S>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
S sum(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
@ -306,35 +382,27 @@ public class AggregationClient {
}
}
SumCallBack sumCallBack = new SumCallBack();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, S>() {
@Override
public S call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getSum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
if (response.getFirstPartCount() == 0) {
return null;
}
ByteString b = response.getFirstPart(0);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
return s;
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, S>() {
@Override
public S call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getSum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
}, sumCallBack);
} finally {
if (table != null) {
table.close();
}
}
if (response.getFirstPartCount() == 0) {
return null;
}
ByteString b = response.getFirstPart(0);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
return s;
}
}, sumCallBack);
return sumCallBack.getSumResult();
}
@ -346,8 +414,30 @@ public class AggregationClient {
* @param scan
* @throws Throwable
*/
private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs(
final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan)
throws Throwable {
HTable table = null;
try {
table = new HTable(conf, tableName);
return getAvgArgs(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* It computes average while fetching sum and row count from all the
* corresponding regions. Approach is to compute a global sum of region level
* sum and rowcount and then compute the average.
* @param table
* @param scan
* @throws Throwable
*/
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<S, Long> getAvgArgs(final byte[] tableName,
Pair<S, Long> getAvgArgs(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
@ -365,43 +455,33 @@ public class AggregationClient {
}
}
AvgCallBack avgCallBack = new AvgCallBack();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(),
new Batch.Call<AggregateService, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateService instance)
throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getAvg(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<S,Long> pair = new Pair<S, Long>(null, 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
ByteString b = response.getFirstPart(0);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
pair.setFirst(s);
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getAvg(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
}, avgCallBack);
} finally {
if (table != null) {
table.close();
}
}
ByteString b = response.getFirstPart(0);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
pair.setFirst(s);
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
return pair;
}
}, avgCallBack);
return avgCallBack.getAvgArgs();
}
@ -424,19 +504,37 @@ public class AggregationClient {
return ci.divideForAvg(p.getFirst(), p.getSecond());
}
/**
* This is the client side interface/handle for calling the average method for
* a given cf-cq combination. It was necessary to add one more call stack as
* its return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the average and returs the double value.
* @param table
* @param ci
* @param scan
* @return <R, S>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message> double avg(
final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
Pair<S, Long> p = getAvgArgs(table, ci, scan);
return ci.divideForAvg(p.getFirst(), p.getSecond());
}
/**
* It computes a global standard deviation for a given column and its value.
* Standard deviation is square root of (average of squares -
* average*average). From individual regions, it obtains sum, square sum and
* number of rows. With these, the above values are computed to get the global
* std.
* @param tableName
* @param table
* @param scan
* @return
* @throws Throwable
*/
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<List<S>, Long> getStdArgs(final byte[] tableName,
Pair<List<S>, Long> getStdArgs(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
@ -461,48 +559,37 @@ public class AggregationClient {
}
}
StdCallback stdCallback = new StdCallback();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(),
new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
@Override
public Pair<List<S>, Long> call(AggregateService instance)
throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getStd(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<List<S>,Long> pair =
new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
List<S> list = new ArrayList<S>();
for (int i = 0; i < response.getFirstPartCount(); i++) {
ByteString b = response.getFirstPart(i);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
list.add(s);
}
pair.setFirst(list);
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
@Override
public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getStd(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
}, stdCallback);
} finally {
if (table != null) {
table.close();
}
}
List<S> list = new ArrayList<S>();
for (int i = 0; i < response.getFirstPartCount(); i++) {
ByteString b = response.getFirstPart(i);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
list.add(s);
}
pair.setFirst(list);
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
return pair;
}
}, stdCallback);
return stdCallback.getStdParams();
}
@ -521,7 +608,32 @@ public class AggregationClient {
public <R, S, P extends Message, Q extends Message, T extends Message>
double std(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);
HTable table = null;
try {
table = new HTable(conf, tableName);
return std(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* This is the client side interface/handle for calling the std method for a
* given cf-cq combination. It was necessary to add one more call stack as its
* return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the std and returns the double value.
* @param table
* @param ci
* @param scan
* @return <R, S>
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message> double std(
final HTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
Pair<List<S>, Long> p = getStdArgs(table, ci, scan);
double res = 0d;
double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
@ -534,7 +646,7 @@ public class AggregationClient {
* It helps locate the region with median for a given column whose weight
* is specified in an optional column.
* From individual regions, it obtains sum of values and sum of weights.
* @param tableName
* @param table
* @param ci
* @param scan
* @return pair whose first element is a map between start row of the region
@ -544,7 +656,7 @@ public class AggregationClient {
*/
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<NavigableMap<byte[], List<S>>, List<S>>
getMedianArgs(final byte[] tableName,
getMedianArgs(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
final NavigableMap<byte[], List<S>> map =
@ -569,38 +681,30 @@ public class AggregationClient {
}
}
StdCallback stdCallback = new StdCallback();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, List<S>>() {
@Override
public List<S> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMedian(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
List<S> list = new ArrayList<S>();
for (int i = 0; i < response.getFirstPartCount(); i++) {
ByteString b = response.getFirstPart(i);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
list.add(s);
}
return list;
table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(),
new Batch.Call<AggregateService, List<S>>() {
@Override
public List<S> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMedian(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
}, stdCallback);
} finally {
if (table != null) {
table.close();
}
}
List<S> list = new ArrayList<S>();
for (int i = 0; i < response.getFirstPartCount(); i++) {
ByteString b = response.getFirstPart(i);
T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
S s = ci.getPromotedValueFromProto(t);
list.add(s);
}
return list;
}
}, stdCallback);
return stdCallback.getMedianParams();
}
@ -617,7 +721,31 @@ public class AggregationClient {
public <R, S, P extends Message, Q extends Message, T extends Message>
R median(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
HTable table = null;
try {
table = new HTable(conf, tableName);
return median(table, ci, scan);
} finally {
if (table != null) {
table.close();
}
}
}
/**
* This is the client side interface/handler for calling the median method for a
* given cf-cq combination. This method collects the necessary parameters
* to compute the median and returns the median.
* @param table
* @param ci
* @param scan
* @return R the median
* @throws Throwable
*/
public <R, S, P extends Message, Q extends Message, T extends Message>
R median(final HTable table, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan) throws Throwable {
Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan);
byte[] startRow = null;
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
@ -643,10 +771,8 @@ public class AggregationClient {
Scan scan2 = new Scan(scan);
// inherit stop row from method parameter
if (startRow != null) scan2.setStartRow(startRow);
HTable table = null;
ResultScanner scanner = null;
try {
table = new HTable(conf, tableName);
int cacheSize = scan2.getCaching();
if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
scan2.setCacheBlocks(true);
@ -683,9 +809,6 @@ public class AggregationClient {
if (scanner != null) {
scanner.close();
}
if (table != null) {
table.close();
}
}
return null;
}