HBASE-6785 Convert AggregateProtocol to protobuf defined coprocessor service (Devaraj Das)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1398175 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-10-15 02:32:09 +00:00
parent 89370eef92
commit d30fff7c92
7 changed files with 2488 additions and 318 deletions

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client.coprocessor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -39,16 +40,23 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
/**
* This client class is for invoking the aggregate functions deployed on the
* Region Server side via the AggregateProtocol. This class will implement the
* Region Server side via the AggregateService. This class will implement the
* supporting functionality for summing/processing the individual results
* obtained from the AggregateProtocol for each region.
* obtained from the AggregateService for each region.
* <p>
* This will serve as the client side handler for invoking the aggregate
* functions.
@ -92,7 +100,7 @@ public class AggregationClient {
*/
public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class MaxCallBack implements Batch.Callback<R> {
R max = null;
@ -109,11 +117,24 @@ public class AggregationClient {
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateProtocol instance) throws IOException {
return instance.getMax(ci, scan);
public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMax(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
if (response.getFirstPartCount() > 0) {
return ci.castToCellType(
ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(0))));
}
return null;
}
}, aMaxCallBack);
} finally {
@ -149,7 +170,7 @@ public class AggregationClient {
*/
public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class MinCallBack implements Batch.Callback<R> {
private R min = null;
@ -167,12 +188,25 @@ public class AggregationClient {
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateProtocol, R>() {
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, R>() {
@Override
public R call(AggregateProtocol instance) throws IOException {
return instance.getMin(ci, scan);
public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMin(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
if (response.getFirstPartCount() > 0) {
return ci.castToCellType(
ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(0))));
}
return null;
}
}, minCallBack);
} finally {
@ -199,7 +233,7 @@ public class AggregationClient {
*/
public <R, S> long rowCount(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class RowNumCallback implements Batch.Callback<Long> {
private final AtomicLong rowCountL = new AtomicLong(0);
@ -216,11 +250,22 @@ public class AggregationClient {
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateProtocol, Long>() {
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, Long>() {
@Override
public Long call(AggregateProtocol instance) throws IOException {
return instance.getRowNum(ci, scan);
public Long call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getRowNum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
bb.rewind();
return bb.getLong();
}
}, rowNum);
} finally {
@ -242,7 +287,8 @@ public class AggregationClient {
*/
public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class SumCallBack implements Batch.Callback<S> {
S sumVal = null;
@ -259,11 +305,23 @@ public class AggregationClient {
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateProtocol, S>() {
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, S>() {
@Override
public S call(AggregateProtocol instance) throws IOException {
return instance.getSum(ci, scan);
public S call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getSum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
if (response.getFirstPartCount() == 0) {
return null;
}
return ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(0)));
}
}, sumCallBack);
} finally {
@ -284,7 +342,7 @@ public class AggregationClient {
*/
private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
S sum = null;
Long rowCount = 0l;
@ -303,13 +361,31 @@ public class AggregationClient {
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(),
new Batch.Call<AggregateProtocol, Pair<S, Long>>() {
new Batch.Call<AggregateService, Pair<S, Long>>() {
@Override
public Pair<S, Long> call(AggregateProtocol instance)
public Pair<S, Long> call(AggregateService instance)
throws IOException {
return instance.getAvg(ci, scan);
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getAvg(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<S,Long> pair = new Pair<S, Long>(null, 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
pair.setFirst(ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(0))));
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
return pair;
}
}, avgCallBack);
} finally {
@ -351,7 +427,7 @@ public class AggregationClient {
*/
private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
long rowCountVal = 0l;
S sumVal = null, sumSqVal = null;
@ -366,24 +442,48 @@ public class AggregationClient {
@Override
public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
sumVal = ci.add(sumVal, result.getFirst().get(0));
sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
rowCountVal += result.getSecond();
if (result.getFirst().size() > 0) {
sumVal = ci.add(sumVal, result.getFirst().get(0));
sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
rowCountVal += result.getSecond();
}
}
}
StdCallback stdCallback = new StdCallback();
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(),
new Batch.Call<AggregateProtocol, Pair<List<S>, Long>>() {
new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
@Override
public Pair<List<S>, Long> call(AggregateProtocol instance)
public Pair<List<S>, Long> call(AggregateService instance)
throws IOException {
return instance.getStd(ci, scan);
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getStd(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
Pair<List<S>,Long> pair =
new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
if (response.getFirstPartCount() == 0) {
return pair;
}
List<S> list = new ArrayList<S>();
for (int i = 0; i < response.getFirstPartCount(); i++) {
list.add(ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(i))));
}
pair.setFirst(list);
ByteBuffer bb = ByteBuffer.allocate(8).put(
getBytesFromResponse(response.getSecondPart()));
bb.rewind();
pair.setSecond(bb.getLong());
return pair;
}
}, stdCallback);
} finally {
if (table != null) {
@ -431,7 +531,7 @@ public class AggregationClient {
private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
getMedianArgs(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
final NavigableMap<byte[], List<S>> map =
new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
class StdCallback implements Batch.Callback<List<S>> {
@ -457,11 +557,25 @@ public class AggregationClient {
HTable table = null;
try {
table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateProtocol, List<S>>() {
table.coprocessorService(AggregateService.class, scan.getStartRow(),
scan.getStopRow(), new Batch.Call<AggregateService, List<S>>() {
@Override
public List<S> call(AggregateProtocol instance) throws IOException {
return instance.getMedian(ci, scan);
public List<S> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<AggregateResponse> rpcCallback =
new BlockingRpcCallback<AggregateResponse>();
instance.getMedian(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
List<S> list = new ArrayList<S>();
for (int i = 0; i < response.getFirstPartCount(); i++) {
list.add(ci.parseResponseAsPromotedType(
getBytesFromResponse(response.getFirstPart(i))));
}
return list;
}
}, stdCallback);
@ -557,4 +671,30 @@ public class AggregationClient {
}
return null;
}
<R,S>AggregateArgument validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S> ci)
throws IOException {
validateParameters(scan);
final AggregateArgument.Builder requestBuilder =
AggregateArgument.newBuilder();
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
if (ci.columnInterpreterSpecificData() != null) {
requestBuilder.setInterpreterSpecificBytes(
ci.columnInterpreterSpecificData());
}
requestBuilder.setScan(ProtobufUtil.toScan(scan));
return requestBuilder.build();
}
byte[] getBytesFromResponse(ByteString response) {
ByteBuffer bb = response.asReadOnlyByteBuffer();
bb.rewind();
byte[] bytes;
if (bb.hasArray()) {
bytes = bb.array();
} else {
bytes = response.toByteArray();
}
return bytes;
}
}

View File

@ -18,9 +18,8 @@
*/
package org.apache.hadoop.hbase.client.coprocessor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -28,6 +27,8 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ByteString;
/**
* a concrete column interpreter implementation. The cell value is a Long value
* and its promoted data type is also a Long value. For computing aggregation
@ -85,16 +86,6 @@ public class LongColumnInterpreter implements ColumnInterpreter<Long, Long> {
return Long.MIN_VALUE;
}
@Override
public void readFields(DataInput arg0) throws IOException {
// nothing to serialize
}
@Override
public void write(DataOutput arg0) throws IOException {
// nothing to serialize
}
@Override
public double divideForAvg(Long l1, Long l2) {
return (l2 == null || l1 == null) ? Double.NaN : (l1.doubleValue() / l2
@ -106,4 +97,45 @@ public class LongColumnInterpreter implements ColumnInterpreter<Long, Long> {
return o;
}
}
@Override
public Long parseResponseAsPromotedType(byte[] response) {
ByteBuffer b = ByteBuffer.allocate(8).put(response);
b.rewind();
long l = b.getLong();
return l;
}
@Override
public Long castToCellType(Long l) {
return l;
}
@Override
public ByteString columnInterpreterSpecificData() {
// nothing
return null;
}
@Override
public void initialize(ByteString bytes) {
// nothing
}
@Override
public ByteString getProtoForCellType(Long t) {
return getProtoForPromotedOrCellType(t);
}
@Override
public ByteString getProtoForPromotedType(Long s) {
return getProtoForPromotedOrCellType(s);
}
private ByteString getProtoForPromotedOrCellType(Long s) {
ByteBuffer bb = ByteBuffer.allocate(8).putLong(s);
bb.rewind();
ByteString bs = ByteString.copyFrom(bb);
return bs;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
@ -27,45 +28,59 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateArgument;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* A concrete AggregateProtocol implementation. Its system level coprocessor
* that computes the aggregate function at a region level.
* @param <T>
* @param <S>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AggregateImplementation extends BaseEndpointCoprocessor implements
AggregateProtocol {
public class AggregateImplementation<T, S> extends AggregateService implements
CoprocessorService, Coprocessor {
protected static Log log = LogFactory.getLog(AggregateImplementation.class);
private RegionCoprocessorEnvironment env;
/**
* Gives the maximum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, maximum value for the
* entire column family will be returned.
*/
@Override
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
if (AggregateProtocol.class.getName().equals(protocol)) {
return new ProtocolSignature(AggregateProtocol.VERSION, null);
}
throw new IOException("Unknown protocol: " + protocol);
}
@Override
public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
T temp;
public void getMax(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
T max = null;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
// qualifier can be null.
try {
ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
// qualifier can be null.
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
@ -75,26 +90,46 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
}
results.clear();
} while (hasMoreRows);
if (max != null) {
AggregateResponse.Builder builder = AggregateResponse.newBuilder();
builder.addFirstPart(ci.getProtoForCellType(max));
response = builder.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
log.info("Maximum from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + max);
return max;
+ env.getRegion().getRegionNameAsString() + ": " + max);
done.run(response);
}
/**
* Gives the minimum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, minimum value for the
* entire column family will be returned.
*/
@Override
public <T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
public void getMin(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
T min = null;
T temp;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
try {
ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
@ -104,27 +139,46 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
}
results.clear();
} while (hasMoreRows);
if (min != null) {
response = AggregateResponse.newBuilder().addFirstPart(
ci.getProtoForCellType(min)).build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
log.info("Minimum from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + min);
return min;
+ env.getRegion().getRegionNameAsString() + ": " + min);
done.run(response);
}
/**
* Gives the sum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, sum for the entire column
* family will be returned.
*/
@Override
public <T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
public void getSum(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
long sum = 0l;
S sumVal = null;
T temp;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
try {
ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null;
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
@ -135,27 +189,43 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
}
results.clear();
} while (hasMoreRows);
if (sumVal != null) {
response = AggregateResponse.newBuilder().addFirstPart(
ci.getProtoForPromotedType(sumVal)).build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
log.debug("Sum from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + sum);
return sumVal;
+ env.getRegion().getRegionNameAsString() + ": " + sum);
done.run(response);
}
/**
* Gives the row count for the given column family and column qualifier, in
* the given row range as defined in the Scan object.
* @throws IOException
*/
@Override
public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
public void getRowNum(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
long counter = 0l;
List<KeyValue> results = new ArrayList<KeyValue>();
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
if (scan.getFilter() == null && qualifier == null)
scan.setFilter(new FirstKeyOnlyFilter());
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
InternalScanner scanner = null;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
if (scan.getFilter() == null && qualifier == null)
scan.setFilter(new FirstKeyOnlyFilter());
scanner = env.getRegion().getScanner(scan);
boolean hasMoreRows = false;
do {
hasMoreRows = scanner.next(results);
@ -164,27 +234,53 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
}
results.clear();
} while (hasMoreRows);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
bb.rewind();
response = AggregateResponse.newBuilder().addFirstPart(
ByteString.copyFrom(bb)).build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
log.info("Row counter from this region is "
+ ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getRegionNameAsString() + ": " + counter);
return counter;
+ env.getRegion().getRegionNameAsString() + ": " + counter);
done.run(response);
}
/**
* Gives a Pair with first object as Sum and second object as row count,
* computed for a given combination of column qualifier and column family in
* the given row range as defined in the Scan object. In its current
* implementation, it takes one column family and one column qualifier (if
* provided). In case of null column qualifier, an aggregate sum over all the
* entire column family will be returned.
* <p>
* The average is computed in
* {@link AggregationClient#avg(byte[], ColumnInterpreter, Scan)} by
* processing results from all regions, so its "ok" to pass sum and a Long
* type.
*/
@Override
public <T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
S sumVal = null;
Long rowCountVal = 0l;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
public void getAvg(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
try {
ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null;
Long rowCountVal = 0l;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
do {
results.clear();
hasMoreRows = scanner.next(results);
@ -194,26 +290,53 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
}
rowCountVal++;
} while (hasMoreRows);
if (sumVal != null) {
ByteString first = ci.getProtoForPromotedType(sumVal);
AggregateResponse.Builder pair = AggregateResponse.newBuilder();
pair.addFirstPart(first);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
response = pair.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
Pair<S, Long> pair = new Pair<S, Long>(sumVal, rowCountVal);
return pair;
done.run(response);
}
/**
* Gives a Pair with first object a List containing Sum and sum of squares,
* and the second object as row count. It is computed for a given combination of
* column qualifier and column family in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* one column qualifier (if provided). The idea is get the value of variance first:
* the average of the squares less the square of the average a standard
* deviation is square root of variance.
*/
@Override
public <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0l;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
public void getStd(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
try {
ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0l;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
do {
tempVal = null;
hasMoreRows = scanner.next(results);
@ -226,32 +349,56 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
rowCountVal++;
} while (hasMoreRows);
if (sumVal != null) {
ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal);
AggregateResponse.Builder pair = AggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumSqVal);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
response = pair.build();
}
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
List<S> l = new ArrayList<S>();
l.add(sumVal);
l.add(sumSqVal);
Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal);
return p;
done.run(response);
}
/**
* Gives a List containing sum of values and sum of weights.
* It is computed for the combination of column
* family and column qualifier(s) in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* two column qualifiers. The first qualifier is for values column and
* the second qualifier (optional) is for weight column.
*/
@Override
public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
byte[] valQualifier = quals.pollFirst();
// if weighted median is requested, get qualifier for the weight column
byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
public void getMedian(RpcController controller, AggregateArgument request,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
try {
ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
Scan scan = ProtobufUtil.toScan(request.getScan());
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
byte[] valQualifier = quals.pollFirst();
// if weighted median is requested, get qualifier for the weight column
byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
do {
tempVal = null;
tempWeight = null;
@ -268,13 +415,73 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
sumVal = ci.add(sumVal, tempVal);
sumWeights = ci.add(sumWeights, tempWeight);
} while (hasMoreRows);
ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
ByteString first_sumWeights = ci.getProtoForPromotedType(s);
AggregateResponse.Builder pair = AggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumWeights);
response = pair.build();
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
} finally {
scanner.close();
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
List<S> l = new ArrayList<S>();
l.add(sumVal);
l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
return l;
done.run(response);
}
@SuppressWarnings("unchecked")
ColumnInterpreter<T,S> constructColumnInterpreterFromRequest(
AggregateArgument request) throws IOException {
String className = request.getInterpreterClassName();
Class<?> cls;
try {
cls = Class.forName(className);
ColumnInterpreter<T,S> ci = (ColumnInterpreter<T, S>) cls.newInstance();
if (request.hasInterpreterSpecificBytes()) {
ci.initialize(request.getInterpreterSpecificBytes());
}
return ci;
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
@Override
public Service getService() {
return this;
}
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
* coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
* on a table region, so always expects this to be an instance of
* {@link RegionCoprocessorEnvironment}.
* @param env the environment provided by the coprocessor host
* @throws IOException if the provided environment is not an instance of
* {@code RegionCoprocessorEnvironment}
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}
}

View File

@ -1,147 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Pair;
/**
* Defines the aggregation functions that are to be supported in this
* Coprocessor. For each method, it takes a Scan object and a columnInterpreter.
* The scan object should have a column family (else an exception will be
* thrown), and an optional column qualifier. In the current implementation
* {@link AggregateImplementation}, only one column family and column qualifier
* combination is served. In case there are more than one, only first one will
* be picked. Refer to {@link AggregationClient} for some general conditions on
* input parameters.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface AggregateProtocol extends CoprocessorProtocol {
public static final long VERSION = 1L;
/**
* Gives the maximum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, maximum value for the
* entire column family will be returned.
* @param ci
* @param scan
* @return max value as mentioned above
* @throws IOException
*/
<T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;
/**
* Gives the minimum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, minimum value for the
* entire column family will be returned.
* @param ci
* @param scan
* @return min as mentioned above
* @throws IOException
*/
<T, S> T getMin(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;
/**
* Gives the sum for a given combination of column qualifier and column
* family, in the given row range as defined in the Scan object. In its
* current implementation, it takes one column family and one column qualifier
* (if provided). In case of null column qualifier, sum for the entire column
* family will be returned.
* @param ci
* @param scan
* @return sum of values as defined by the column interpreter
* @throws IOException
*/
<T, S> S getSum(ColumnInterpreter<T, S> ci, Scan scan) throws IOException;
/**
* @param ci
* @param scan
* @return Row count for the given column family and column qualifier, in
* the given row range as defined in the Scan object.
* @throws IOException
*/
<T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
/**
* Gives a Pair with first object as Sum and second object as row count,
* computed for a given combination of column qualifier and column family in
* the given row range as defined in the Scan object. In its current
* implementation, it takes one column family and one column qualifier (if
* provided). In case of null column qualifier, an aggregate sum over all the
* entire column family will be returned.
* <p>
* The average is computed in
* {@link AggregationClient#avg(byte[], ColumnInterpreter, Scan)} by
* processing results from all regions, so its "ok" to pass sum and a Long
* type.
* @param ci
* @param scan
* @return Average
* @throws IOException
*/
<T, S> Pair<S, Long> getAvg(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
/**
* Gives a Pair with first object a List containing Sum and sum of squares,
* and the second object as row count. It is computed for a given combination of
* column qualifier and column family in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* one column qualifier (if provided). The idea is get the value of variance first:
* the average of the squares less the square of the average a standard
* deviation is square root of variance.
* @param ci
* @param scan
* @return STD
* @throws IOException
*/
<T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
/**
* Gives a List containing sum of values and sum of weights.
* It is computed for the combination of column
* family and column qualifier(s) in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* two column qualifiers. The first qualifier is for values column and
* the second qualifier (optional) is for weight column.
* @param ci
* @param scan
* @return Pair
* @throws IOException
*/
<T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
}

View File

@ -25,7 +25,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.io.Writable;
import com.google.protobuf.ByteString;
/**
* Defines how value for specific column is interpreted and provides utility
@ -48,7 +49,7 @@ import org.apache.hadoop.io.Writable;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ColumnInterpreter<T, S> extends Writable {
public interface ColumnInterpreter<T, S> {
/**
* @param colFamily
@ -114,4 +115,50 @@ public interface ColumnInterpreter<T, S> extends Writable {
* @return Average
*/
double divideForAvg(S o, Long l);
}
/**
* This method should return any additional data that is needed on the
* server side to construct the ColumnInterpreter. The server
* will pass this to the {@link #initialize(org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.ColumnInterpreter)}
* method. If there is no ColumnInterpreter specific data (for e.g.,
* {@link LongColumnInterpreter}) then null should be returned.
* @return the PB message
*/
ByteString columnInterpreterSpecificData();
/**
* Return the PB for type T
* @param t
* @return PB-message
*/
ByteString getProtoForCellType(T t);
/**
* Return the PB for type S
* @param s
* @return PB-message
*/
ByteString getProtoForPromotedType(S s);
/**
* This method should initialize any field(s) of the ColumnInterpreter with
* a parsing of the passed message bytes (used on the server side).
* @param bytes
*/
void initialize(ByteString bytes);
/**
* Converts the bytes in the server's response to the expected type S
* @param response
* @return response of type S constructed from the message
*/
S parseResponseAsPromotedType(byte[] response);
/**
* The response message comes as type S. This will convert/cast it to T.
* In some sense, performs the opposite of {@link #castToReturnType(Object)}
* @param response
* @return cast
*/
T castToCellType(S response);
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "AggregateProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "Client.proto";
message AggregateArgument {
/** The argument passed to the AggregateService consists of three parts
* (1) the (canonical) classname of the ColumnInterpreter implementation
* (2) the Scan query
* (3) any bytes required to construct the ColumnInterpreter object
* properly
*/
required string interpreterClassName = 1;
required Scan scan = 2;
optional bytes interpreterSpecificBytes = 3;
}
message AggregateResponse {
/**
* The AggregateService methods all have a response that either is a Pair
* or a simple object. When it is a Pair both firstPart and secondPart
* have defined values (and the secondPart is not present in the response
* when the response is not a pair). Refer to the AggregateImplementation
* class for an overview of the AggregateResponse object constructions.
*/
repeated bytes firstPart = 1;
optional bytes secondPart = 2;
}
/** Refer to the AggregateImplementation class for an overview of the
* AggregateService method implementations and their functionality.
*/
service AggregateService {
rpc getMax (AggregateArgument) returns (AggregateResponse);
rpc getMin (AggregateArgument) returns (AggregateResponse);
rpc getSum (AggregateArgument) returns (AggregateResponse);
rpc getRowNum (AggregateArgument) returns (AggregateResponse);
rpc getAvg (AggregateArgument) returns (AggregateResponse);
rpc getStd (AggregateArgument) returns (AggregateResponse);
rpc getMedian (AggregateArgument) returns (AggregateResponse);
}