HBASE-1512 Coprocessors: Support aggregate functions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1096620 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-04-25 22:02:24 +00:00
parent fd470dbf23
commit 6ad14f3c73
7 changed files with 1725 additions and 0 deletions

View File

@ -185,6 +185,7 @@ Release 0.91.0 - Unreleased
HBASE-3798 [REST] Allow representation to elide row key and column key HBASE-3798 [REST] Allow representation to elide row key and column key
HBASE-3812 Tidy up naming consistency and documentation in coprocessor HBASE-3812 Tidy up naming consistency and documentation in coprocessor
framework (Mingjie Lai) framework (Mingjie Lai)
HBASE-1512 Support aggregate functions (Himanshu Vashishtha)
TASKS TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -0,0 +1,362 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
/**
* This client class is for invoking the aggregate functions deployed on the
* Region Server side via the AggregateProtocol. This class will implement the
* supporting functionality for summing/processing the individual results
* obtained from the AggregateProtocol for each region.
* <p>
* This will serve as the client side handler for invoking the aggregate
* functions.
* <ul>
* For all aggregate functions,
* <li>start row < end row is an essential condition (if they are not
* {@link HConstants#EMPTY_BYTE_ARRAY})
* <li>Column family can't be null. In case where multiple families are
* provided, an IOException will be thrown. An optional column qualifier can
* also be defined.
* <li>For methods to find maximum, minimum, sum, rowcount, it returns the
* parameter type. For average and std, it returns a double value. For row
* count, it returns a long value.
*/
public class AggregationClient {
private static final Log log = LogFactory.getLog(AggregationClient.class);
Configuration conf;
/**
* Constructor with Conf object
* @param cfg
*/
public AggregationClient(Configuration cfg) {
this.conf = cfg;
}
/**
* It gives the maximum value of a column for a given column family for the
* given range. In case qualifier is null, a max of all values for the given
* family is returned.
* @param tableName
* @param ci
* @param scan
* @return max val <R>
* @throws Throwable
* The caller is supposed to handle the exception as they are thrown
* & propagated to it.
*/
public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable {
validateParameters(scan);
HTable table = new HTable(conf, tableName);
class MaxCallBack implements Batch.Callback<R> {
R max = null;
R getMax() {
return max;
}
@Override
public void update(byte[] region, byte[] row, R result) {
max = ci.compare(max, result) < 0 ? result : max;
}
}
MaxCallBack aMaxCallBack = new MaxCallBack();
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
@Override
public R call(AggregateProtocol instance) throws IOException {
return instance.getMax(ci, scan);
}
}, aMaxCallBack);
return aMaxCallBack.getMax();
}
private void validateParameters(Scan scan) throws IOException {
if (scan == null ||
(Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))){
throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
}else if(scan.getFamilyMap().size() != 1) {
throw new IOException("There must be only one family.");
}
}
/**
* It gives the minimum value of a column for a given column family for the
* given range. In case qualifier is null, a min of all values for the given
* family is returned.
* @param tableName
* @param ci
* @param scan
* @return min val <R>
* @throws Throwable
*/
public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable {
validateParameters(scan);
class MinCallBack implements Batch.Callback<R> {
private R min = null;
public R getMinimum() {
return min;
}
@Override
public void update(byte[] region, byte[] row, R result) {
min = (min == null || ci.compare(result, min) < 0) ? result : min;
}
}
HTable table = new HTable(conf, tableName);
MinCallBack minCallBack = new MinCallBack();
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
@Override
public R call(AggregateProtocol instance) throws IOException {
return instance.getMin(ci, scan);
}
}, minCallBack);
log.debug("Min fom all regions is: " + minCallBack.getMinimum());
return minCallBack.getMinimum();
}
/**
* It gives the row count, by summing up the individual results obtained from
* regions. In case the qualifier is null, FirstKEyValueFilter is used to
* optimised the operation. In case qualifier is provided, I can't use the
* filter as it may set the flag to skip to next row, but the value read is
* not of the given filter: in this case, this particular row will not be
* counted ==> an error.
* @param tableName
* @param ci
* @param scan
* @return
* @throws Throwable
*/
public <R, S> long rowCount(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
class RowNumCallback implements Batch.Callback<Long> {
private long rowCountL = 0l;
public long getRowNumCount() {
return rowCountL;
}
@Override
public void update(byte[] region, byte[] row, Long result) {
rowCountL += result.longValue();
}
}
RowNumCallback rowNum = new RowNumCallback();
HTable table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
@Override
public Long call(AggregateProtocol instance) throws IOException {
return instance.getRowNum(ci, scan);
}
}, rowNum);
return rowNum.getRowNumCount();
}
/**
* It sums up the value returned from various regions. In case qualifier is
* null, summation of all the column qualifiers in the given family is done.
* @param tableName
* @param ci
* @param scan
* @return sum <S>
* @throws Throwable
*/
public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable {
validateParameters(scan);
class SumCallBack implements Batch.Callback<S> {
S sumVal = null;
public S getSumResult() {
return sumVal;
}
@Override
public void update(byte[] region, byte[] row, S result) {
sumVal = ci.add(sumVal, result);
}
}
SumCallBack sumCallBack = new SumCallBack();
HTable table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
@Override
public S call(AggregateProtocol instance) throws IOException {
return instance.getSum(ci, scan);
}
}, sumCallBack);
return sumCallBack.getSumResult();
}
/**
* It computes average while fetching sum and row count from all the
* corresponding regions. Approach is to compute a global sum of region level
* sum and rowcount and then compute the average.
* @param tableName
* @param scan
* @throws Throwable
*/
private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
S sum = null;
Long rowCount = 0l;
public Pair<S, Long> getAvgArgs() {
return new Pair<S, Long>(sum, rowCount);
}
@Override
public void update(byte[] region, byte[] row, Pair<S, Long> result) {
sum = ci.add(sum, result.getFirst());
rowCount += result.getSecond();
}
}
AvgCallBack avgCallBack = new AvgCallBack();
HTable table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(), new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateProtocol instance) throws IOException {
return instance.getAvg(ci, scan);
}
}, avgCallBack);
return avgCallBack.getAvgArgs();
}
/**
* This is the client side interface/handle for calling the average method for
* a given cf-cq combination. It was necessary to add one more call stack as
* its return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the average and returs the double value.
* @param tableName
* @param ci
* @param scan
* @return
* @throws Throwable
*/
public <R, S> double avg(final byte[] tableName,
final ColumnInterpreter<R, S> ci, Scan scan) throws Throwable {
Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
return ci.divideForAvg(p.getFirst(), p.getSecond());
}
/**
* It computes a global standard deviation for a given column and its value.
* Standard deviation is square root of (average of squares -
* average*average). From individual regions, it obtains sum, square sum and
* number of rows. With these, the above values are computed to get the global
* std.
* @param tableName
* @param scan
* @return
* @throws Throwable
*/
private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
long rowCountVal = 0l;
S sumVal = null, sumSqVal = null;
public Pair<List<S>, Long> getStdParams() {
List<S> l = new ArrayList<S>();
l.add(sumVal);
l.add(sumSqVal);
Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
return p;
}
@Override
public void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
sumVal = ci.add(sumVal, result.getFirst().get(0));
sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
rowCountVal += result.getSecond();
}
}
StdCallback stdCallback = new StdCallback();
HTable table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(),
new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
@Override
public Pair<List<S>, Long> call(AggregateProtocol instance)
throws IOException {
return instance.getStd(ci, scan);
}
}, stdCallback);
return stdCallback.getStdParams();
}
/**
* This is the client side interface/handle for calling the std method for a
* given cf-cq combination. It was necessary to add one more call stack as its
* return type should be a decimal value, irrespective of what
* columninterpreter says. So, this methods collects the necessary parameters
* to compute the std and returns the double value.
* @param tableName
* @param ci
* @param scan
* @return
* @throws Throwable
*/
public <R, S> double std(final byte[] tableName, ColumnInterpreter<R, S> ci,
Scan scan) throws Throwable {
Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);
double res = 0d;
double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond());
double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond());
res = avgOfSumSq - (avg) * (avg); // variance
res = Math.pow(res, 0.5);
return res;
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
/**
* a concrete column interpreter implementation. The cell value is a Long value
* and its promoted data type is also a Long value. For computing aggregation
* function, this class is used to find the datatype of the cell value. Client
* is supposed to instantiate it and passed along as a parameter. See
* {@link TestAggregateProtocol} methods for its sample usage.
* Its methods handle null arguments gracefully.
*/
public class LongColumnInterpreter implements ColumnInterpreter<Long, Long> {
public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
throws IOException {
if (kv == null || kv.getValue().length != Bytes.SIZEOF_LONG)
return null;
return Bytes.toLong(kv.getValue());
}
@Override
public Long add(Long l1, Long l2) {
if (l1 == null ^ l2 == null) {
return (l1 == null) ? l2 : l1; // either of one is null.
} else if (l1 == null) // both are null
return null;
return l1 + l2;
}
@Override
public int compare(final Long l1, final Long l2) {
if (l1 == null ^ l2 == null) {
return l1 == null ? -1 : 1; // either of one is null.
} else if (l1 == null)
return 0; // both are null
return l1.compareTo(l2); // natural ordering.
}
@Override
public Long getMaxValue() {
return Long.MAX_VALUE;
}
@Override
public Long increment(Long o) {
return o == null ? null : (o + 1l);
}
@Override
public Long multiply(Long l1, Long l2) {
return (l1 == null || l2 == null) ? null : l1 * l2;
}
@Override
public Long getMinValue() {
return Long.MIN_VALUE;
}
@Override
public void readFields(DataInput arg0) throws IOException {
// nothing to serialize
}
@Override
public void write(DataOutput arg0) throws IOException {
// nothing to serialize
}
@Override
public double divideForAvg(Long l1, Long l2) {
return (l2 == null || l1 == null) ? Double.NaN : (l1.doubleValue() / l2
.doubleValue());
}
@Override
public Long castToReturnType(Long o) {
return o;
}
}

View File

@ -0,0 +1,224 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Pair;
/**
* A concrete AggregateProtocol implementation. Its system level coprocessor
* that computes the aggregate function at a region level.
*/
public class AggregateImplementation extends BaseEndpointCoprocessor implements
AggregateProtocol {
protected static Log log = LogFactory.getLog(AggregateImplementation.class);
@Override
public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
T temp;
T max = null;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
// qualifier can be null.
try {
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (KeyValue kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
max = (max == null || ci.compare(temp, max) > 0) ? temp : max;
}
results.clear();
} while (hasMoreRows);
} finally {
scanner.close();
}
log.info("Maximum from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + max);
return max;
}
@Override
public <T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
T min = null;
T temp;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
try {
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (KeyValue kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
min = (min == null || ci.compare(temp, min) < 0) ? temp : min;
}
results.clear();
} while (hasMoreRows);
} finally {
scanner.close();
}
log.info("Minimum from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + min);
return min;
}
@Override
public <T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
long sum = 0l;
S sumVal = null;
T temp;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
try {
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
for (KeyValue kv : results) {
temp = ci.getValue(colFamily, qualifier, kv);
if (temp != null)
sumVal = ci.add(sumVal, ci.castToReturnType(temp));
}
results.clear();
} while (hasMoreRows);
} finally {
scanner.close();
}
log.debug("Sum from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + sum);
return sumVal;
}
@Override
public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
long counter = 0l;
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
if (scan.getFilter() == null && qualifier == null)
scan.setFilter(new FirstKeyOnlyFilter());
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
try {
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
if (results.size() > 0) {
counter++;
}
results.clear();
} while (hasMoreRows);
} finally {
scanner.close();
}
log.info("Row counter from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + counter);
return counter;
}
@Override
public <T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
S sumVal = null;
Long rowCountVal = 0l;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
try {
do {
results.clear();
hasMoreRows = scanner.next(results);
for (KeyValue kv : results) {
sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
qualifier, kv)));
}
rowCountVal++;
} while (hasMoreRows);
} finally {
scanner.close();
}
Pair<S, Long> pair = new Pair<S, Long>(sumVal, rowCountVal);
return pair;
}
@Override
public <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0l;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
try {
do {
tempVal = null;
hasMoreRows = scanner.next(results);
for (KeyValue kv : results) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
qualifier, kv)));
}
results.clear();
sumVal = ci.add(sumVal, tempVal);
sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
rowCountVal++;
} while (hasMoreRows);
} finally {
scanner.close();
}
List<S> l = new ArrayList<S>();
l.add(sumVal);
l.add(sumSqVal);
Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
return p;
}
}

View File

@ -0,0 +1,129 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Pair;
/**
* Defines the aggregation functions that are to be supported in this
* Coprocessor. For each method, it takes a Scan object and a columnInterpreter.
* The scan object should have a column family (else an exception will be
* thrown), and an optional column qualifier. In the current implementation
* {@link AggregateImplementation}, only one column family and column qualifier
* combination is served. In case there are more than one, only first one will
* be picked. Refer to {@link AggregationClient} for some general conditions on
* input parameters.
*/
public interface AggregateProtocol extends CoprocessorProtocol {
/**
* Gives the maximum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, maximum value for the
* entire column family will be returned.
* @param ci
* @param scan
* @return max value as mentioned above
* @throws IOException
*/
<T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;
/**
* Gives the minimum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, minimum value for the
* entire column family will be returned.
* @param ci
* @param scan
* @return min as mentioned above
* @throws IOException
*/
<T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;
/**
* Gives the sum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, sum for the entire column
* family will be returned.
* @param ci
* @param scan
* @return sum of values as defined by the column interpreter
* @throws IOException
*/
<T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;
/**
* Gives the row count for the given column family and column qualifier, in
* the given row range as defined in the Scan object.
* @param ci
* @param scan
* @return
* @throws IOException
*/
<T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
/**
* Gives a Pair with first object as Sum and second object as row count,
* computed for a given combination of column qualifier and column family in
* the given row range as defined in the Scan object. In its current
* implementation, it takes one column family and one column qualifier (if
* provided). In case of null column qualifier, an aggregate sum over all the
* entire column family will be returned.
* <p>
* The average is computed in
* {@link AggregationClient#avg(byte[], ColumnInterpreter, Scan)} by
* processing results from all regions, so its "ok" to pass sum and a Long
* type.
* @param ci
* @param scan
* @return
* @throws IOException
*/
<T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
/**
* Gives a Pair with first object a List containing Sum and sum of squares,
* and the second object as row count. It is computed for a given combination of
* column qualifier and column family in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* one column qualifier (if provided). The idea is get the value of variance first:
* the average of the squares less the square of the average a standard
* deviation is square root of variance.
* @param ci
* @param scan
* @return
* @throws IOException
*/
<T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
}

View File

@ -0,0 +1,118 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.io.Writable;
/**
* Defines how value for specific column is interpreted and provides utility
* methods like compare, add, multiply etc for them. Takes column family, column
* qualifier and return the cell value. Its concrete implementation should
* handle null case gracefully. Refer to {@link LongColumnInterpreter} for an
* example.
* <p>
* Takes two generic parameters. The cell value type of the interpreter is <T>.
* During some computations like sum, average, the return type can be different
* than the cell value data type, for eg, sum of int cell values might overflow
* in case of a int result, we should use Long for its result. Therefore, this
* class mandates to use a different (promoted) data type for result of these
* computations <S>. All computations are performed on the promoted data type
* <S>. There is a conversion method
* {@link ColumnInterpreter#castToReturnType(Object)} which takes a <T> type and
* returns a <S> type.
* @param <T, S>: T - cell value data type, S - promoted data type
*/
public interface ColumnInterpreter<T, S> extends Writable {
/**
* @param colFamily
* @param colQualifier
* @param value
* @return value of type T
* @throws IOException
*/
T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
throws IOException;
/**
* returns sum or non null value among (if either of them is null); otherwise
* returns a null.
* @param l1
* @param l2
* @return
*/
public S add(S l1, S l2);
/**
* returns the maximum value for this type T
* @return
*/
T getMaxValue();
/**
* @return
*/
T getMinValue();
/**
* @param o1
* @param o2
* @return
*/
S multiply(S o1, S o2);
/**
* @param o
* @return
*/
S increment(S o);
/**
* provides casting opportunity between the data types.
* @param o
* @return
*/
S castToReturnType(T o);
/**
* This takes care if either of arguments are null. returns 0 if they are
* equal or both are null;
* <ul>
* <li>>0 if l1 > l2 or l1 is not null and l2 is null.
* <li>< 0 if l1 < l2 or l1 is null and l2 is not null.
*/
int compare(final T l1, final T l2);
/**
* used for computing average of <S> data values. Not providing the divide
* method that takes two <S> values as it si not needed as of now.
* @param o
* @param l
* @return
*/
double divideForAvg(S o, Long l);
}

View File

@ -0,0 +1,785 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* A test class to cover aggregate functions, that can be implemented using
* Coprocessors.
*/
public class TestAggregateProtocol {
protected static Log myLog = LogFactory.getLog(TestAggregateProtocol.class);
/**
* Creating the test infrastructure.
*/
private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
private static final byte[] TEST_MULTI_CQ = Bytes.toBytes("TestMultiCQ");
private static byte[] ROW = Bytes.toBytes("testRow");
private static final int ROWSIZE = 20;
private static final int rowSeperator1 = 5;
private static final int rowSeperator2 = 12;
private static byte[][] ROWS = makeN(ROW, ROWSIZE);
private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
private static Configuration conf = util.getConfiguration();
/**
* A set up method to start the test cluster. AggregateProtocolImpl is
* registered and will be loaded during region startup.
* @throws Exception
*/
@BeforeClass
public static void setupBeforeClass() throws Exception {
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
util.startMiniCluster(2);
cluster = util.getMiniHBaseCluster();
HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
new byte[][] { HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
ROWS[rowSeperator2] });
/**
* The testtable has one CQ which is always populated and one variable CQ
* for each row rowkey1: CF:CQ CF:CQ1 rowKey2: CF:CQ CF:CQ2
*/
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
Long l = new Long(i);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(l));
table.put(put);
Put p2 = new Put(ROWS[i]);
p2.add(TEST_FAMILY, Bytes.add(TEST_MULTI_CQ, Bytes.toBytes(l)), Bytes
.toBytes(l * 10));
table.put(p2);
}
}
/**
* Shutting down the cluster
* @throws Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
/**
* an infrastructure method to prepare rows for the testtable.
* @param base
* @param n
* @return
*/
private static byte[][] makeN(byte[] base, int n) {
byte[][] ret = new byte[n][];
for (int i = 0; i < n; i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(i));
}
return ret;
}
/**
* **************************** ROW COUNT Test cases *******************
*/
/**
* This will test rowcount with a valid range, i.e., a subset of rows. It will
* be the most common use case.
* @throws Throwable
*/
@Test
public void testRowCountWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[2]);
scan.setStopRow(ROWS[14]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
assertEquals(12, rowCount);
}
/**
* This will test the row count on the entire table. Startrow and endrow will
* be null.
* @throws Throwable
*/
@Test
public void testRowCountAllTable() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci,
scan);
assertEquals(ROWSIZE, rowCount);
}
/**
* This will test the row count with startrow > endrow. The result should be
* -1.
* @throws Throwable
*/
@Test
public void testRowCountWithInvalidRange1() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[2]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
} catch (Throwable e) {
myLog.error("Exception thrown in the invalidRange method"
+ e.getStackTrace());
}
assertEquals(-1, rowCount);
}
/**
* This will test the row count with startrow = endrow and they will be
* non-null. The result should be 0, as it assumes a non-get query.
* @throws Throwable
*/
@Test
public void testRowCountWithInvalidRange2() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[5]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
} catch (Throwable e) {
rowCount = 0;
}
assertEquals(0, rowCount);
}
/**
* This should return a 0
*/
@Test
public void testRowCountWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
} catch (Throwable e) {
rowCount = 0;
}
assertEquals(0, rowCount);
}
@Test
public void testRowCountWithNullCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci,
scan);
assertEquals(20, rowCount);
}
@Test
public void testRowCountWithPrefixFilter() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
long rowCount = aClient.rowCount(TEST_TABLE, ci,
scan);
assertEquals(0, rowCount);
}
/**
* ***************Test cases for Maximum *******************
*/
/**
* give max for the entire table.
* @throws Throwable
*/
@Test
public void testMaxWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long maximum = aClient.max(TEST_TABLE, ci, scan);
assertEquals(19, maximum);
}
/**
* @throws Throwable
*/
@Test
public void testMaxWithValidRange2() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long max = aClient.max(TEST_TABLE, ci, scan);
assertEquals(14, max);
}
@Test
public void testMaxWithValidRangeWithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long maximum = aClient.max(TEST_TABLE, ci, scan);
assertEquals(190, maximum);
}
@Test
public void testMaxWithValidRange2WithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long max = aClient.max(TEST_TABLE, ci, scan);
assertEquals(60, max);
}
@Test
public void testMaxWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Scan scan = new Scan();
Long max = null;
try {
max = aClient.max(TEST_TABLE, ci, scan);
} catch (Throwable e) {
max = null;
}
assertEquals(null, max);// CP will throw an IOException about the
// null column family, and max will be set to 0
}
@Test
public void testMaxWithInvalidRange() {
AggregationClient aClient = new AggregationClient(conf);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Scan scan = new Scan();
scan.setStartRow(ROWS[4]);
scan.setStopRow(ROWS[2]);
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
long max = Long.MIN_VALUE;
try {
max = aClient.max(TEST_TABLE, ci, scan);
} catch (Throwable e) {
max = 0;
}
assertEquals(0, max);// control should go to the catch block
}
@Test
public void testMaxWithInvalidRange2() throws Throwable {
long max = Long.MIN_VALUE;
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[4]);
scan.setStopRow(ROWS[4]);
try {
AggregationClient aClient = new AggregationClient(conf);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
max = aClient.max(TEST_TABLE, ci, scan);
} catch (Exception e) {
max = 0;
}
assertEquals(0, max);// control should go to the catch block
}
@Test
public void testMaxWithFilter() throws Throwable {
Long max = 0l;
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
max = aClient.max(TEST_TABLE, ci, scan);
assertEquals(null, max);
}
/**
* **************************Test cases for Minimum ***********************
*/
/**
* @throws Throwable
*/
@Test
public void testMinWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(HConstants.EMPTY_START_ROW);
scan.setStopRow(HConstants.EMPTY_END_ROW);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long min = aClient.min(TEST_TABLE, ci,
scan);
assertEquals(0l, min.longValue());
}
/**
* @throws Throwable
*/
@Test
public void testMinWithValidRange2() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long min = aClient.min(TEST_TABLE, ci, scan);
assertEquals(5, min);
}
@Test
public void testMinWithValidRangeWithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(HConstants.EMPTY_START_ROW);
scan.setStopRow(HConstants.EMPTY_END_ROW);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long min = aClient.min(TEST_TABLE, ci,
scan);
assertEquals(0, min);
}
@Test
public void testMinWithValidRange2WithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long min = aClient.min(TEST_TABLE, ci, scan);
assertEquals(6, min);
}
@Test
public void testMinWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long min = null;
try {
min = aClient.min(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, min);// CP will throw an IOException about the
// null column family, and max will be set to 0
}
@Test
public void testMinWithInvalidRange() {
AggregationClient aClient = new AggregationClient(conf);
Long min = null;
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[4]);
scan.setStopRow(ROWS[2]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
try {
min = aClient.min(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, min);// control should go to the catch block
}
@Test
public void testMinWithInvalidRange2() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[6]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long min = null;
try {
min = aClient.min(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, min);// control should go to the catch block
}
@Test
public void testMinWithFilter() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long min = null;
min = aClient.min(TEST_TABLE, ci, scan);
assertEquals(null, min);
}
/**
* *************** Test cases for Sum *********************
*/
/**
* @throws Throwable
*/
@Test
public void testSumWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci,
scan);
assertEquals(190, sum);
}
/**
* @throws Throwable
*/
@Test
public void testSumWithValidRange2() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci, scan);
assertEquals(95, sum);
}
@Test
public void testSumWithValidRangeWithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci,
scan);
assertEquals(190 + 1900, sum);
}
@Test
public void testSumWithValidRange2WithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long sum = aClient.sum(TEST_TABLE, ci, scan);
assertEquals(6 + 60, sum);
}
@Test
public void testSumWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long sum = null;
try {
sum = aClient.sum(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, sum);// CP will throw an IOException about the
// null column family, and max will be set to 0
}
@Test
public void testSumWithInvalidRange() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[2]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long sum = null;
try {
sum = aClient.sum(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, sum);// control should go to the catch block
}
@Test
public void testSumWithFilter() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setFilter(f);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Long sum = null;
sum = aClient.sum(TEST_TABLE, ci, scan);
assertEquals(null, sum);
}
/**
* ****************************** Test Cases for Avg **************
*/
/**
* @throws Throwable
*/
@Test
public void testAvgWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci,
scan);
assertEquals(9.5, avg, 0);
}
/**
* @throws Throwable
*/
@Test
public void testAvgWithValidRange2() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci, scan);
assertEquals(9.5, avg, 0);
}
@Test
public void testAvgWithValidRangeWithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci,
scan);
assertEquals(104.5, avg, 0);
}
@Test
public void testAvgWithValidRange2WithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double avg = aClient.avg(TEST_TABLE, ci, scan);
assertEquals(6 + 60, avg, 0);
}
@Test
public void testAvgWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Double avg = null;
try {
avg = aClient.avg(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, avg);// CP will throw an IOException about the
// null column family, and max will be set to 0
}
@Test
public void testAvgWithInvalidRange() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[1]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Double avg = null;
try {
avg = aClient.avg(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, avg);// control should go to the catch block
}
@Test
public void testAvgWithFilter() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
scan.setFilter(f);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Double avg = null;
avg = aClient.avg(TEST_TABLE, ci, scan);
assertEquals(Double.NaN, avg, 0);
}
/**
* ****************** Test cases for STD **********************
*/
/**
* @throws Throwable
*/
@Test
public void testStdWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci,
scan);
assertEquals(5.766, std, 0.05d);
}
/**
* @throws Throwable
*/
@Test
public void testStdWithValidRange2() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci, scan);
assertEquals(2.87, std, 0.05d);
}
@Test
public void testStdWithValidRangeWithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci,
scan);
assertEquals(63.42, std, 0.05d);
}
@Test
public void testStdWithValidRange2WithNoCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[7]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
double std = aClient.std(TEST_TABLE, ci, scan);
assertEquals(0, std, 0);
}
@Test
public void testStdWithValidRangeWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[17]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Double std = null;
try {
std = aClient.std(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, std);// CP will throw an IOException about the
// null column family, and max will be set to 0
}
@Test
public void testStdWithInvalidRange() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setStartRow(ROWS[6]);
scan.setStopRow(ROWS[1]);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Double std = null;
try {
std = aClient.std(TEST_TABLE, ci, scan);
} catch (Throwable e) {
}
assertEquals(null, std);// control should go to the catch block
}
@Test
public void testStdWithFilter() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
Scan scan = new Scan();
scan.addFamily(TEST_FAMILY);
scan.setFilter(f);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
Double std = null;
std = aClient.std(TEST_TABLE, ci, scan);
assertEquals(Double.NaN, std, 0);
}
}