HBASE-17346 AggregationClient cleanup

This commit is contained in:
Michael Stack 2017-01-01 16:01:10 -08:00
parent 521730ebc7
commit 0a93241b61
1 changed files with 71 additions and 23 deletions

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
@ -59,6 +58,8 @@ import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
/** /**
* This client class is for invoking the aggregate functions deployed on the * This client class is for invoking the aggregate functions deployed on the
@ -81,12 +82,59 @@ import com.google.protobuf.Message;
* </ul> * </ul>
* <p>Call {@link #close()} when done. * <p>Call {@link #close()} when done.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Public
public class AggregationClient implements Closeable { public class AggregationClient implements Closeable {
// TODO: This class is not used. Move to examples? // TODO: This class is not used. Move to examples?
private static final Log log = LogFactory.getLog(AggregationClient.class); private static final Log log = LogFactory.getLog(AggregationClient.class);
private final Connection connection; private final Connection connection;
/**
* An RpcController implementation for use here in this endpoint.
*/
static class AggregationClientRpcController implements RpcController {
private String errorText;
private boolean cancelled = false;
private boolean failed = false;
@Override
public String errorText() {
return this.errorText;
}
@Override
public boolean failed() {
return this.failed;
}
@Override
public boolean isCanceled() {
return this.cancelled;
}
@Override
public void notifyOnCancel(RpcCallback<Object> arg0) {
throw new UnsupportedOperationException();
}
@Override
public void reset() {
this.errorText = null;
this.cancelled = false;
this.failed = false;
}
@Override
public void setFailed(String errorText) {
this.failed = true;
this.errorText = errorText;
}
@Override
public void startCancel() {
this.cancelled = true;
}
}
/** /**
* Constructor with Conf object * Constructor with Conf object
* @param cfg * @param cfg
@ -160,13 +208,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, R>() { new Batch.Call<AggregateService, R>() {
@Override @Override
public R call(AggregateService instance) throws IOException { public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getMax(controller, requestArg, rpcCallback); instance.getMax(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
if (response.getFirstPartCount() > 0) { if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0); ByteString b = response.getFirstPart(0);
@ -248,13 +296,13 @@ public class AggregationClient implements Closeable {
@Override @Override
public R call(AggregateService instance) throws IOException { public R call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getMin(controller, requestArg, rpcCallback); instance.getMin(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
if (response.getFirstPartCount() > 0) { if (response.getFirstPartCount() > 0) {
ByteString b = response.getFirstPart(0); ByteString b = response.getFirstPart(0);
@ -323,13 +371,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, Long>() { new Batch.Call<AggregateService, Long>() {
@Override @Override
public Long call(AggregateService instance) throws IOException { public Long call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getRowNum(controller, requestArg, rpcCallback); instance.getRowNum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); byte[] bytes = getBytesFromResponse(response.getFirstPart(0));
ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); ByteBuffer bb = ByteBuffer.allocate(8).put(bytes);
@ -388,14 +436,14 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, S>() { new Batch.Call<AggregateService, S>() {
@Override @Override
public S call(AggregateService instance) throws IOException { public S call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
// Not sure what is going on here why I have to do these casts. TODO. // Not sure what is going on here why I have to do these casts. TODO.
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getSum(controller, requestArg, rpcCallback); instance.getSum(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
if (response.getFirstPartCount() == 0) { if (response.getFirstPartCount() == 0) {
return null; return null;
@ -456,13 +504,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, Pair<S, Long>>() { new Batch.Call<AggregateService, Pair<S, Long>>() {
@Override @Override
public Pair<S, Long> call(AggregateService instance) throws IOException { public Pair<S, Long> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getAvg(controller, requestArg, rpcCallback); instance.getAvg(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
Pair<S, Long> pair = new Pair<S, Long>(null, 0L); Pair<S, Long> pair = new Pair<S, Long>(null, 0L);
if (response.getFirstPartCount() == 0) { if (response.getFirstPartCount() == 0) {
@ -560,13 +608,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, Pair<List<S>, Long>>() { new Batch.Call<AggregateService, Pair<List<S>, Long>>() {
@Override @Override
public Pair<List<S>, Long> call(AggregateService instance) throws IOException { public Pair<List<S>, Long> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getStd(controller, requestArg, rpcCallback); instance.getStd(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L); Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L);
if (response.getFirstPartCount() == 0) { if (response.getFirstPartCount() == 0) {
@ -676,13 +724,13 @@ public class AggregationClient implements Closeable {
new Batch.Call<AggregateService, List<S>>() { new Batch.Call<AggregateService, List<S>>() {
@Override @Override
public List<S> call(AggregateService instance) throws IOException { public List<S> call(AggregateService instance) throws IOException {
ServerRpcController controller = new ServerRpcController(); RpcController controller = new AggregationClientRpcController();
CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback =
new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>();
instance.getMedian(controller, requestArg, rpcCallback); instance.getMedian(controller, requestArg, rpcCallback);
AggregateResponse response = rpcCallback.get(); AggregateResponse response = rpcCallback.get();
if (controller.failedOnException()) { if (controller.failed()) {
throw controller.getFailedOn(); throw new IOException(controller.errorText());
} }
List<S> list = new ArrayList<S>(); List<S> list = new ArrayList<S>();