From 05ab41d1bea53295d2c0790fba71c441ff85a6a5 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Tue, 3 Jan 2017 19:17:17 -0800 Subject: [PATCH] HBASE-17397 AggregationClient cleanup; Reapplied with proper JIRA number (spotted by Duo Zhang) --- .../client/coprocessor/AggregationClient.java | 94 ++++++++++++++----- 1 file changed, 71 insertions(+), 23 deletions(-) diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index cde7d41d9c9..d2363425fee 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; @@ -59,6 +58,8 @@ import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.ByteString; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; /** * This client class is for invoking the aggregate functions deployed on the @@ -81,12 +82,59 @@ import com.google.protobuf.Message; * *

Call {@link #close()} when done. */ -@InterfaceAudience.Private +@InterfaceAudience.Public public class AggregationClient implements Closeable { // TODO: This class is not used. Move to examples? private static final Log log = LogFactory.getLog(AggregationClient.class); private final Connection connection; + /** + * An RpcController implementation for use here in this endpoint. + */ + static class AggregationClientRpcController implements RpcController { + private String errorText; + private boolean cancelled = false; + private boolean failed = false; + + @Override + public String errorText() { + return this.errorText; + } + + @Override + public boolean failed() { + return this.failed; + } + + @Override + public boolean isCanceled() { + return this.cancelled; + } + + @Override + public void notifyOnCancel(RpcCallback arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + this.errorText = null; + this.cancelled = false; + this.failed = false; + } + + @Override + public void setFailed(String errorText) { + this.failed = true; + this.errorText = errorText; + } + + @Override + public void startCancel() { + this.cancelled = true; + } + } + /** * Constructor with Conf object * @param cfg @@ -160,13 +208,13 @@ public class AggregationClient implements Closeable { new Batch.Call() { @Override public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getMax(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -248,13 +296,13 @@ public class AggregationClient implements Closeable { @Override public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getMin(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -323,13 +371,13 @@ public class AggregationClient implements Closeable { new Batch.Call() { @Override public Long call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getRowNum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); @@ -388,14 +436,14 @@ public class AggregationClient implements Closeable { new Batch.Call() { @Override public S call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); // Not sure what is going on here why I have to do these casts. TODO. CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getSum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } if (response.getFirstPartCount() == 0) { return null; @@ -456,13 +504,13 @@ public class AggregationClient implements Closeable { new Batch.Call>() { @Override public Pair call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getAvg(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } Pair pair = new Pair(null, 0L); if (response.getFirstPartCount() == 0) { @@ -560,13 +608,13 @@ public class AggregationClient implements Closeable { new Batch.Call, Long>>() { @Override public Pair, Long> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getStd(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } Pair, Long> pair = new Pair, Long>(new ArrayList(), 0L); if (response.getFirstPartCount() == 0) { @@ -676,13 +724,13 @@ public class AggregationClient implements Closeable { new Batch.Call>() { @Override public List call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getMedian(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } List list = new ArrayList();