diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index d2363425fee..cde7d41d9c9 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; @@ -58,8 +59,6 @@ import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.ByteString; import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; /** * This client class is for invoking the aggregate functions deployed on the @@ -82,59 +81,12 @@ import com.google.protobuf.RpcController; * *

Call {@link #close()} when done. */ -@InterfaceAudience.Public +@InterfaceAudience.Private public class AggregationClient implements Closeable { // TODO: This class is not used. Move to examples? private static final Log log = LogFactory.getLog(AggregationClient.class); private final Connection connection; - /** - * An RpcController implementation for use here in this endpoint. - */ - static class AggregationClientRpcController implements RpcController { - private String errorText; - private boolean cancelled = false; - private boolean failed = false; - - @Override - public String errorText() { - return this.errorText; - } - - @Override - public boolean failed() { - return this.failed; - } - - @Override - public boolean isCanceled() { - return this.cancelled; - } - - @Override - public void notifyOnCancel(RpcCallback arg0) { - throw new UnsupportedOperationException(); - } - - @Override - public void reset() { - this.errorText = null; - this.cancelled = false; - this.failed = false; - } - - @Override - public void setFailed(String errorText) { - this.failed = true; - this.errorText = errorText; - } - - @Override - public void startCancel() { - this.cancelled = true; - } - } - /** * Constructor with Conf object * @param cfg @@ -208,13 +160,13 @@ public class AggregationClient implements Closeable { new Batch.Call() { @Override public R call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getMax(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -296,13 +248,13 @@ public class AggregationClient implements Closeable { @Override public R call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getMin(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -371,13 +323,13 @@ public class AggregationClient implements Closeable { new Batch.Call() { @Override public Long call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getRowNum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); @@ -436,14 +388,14 @@ public class AggregationClient implements Closeable { new Batch.Call() { @Override public S call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); // Not sure what is going on here why I have to do these casts. TODO. CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getSum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } if (response.getFirstPartCount() == 0) { return null; @@ -504,13 +456,13 @@ public class AggregationClient implements Closeable { new Batch.Call>() { @Override public Pair call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getAvg(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } Pair pair = new Pair(null, 0L); if (response.getFirstPartCount() == 0) { @@ -608,13 +560,13 @@ public class AggregationClient implements Closeable { new Batch.Call, Long>>() { @Override public Pair, Long> call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getStd(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } Pair, Long> pair = new Pair, Long>(new ArrayList(), 0L); if (response.getFirstPartCount() == 0) { @@ -724,13 +676,13 @@ public class AggregationClient implements Closeable { new Batch.Call>() { @Override public List call(AggregateService instance) throws IOException { - RpcController controller = new AggregationClientRpcController(); + ServerRpcController controller = new ServerRpcController(); CoprocessorRpcUtils.BlockingRpcCallback rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback(); instance.getMedian(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failed()) { - throw new IOException(controller.errorText()); + if (controller.failedOnException()) { + throw controller.getFailedOn(); } List list = new ArrayList();