HBASE-9605 Allow AggregationClient to skip specifying column family for row count aggregate

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1526987 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-09-27 17:31:47 +00:00
parent 77ca737375
commit 9f46b87576
3 changed files with 22 additions and 37 deletions

View File

@ -127,7 +127,7 @@ public class AggregationClient {
public <R, S, P extends Message, Q extends Message, T extends Message>
R max(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class MaxCallBack implements Batch.Callback<R> {
R max = null;
@ -164,7 +164,11 @@ public class AggregationClient {
return aMaxCallBack.getMax();
}
private void validateParameters(Scan scan) throws IOException {
/*
* @param scan
* @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
*/
private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
if (scan == null
|| (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes
.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
@ -172,8 +176,10 @@ public class AggregationClient {
!Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
throw new IOException(
"Agg client Exception: Startrow should be smaller than Stoprow");
} else if (scan.getFamilyMap().size() != 1) {
throw new IOException("There must be only one family.");
} else if (!canFamilyBeAbsent) {
if (scan.getFamilyMap().size() != 1) {
throw new IOException("There must be only one family.");
}
}
}
@ -214,7 +220,7 @@ public class AggregationClient {
public <R, S, P extends Message, Q extends Message, T extends Message>
R min(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class MinCallBack implements Batch.Callback<R> {
private R min = null;
@ -297,7 +303,7 @@ public class AggregationClient {
public <R, S, P extends Message, Q extends Message, T extends Message>
long rowCount(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true);
class RowNumCallback implements Batch.Callback<Long> {
private final AtomicLong rowCountL = new AtomicLong(0);
@ -367,7 +373,7 @@ public class AggregationClient {
public <R, S, P extends Message, Q extends Message, T extends Message>
S sum(final HTable table, final ColumnInterpreter<R, S, P, Q, T> ci,
final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class SumCallBack implements Batch.Callback<S> {
S sumVal = null;
@ -439,7 +445,7 @@ public class AggregationClient {
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<S, Long> getAvgArgs(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
S sum = null;
Long rowCount = 0l;
@ -536,7 +542,7 @@ public class AggregationClient {
private <R, S, P extends Message, Q extends Message, T extends Message>
Pair<List<S>, Long> getStdArgs(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
long rowCountVal = 0l;
S sumVal = null, sumSqVal = null;
@ -658,7 +664,7 @@ public class AggregationClient {
Pair<NavigableMap<byte[], List<S>>, List<S>>
getMedianArgs(final HTable table,
final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci);
final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false);
final NavigableMap<byte[], List<S>> map =
new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
class StdCallback implements Batch.Callback<List<S>> {
@ -814,9 +820,9 @@ public class AggregationClient {
}
<R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci)
validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
throws IOException {
validateParameters(scan);
validateParameters(scan, canFamilyBeAbsent);
final AggregateRequest.Builder requestBuilder =
AggregateRequest.newBuilder();
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());

View File

@ -237,8 +237,10 @@ extends AggregateService implements CoprocessorService, Coprocessor {
InternalScanner scanner = null;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
byte[][] colFamilies = scan.getFamilies();
byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
NavigableSet<byte[]> qualifiers = colFamilies != null ?
scan.getFamilyMap().get(colFamily) : null;
byte[] qualifier = null;
if (qualifiers != null && !qualifiers.isEmpty()) {
qualifier = qualifiers.pollFirst();

View File

@ -173,7 +173,6 @@ public class TestAggregateProtocol {
public void testRowCountAllTable() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
new LongColumnInterpreter();
long rowCount = aClient.rowCount(TEST_TABLE, ci,
@ -190,7 +189,6 @@ public class TestAggregateProtocol {
public void testRowCountWithInvalidRange1() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[2]);
@ -215,7 +213,6 @@ public class TestAggregateProtocol {
public void testRowCountWithInvalidRange2() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[5]);
@ -230,26 +227,6 @@ public class TestAggregateProtocol {
assertEquals(0, rowCount);
}
/**
* This should return a 0
*/
@Test (timeout=300000)
public void testRowCountWithNullCF() {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.setStartRow(ROWS[5]);
scan.setStopRow(ROWS[15]);
final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
new LongColumnInterpreter();
long rowCount = -1;
try {
rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
} catch (Throwable e) {
rowCount = 0;
}
assertEquals(0, rowCount);
}
@Test (timeout=300000)
public void testRowCountWithNullCQ() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);