diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java new file mode 100644 index 00000000000..149e1d3bc0c --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientCoprocessorRpcController.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Client side rpc controller for coprocessor implementation. It is only used to pass error. + */ +@InterfaceAudience.Private +public class ClientCoprocessorRpcController implements RpcController { + + private Throwable error; + + @Override + public void reset() { + } + + @Override + public boolean failed() { + return error != null; + } + + @Override + public String errorText() { + return error != null ? error.getMessage() : null; + } + + @Override + public void startCancel() { + throw new UnsupportedOperationException(); + } + + @Override + public void setFailed(String reason) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCanceled() { + return false; + } + + @Override + public void notifyOnCancel(RpcCallback callback) { + throw new UnsupportedOperationException(); + } + + public void setFailed(Throwable error) { + this.error = error; + } + + public Throwable getFailed() { + return error; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java index 67099e86756..59924cfe6d3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -17,6 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -60,4 +68,185 @@ public interface RawAsyncTable extends AsyncTableBase { * @param consumer the consumer used to receive results. */ void scan(Scan scan, RawScanResultConsumer consumer); + + /** + * Delegate to a protobuf rpc call. + *

+   * Usually, it is just a simple lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * (stub, controller, rpcCallback) -> {
+   *   XXXRequest request = ...; // prepare the request
+   *   stub.xxx(controller, request, rpcCallback);
+   * }
+   * </code>
+   * </pre>
+   *
+   * And if you can prepare the {@code request} before calling the coprocessorService method, the
+   * lambda expression will be:
+   *
+   * <pre>
+   * <code>
+   * (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback)
+   * </code>
+   * </pre>
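+   *
+   * For example, to delegate to the {@code getMax} call of the {@code AggregateService} shipped
+   * in hbase-endpoint (assuming an {@code AggregateRequest req} has been built beforehand), the
+   * lambda expression would be:
+   *
+   * <pre>
+   * <code>
+   * (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback)
+   * </code>
+   * </pre>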
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  @FunctionalInterface
+  interface CoprocessorCallable<S, R> {
+
+    /**
+     * Represent the actual protobuf rpc call.
+     * @param stub the asynchronous stub
+     * @param controller the rpc controller, has already been prepared for you
+     * @param rpcCallback the rpc callback, has already been prepared for you
+     */
+    void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
+  }
+
+  /**
+   * Execute the given coprocessor call on the region which contains the given {@code row}.
+   * <p>
+   * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
+   * one line lambda expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
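+   *
+   * A typical invocation then looks like the sketch below ({@code AggregateService} and a
+   * pre-built {@code AggregateRequest req} are used here only for illustration):
+   *
+   * <pre>
+   * <code>
+   * CompletableFuture&lt;AggregateResponse&gt; future = table.coprocessorService(
+   *   channel -> AggregateService.newStub(channel),
+   *   (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback),
+   *   row);
+   * </code>
+   * </pre>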
+   *
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the comment of
+   *          {@link CoprocessorCallable} for more details.
+   * @param row The row key used to identify the remote region location
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
+   * @see CoprocessorCallable
+   */
+  <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row);
+
+  /**
+   * The callback when we want to execute a coprocessor call on a range of regions.
+   * <p>
+   * As the locating itself also takes some time, the implementation may want to send rpc calls
+   * on the fly, which means we do not know how many regions there are when we get the return
+   * values of the rpc calls. So we need an {@link #onComplete()} method to tell you that we have
+   * passed all the return values to you (through the
+   * {@link #onRegionComplete(HRegionInfo, Object)} or
+   * {@link #onRegionError(HRegionInfo, Throwable)} calls), i.e., there will be no more
+   * {@link #onRegionComplete(HRegionInfo, Object)} or
+   * {@link #onRegionError(HRegionInfo, Throwable)} calls in the future.
+   * <p>
+   * Here is some pseudo code that describes a typical implementation of a range coprocessor
+   * service method, to help you better understand how the {@link CoprocessorCallback} will be
+   * called. The {@code callback} in the pseudo code is our {@link CoprocessorCallback}, and note
+   * that {@code whenComplete} is {@code CompletableFuture.whenComplete}.
+   *
+   * <pre>
+   * locateThenCall(byte[] row) {
+   *   locate(row).whenComplete((location, locateError) -> {
+   *     if (locateError != null) {
+   *       callback.onError(locateError);
+   *       return;
+   *     }
+   *     incPendingCall();
+   *     region = location.getRegion();
+   *     if (region.getEndKey() > endKey) {
+   *       locateEnd = true;
+   *     } else {
+   *       locateThenCall(region.getEndKey());
+   *     }
+   *     sendCall().whenComplete((resp, error) -> {
+   *       if (error != null) {
+   *         callback.onRegionError(region, error);
+   *       } else {
+   *         callback.onRegionComplete(region, resp);
+   *       }
+   *       if (locateEnd && decPendingCallAndGet() == 0) {
+   *         callback.onComplete();
+   *       }
+   *     });
+   *   });
+   * }
+   * </pre>
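+   *
+   * And a minimal {@link CoprocessorCallback} that simply collects every per-region response
+   * could look like the sketch below ({@code future} is assumed to be a surrounding
+   * {@code CompletableFuture} that delivers the collected results):
+   *
+   * <pre>
+   * <code>
+   * new CoprocessorCallback&lt;AggregateResponse&gt;() {
+   *   private final List&lt;AggregateResponse&gt; resps = new ArrayList&lt;&gt;();
+   *
+   *   public synchronized void onRegionComplete(HRegionInfo region, AggregateResponse resp) {
+   *     resps.add(resp);
+   *   }
+   *
+   *   public synchronized void onRegionError(HRegionInfo region, Throwable error) {
+   *     future.completeExceptionally(error);
+   *   }
+   *
+   *   public void onError(Throwable error) {
+   *     future.completeExceptionally(error);
+   *   }
+   *
+   *   public synchronized void onComplete() {
+   *     future.complete(resps);
+   *   }
+   * }
+   * </code>
+   * </pre>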
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Unstable
+  interface CoprocessorCallback<R> {
+
+    /**
+     * @param region the region that the response belongs to
+     * @param resp the response of the coprocessor call
+     */
+    void onRegionComplete(HRegionInfo region, R resp);
+
+    /**
+     * @param region the region that the error belongs to
+     * @param error the response error of the coprocessor call
+     */
+    void onRegionError(HRegionInfo region, Throwable error);
+
+    /**
+     * Indicate that all responses of the regions have been notified by calling
+     * {@link #onRegionComplete(HRegionInfo, Object)} or
+     * {@link #onRegionError(HRegionInfo, Throwable)}.
+     */
+    void onComplete();
+
+    /**
+     * Indicate that we got an error which does not belong to any region. Usually a locating error.
+     */
+    void onError(Throwable error);
+  }
+
+  /**
+   * Execute the given coprocessor call on the regions which are covered by the range from
+   * {@code startKey} inclusive and {@code endKey} exclusive. See the comment of
+   * {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)}
+   * for more details.
+   * @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean,
+   *      CoprocessorCallback)
+   */
+  default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
+      CoprocessorCallback<R> callback) {
+    coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback);
+  }
+
+  /**
+   * Execute the given coprocessor call on the regions which are covered by the range from
+   * {@code startKey} to {@code endKey}. Whether the boundaries are inclusive is specified by
+   * {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a
+   * delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda
+   * expression, like:
+   *
+   * <pre>
+   * <code>
+   * channel -> xxxService.newStub(channel)
+   * </code>
+   * </pre>
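+   *
+   * For example, this is roughly how {@code AsyncAggregationClient} in hbase-endpoint drives the
+   * {@code AggregateService} over a key range ({@code req} and {@code callback} are assumed to
+   * have been prepared already):
+   *
+   * <pre>
+   * <code>
+   * table.coprocessorService(channel -> AggregateService.newStub(channel),
+   *   (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback),
+   *   scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
+   *   callback);
+   * </code>
+   * </pre>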
+   *
+   * @param stubMaker a delegation to the actual {@code newStub} call.
+   * @param callable a delegation to the actual protobuf rpc call. See the comment of
+   *          {@link CoprocessorCallable} for more details.
+   * @param startKey start region selection with region containing this row. If {@code null}, the
+   *          selection will start with the first table region.
+   * @param startKeyInclusive whether to include the startKey
+   * @param endKey select regions up to and including the region containing this row. If
+   *          {@code null}, selection will continue through the last table region.
+   * @param endKeyInclusive whether to include the endKey
+   * @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
+   *          for more details.
+   * @param <S> the type of the asynchronous stub
+   * @param <R> the type of the return value
+   * @see CoprocessorCallable
+   * @see CoprocessorCallback
+   */
+  <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive,
+      byte[] endKey, boolean endKeyInclusive, CoprocessorCallback<R> callback);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 87323ac278b..00f255e6cbf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -18,17 +18,26 @@ package org.apache.hadoop.hbase.client;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
+import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+
+import com.google.protobuf.RpcChannel;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -436,4 +445,90 @@ class RawAsyncTableImpl implements RawAsyncTable {
     return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
   }
 
+  private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, HRegionInfo region, byte[] row) {
+    RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
+        region, row, rpcTimeoutNs, operationTimeoutNs);
+    S stub = stubMaker.apply(channel);
+    CompletableFuture<R> future = new CompletableFuture<>();
+    ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
+    callable.call(stub, controller, resp -> {
+      if (controller.failed()) {
+        future.completeExceptionally(controller.getFailed());
+      } else {
+        future.complete(resp);
+      }
+    });
+    return future;
+  }
+
+  @Override
+  public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] row) {
+    return coprocessorService(stubMaker, callable, null, row);
+  }
+
+  private boolean locateFinished(HRegionInfo region, byte[] endKey, boolean endKeyInclusive) {
+    if (isEmptyStopRow(endKey)) {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return true;
+      }
+      return false;
+    } else {
+      if (isEmptyStopRow(region.getEndKey())) {
+        return true;
+      }
+      int c = Bytes.compareTo(endKey, region.getEndKey());
+      // 1. if the region contains endKey
+      // 2. endKey is equal to the region's endKey and we do not want to include endKey.
+      return c < 0 || (c == 0 && !endKeyInclusive);
+    }
+  }
+
+  private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
+      List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
+      AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
+      Throwable error) {
+    if (error != null) {
+      callback.onError(error);
+      return;
+    }
+    unfinishedRequest.incrementAndGet();
+    HRegionInfo region = loc.getRegionInfo();
+    if (locateFinished(region, endKey, endKeyInclusive)) {
+      locateFinished.set(true);
+    } else {
+      conn.getLocator()
+          .getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
+            operationTimeoutNs)
+          .whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
+            endKeyInclusive, locateFinished, unfinishedRequest, l, e));
+    }
+    coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
+      if (e != null) {
+        callback.onRegionError(region, e);
+      } else {
+        callback.onRegionComplete(region, r);
+      }
+      if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+        callback.onComplete();
+      }
+    });
+  }
+
+  @Override
+  public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
+      CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive,
+      byte[] endKey, boolean endKeyInclusive, CoprocessorCallback<R> callback) {
+    byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
+    byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
+    List<HRegionLocation> locs = new ArrayList<>();
+    conn.getLocator()
+        .getRegionLocation(tableName, nonNullStartKey,
+          startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER,
+          operationTimeoutNs)
+        .whenComplete(
+          (loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
+            endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
+  }
 }
\ No newline at end of file
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
new file mode 100644
index 00000000000..28a55649d00
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcChannel; +import com.google.protobuf.RpcController; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * The implementation of a region based coprocessor rpc channel. + */ +@InterfaceAudience.Private +class RegionCoprocessorRpcChannelImpl implements RpcChannel { + + private final AsyncConnectionImpl conn; + + private final TableName tableName; + + private final HRegionInfo region; + + private final byte[] row; + + private final long rpcTimeoutNs; + + private final long operationTimeoutNs; + + RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, HRegionInfo region, + byte[] row, long rpcTimeoutNs, long operationTimeoutNs) { + this.conn = conn; + this.tableName = tableName; + this.region = region; + this.row = row; + this.rpcTimeoutNs = rpcTimeoutNs; + this.operationTimeoutNs = operationTimeoutNs; + } + + private CompletableFuture rpcCall(MethodDescriptor method, Message request, + Message responsePrototype, HBaseRpcController controller, HRegionLocation loc, + ClientService.Interface stub) { + CompletableFuture future = new CompletableFuture<>(); + if (region != null + && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) { + future.completeExceptionally(new DoNotRetryIOException( + "Region name is changed, expected " + region.getRegionNameAsString() + ", actual " + + loc.getRegionInfo().getRegionNameAsString())); + return future; + } + CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method, + request, row, loc.getRegionInfo().getRegionName()); + stub.execService(controller, csr, + new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback() { + + @Override + public void run(CoprocessorServiceResponse resp) { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + try { + future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype)); + } catch (IOException e) { + future.completeExceptionally(e); + } + } + } + }); + return future; + } + + @Override + public void callMethod(MethodDescriptor method, RpcController controller, Message request, + Message responsePrototype, RpcCallback done) { + conn.callerFactory. 
single().table(tableName).row(row) + .locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) + .action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call() + .whenComplete((r, e) -> { + if (e != null) { + ((ClientCoprocessorRpcController) controller).setFailed(e); + } + done.run(r); + }); + } + +} diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index 1eda730fcfb..304722ec39f 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -19,12 +19,16 @@ package org.apache.hadoop.hbase.client.coprocessor; +import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; +import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + import java.io.Closeable; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -49,18 +53,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import com.google.protobuf.ByteString; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - /** * This client class is for invoking the aggregate functions deployed on the * Region Server side via the AggregateService. 
This class will implement the @@ -227,23 +225,7 @@ public class AggregationClient implements Closeable { return aMaxCallBack.getMax(); } - /* - * @param scan - * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan - */ - private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException { - if (scan == null - || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals( - scan.getStartRow(), HConstants.EMPTY_START_ROW)) - || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals( - scan.getStopRow(), HConstants.EMPTY_END_ROW))) { - throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow"); - } else if (!canFamilyBeAbsent) { - if (scan.getFamilyMap().size() != 1) { - throw new IOException("There must be only one family."); - } - } - } + /** * It gives the minimum value of a column for a given column family for the @@ -846,22 +828,6 @@ public class AggregationClient implements Closeable { return null; } - AggregateRequest - validateArgAndGetPB(Scan scan, ColumnInterpreter ci, boolean canFamilyBeAbsent) - throws IOException { - validateParameters(scan, canFamilyBeAbsent); - final AggregateRequest.Builder requestBuilder = - AggregateRequest.newBuilder(); - requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName()); - P columnInterpreterSpecificData = null; - if ((columnInterpreterSpecificData = ci.getRequestData()) - != null) { - requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString()); - } - requestBuilder.setScan(ProtobufUtil.toScan(scan)); - return requestBuilder.build(); - } - byte[] getBytesFromResponse(ByteString response) { ByteBuffer bb = response.asReadOnlyByteBuffer(); bb.rewind(); @@ -873,40 +839,4 @@ public class AggregationClient implements Closeable { } return bytes; } - - /** - * Get an instance of the argument type declared in a class's signature. The - * argument type is assumed to be a PB Message subclass, and the instance is - * created using parseFrom method on the passed ByteString. - * @param runtimeClass the runtime type of the class - * @param position the position of the argument in the class declaration - * @param b the ByteString which should be parsed to get the instance created - * @return the instance - * @throws IOException - */ - @SuppressWarnings("unchecked") - // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO. 
- public static - T getParsedGenericInstance(Class runtimeClass, int position, ByteString b) - throws IOException { - Type type = runtimeClass.getGenericSuperclass(); - Type argType = ((ParameterizedType)type).getActualTypeArguments()[position]; - Class classType = (Class)argType; - T inst; - try { - Method m = classType.getMethod("parseFrom", ByteString.class); - inst = (T)m.invoke(null, b); - return inst; - } catch (SecurityException e) { - throw new IOException(e); - } catch (NoSuchMethodException e) { - throw new IOException(e); - } catch (IllegalArgumentException e) { - throw new IOException(e); - } catch (InvocationTargetException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { - throw new IOException(e); - } - } } \ No newline at end of file diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java new file mode 100644 index 00000000000..b91128c8e85 --- /dev/null +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationHelper.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.coprocessor; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Helper class for constructing aggregation request and response. 
+ */ +@InterfaceAudience.Private +public class AggregationHelper { + + /** + * @param scan + * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan + */ + private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException { + if (scan == null + || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) + && !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW)) + || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) + && !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) { + throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow"); + } else if (!canFamilyBeAbsent) { + if (scan.getFamilyMap().size() != 1) { + throw new IOException("There must be only one family."); + } + } + } + + static AggregateRequest + validateArgAndGetPB(Scan scan, ColumnInterpreter ci, boolean canFamilyBeAbsent) + throws IOException { + validateParameters(scan, canFamilyBeAbsent); + final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder(); + requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName()); + P columnInterpreterSpecificData = null; + if ((columnInterpreterSpecificData = ci.getRequestData()) != null) { + requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString()); + } + requestBuilder.setScan(ProtobufUtil.toScan(scan)); + return requestBuilder.build(); + } + + /** + * Get an instance of the argument type declared in a class's signature. The argument type is + * assumed to be a PB Message subclass, and the instance is created using parseFrom method on the + * passed ByteString. + * @param runtimeClass the runtime type of the class + * @param position the position of the argument in the class declaration + * @param b the ByteString which should be parsed to get the instance created + * @return the instance + * @throws IOException + */ + @SuppressWarnings("unchecked") + // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO. + public static T getParsedGenericInstance(Class runtimeClass, int position, + ByteString b) throws IOException { + Type type = runtimeClass.getGenericSuperclass(); + Type argType = ((ParameterizedType) type).getActualTypeArguments()[position]; + Class classType = (Class) argType; + T inst; + try { + Method m = classType.getMethod("parseFrom", ByteString.class); + inst = (T) m.invoke(null, b); + return inst; + } catch (SecurityException e) { + throw new IOException(e); + } catch (NoSuchMethodException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } catch (InvocationTargetException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } +} diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java new file mode 100644 index 00000000000..f8d0a1982e5 --- /dev/null +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java @@ -0,0 +1,464 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.coprocessor; + +import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; +import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB; + +import com.google.protobuf.Message; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.RawAsyncTable; +import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback; +import org.apache.hadoop.hbase.client.RawScanResultConsumer; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * This client class is for invoking the aggregate functions deployed on the Region Server side via + * the AggregateService. This class will implement the supporting functionality for + * summing/processing the individual results obtained from the AggregateService for each region. 
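+ * <p>
+ * For example, a row count over a table is a one-liner (a sketch; {@code table} is a
+ * {@code RawAsyncTable}, {@code scan} selects the column to aggregate, and the
+ * {@code AggregateImplementation} coprocessor is assumed to be loaded on the region servers):
+ *
+ * <pre>
+ * <code>
+ * CompletableFuture&lt;Long&gt; rowCount =
+ *     AsyncAggregationClient.rowCount(table, new LongColumnInterpreter(), scan);
+ * </code>
+ * </pre>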
+ */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class AsyncAggregationClient { + + private static abstract class AbstractAggregationCallback + implements CoprocessorCallback { + + private final CompletableFuture future; + + protected boolean finished = false; + + private void completeExceptionally(Throwable error) { + if (finished) { + return; + } + finished = true; + future.completeExceptionally(error); + } + + protected AbstractAggregationCallback(CompletableFuture future) { + this.future = future; + } + + @Override + public synchronized void onRegionError(HRegionInfo region, Throwable error) { + completeExceptionally(error); + } + + @Override + public synchronized void onError(Throwable error) { + completeExceptionally(error); + } + + protected abstract void aggregate(HRegionInfo region, AggregateResponse resp) + throws IOException; + + @Override + public synchronized void onRegionComplete(HRegionInfo region, AggregateResponse resp) { + try { + aggregate(region, resp); + } catch (IOException e) { + completeExceptionally(e); + } + } + + protected abstract T getFinalResult(); + + @Override + public synchronized void onComplete() { + if (finished) { + return; + } + finished = true; + future.complete(getFinalResult()); + } + } + + private static R + getCellValueFromProto(ColumnInterpreter ci, AggregateResponse resp, + int firstPartIndex) throws IOException { + Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex)); + return ci.getCellValueFromProto(q); + } + + private static S + getPromotedValueFromProto(ColumnInterpreter ci, AggregateResponse resp, + int firstPartIndex) throws IOException { + T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex)); + return ci.getPromotedValueFromProto(t); + } + + public static CompletableFuture + max(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new CompletableFuture<>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, false); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { + + private R max; + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + R result = getCellValueFromProto(ci, resp, 0); + if (max == null || (result != null && ci.compare(max, result) < 0)) { + max = result; + } + } + } + + @Override + protected R getFinalResult() { + return max; + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + public static CompletableFuture + min(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new CompletableFuture<>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, false); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { + + private R min; + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + R result = getCellValueFromProto(ci, resp, 0); + if (min == null || (result != null && ci.compare(min, result) > 0)) { + min = 
result; + } + } + } + + @Override + protected R getFinalResult() { + return min; + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + public static + CompletableFuture + rowCount(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new CompletableFuture<>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, true); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { + + private long count; + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong(); + } + + @Override + protected Long getFinalResult() { + return count; + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + public static CompletableFuture + sum(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new CompletableFuture<>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, false); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { + + private S sum; + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + S s = getPromotedValueFromProto(ci, resp, 0); + sum = ci.add(sum, s); + } + } + + @Override + protected S getFinalResult() { + return sum; + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + public static + CompletableFuture + avg(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new CompletableFuture<>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, false); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { + + private S sum; + + long count = 0L; + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); + count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); + } + } + + @Override + protected Double getFinalResult() { + return ci.divideForAvg(sum, count); + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + public static + CompletableFuture + std(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new 
CompletableFuture<>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, false); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + AbstractAggregationCallback callback = new AbstractAggregationCallback(future) { + + private S sum; + + private S sumSq; + + private long count; + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0)); + sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1)); + count += resp.getSecondPart().asReadOnlyByteBuffer().getLong(); + } + } + + @Override + protected Double getFinalResult() { + double avg = ci.divideForAvg(sum, count); + double avgSq = ci.divideForAvg(sumSq, count); + return Math.sqrt(avgSq - avg * avg); + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + // the map key is the startRow of the region + private static + CompletableFuture> + sumByRegion(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture> future = + new CompletableFuture>(); + AggregateRequest req; + try { + req = validateArgAndGetPB(scan, ci, false); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() - 1; + AbstractAggregationCallback> callback = + new AbstractAggregationCallback>(future) { + + private final NavigableMap map = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + @Override + protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException { + if (resp.getFirstPartCount() > 0) { + map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex)); + } + } + + @Override + protected NavigableMap getFinalResult() { + return map; + } + }; + table.coprocessorService(channel -> AggregateService.newStub(channel), + (stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), + scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), + callback); + return future; + } + + private static void findMedian( + CompletableFuture future, RawAsyncTable table, ColumnInterpreter ci, + Scan scan, NavigableMap sumByRegion) { + double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L); + S movingSum = null; + byte[] startRow = null; + for (Map.Entry entry : sumByRegion.entrySet()) { + startRow = entry.getKey(); + S newMovingSum = ci.add(movingSum, entry.getValue()); + if (ci.divideForAvg(newMovingSum, 1L) > halfSum) { + break; + } + movingSum = newMovingSum; + } + if (startRow != null) { + scan.withStartRow(startRow); + } + // we can not pass movingSum directly to an anonymous class as it is not final. 
+ S baseSum = movingSum; + byte[] family = scan.getFamilies()[0]; + NavigableSet qualifiers = scan.getFamilyMap().get(family); + byte[] weightQualifier = qualifiers.last(); + byte[] valueQualifier = qualifiers.first(); + table.scan(scan, new RawScanResultConsumer() { + + private S sum = baseSum; + + private R value = null; + + @Override + public boolean onNext(Result[] results) { + try { + for (Result result : results) { + Cell weightCell = result.getColumnLatestCell(family, weightQualifier); + R weight = ci.getValue(family, weightQualifier, weightCell); + sum = ci.add(sum, ci.castToReturnType(weight)); + if (ci.divideForAvg(sum, 1L) > halfSum) { + if (value != null) { + future.complete(value); + } else { + future.completeExceptionally(new NoSuchElementException()); + } + return false; + } + Cell valueCell = result.getColumnLatestCell(family, valueQualifier); + value = ci.getValue(family, valueQualifier, valueCell); + } + return true; + } catch (IOException e) { + future.completeExceptionally(e); + return false; + } + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + if (!future.isDone()) { + // we should not reach here as the future should be completed in onNext. + future.completeExceptionally(new NoSuchElementException()); + } + } + }); + } + + public static CompletableFuture + median(RawAsyncTable table, ColumnInterpreter ci, Scan scan) { + CompletableFuture future = new CompletableFuture<>(); + sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else if (sumByRegion.isEmpty()) { + future.completeExceptionally(new NoSuchElementException()); + } else { + findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan), + sumByRegion); + } + }); + return future; + } +} diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java index 08b05628495..bccb76a7201 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java @@ -18,6 +18,14 @@ */ package org.apache.hadoop.hbase.coprocessor; +import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -31,7 +39,6 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -40,12 +47,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import com.google.protobuf.ByteString; -import 
com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * A concrete AggregateProtocol implementation. Its system level coprocessor * that computes the aggregate function at a region level. @@ -485,7 +486,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { ColumnInterpreter ci = (ColumnInterpreter) cls.newInstance(); if (request.hasInterpreterSpecificBytes()) { ByteString b = request.getInterpreterSpecificBytes(); - P initMsg = AggregationClient.getParsedGenericInstance(ci.getClass(), 2, b); + P initMsg = getParsedGenericInstance(ci.getClass(), 2, b); ci.initialize(initMsg); } return ci; diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java new file mode 100644 index 00000000000..1274dd5205b --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAggregationClient.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient; +import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter; +import org.apache.hadoop.hbase.coprocessor.AggregateImplementation; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, CoprocessorTests.class }) +public class TestAsyncAggregationClient { + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("TestAsyncAggregationClient"); + + private static byte[] CF = Bytes.toBytes("CF"); + + private static byte[] CQ = Bytes.toBytes("CQ"); + + private static byte[] CQ2 = Bytes.toBytes("CQ2"); + + private static int COUNT = 1000; + + private static AsyncConnection CONN; + + private static RawAsyncTable TABLE; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + AggregateImplementation.class.getName()); + UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + UTIL.createTable(TABLE_NAME, CF, splitKeys); + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()); + TABLE = CONN.getRawTable(TABLE_NAME); + TABLE.putAll(LongStream.range(0, COUNT) + .mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l))) + .addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l))) + .collect(Collectors.toList())).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + UTIL.shutdownMiniCluster(); + } + + @Test + public void testMax() throws InterruptedException, ExecutionException { + assertEquals(COUNT - 1, AsyncAggregationClient + .max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue()); + } + + @Test + public void testMin() throws InterruptedException, ExecutionException { + assertEquals(0, AsyncAggregationClient + .min(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue()); + } + + @Test + public void testRowCount() throws InterruptedException, ExecutionException { + assertEquals(COUNT, + AsyncAggregationClient + .rowCount(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get() + .longValue()); + } + + @Test + public void testSum() throws InterruptedException, ExecutionException { + assertEquals(COUNT * (COUNT - 1) / 2, AsyncAggregationClient + .sum(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue()); + } + + private static final double DELTA = 1E-3; + + @Test + public void testAvg() throws InterruptedException, ExecutionException { + assertEquals((COUNT - 1) / 2.0, AsyncAggregationClient + .avg(TABLE, new 
LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(), + DELTA); + } + + @Test + public void testStd() throws InterruptedException, ExecutionException { + double avgSq = + LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() + / (double) COUNT; + double avg = (COUNT - 1) / 2.0; + double std = Math.sqrt(avgSq - avg * avg); + assertEquals(std, AsyncAggregationClient + .std(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(), + DELTA); + } + + @Test + public void testMedian() throws InterruptedException, ExecutionException { + long halfSum = COUNT * (COUNT - 1) / 4; + long median = 0L; + long sum = 0L; + for (int i = 0; i < COUNT; i++) { + sum += i; + if (sum > halfSum) { + median = i - 1; + break; + } + } + assertEquals(median, + AsyncAggregationClient + .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get() + .longValue()); + } + + @Test + public void testMedianWithWeight() throws InterruptedException, ExecutionException { + long halfSum = + LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2; + long median = 0L; + long sum = 0L; + for (int i = 0; i < COUNT; i++) { + sum += i * i; + if (sum > halfSum) { + median = i - 1; + break; + } + } + assertEquals(median, AsyncAggregationClient + .median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ).addColumn(CF, CQ2)) + .get().longValue()); + } +}
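
Putting the pieces together, end-to-end usage of the new async aggregation API could look like the
sketch below. This example is not part of the patch: the table and column names are placeholders,
and it assumes AggregateImplementation has been configured on the region servers, as in
TestAsyncAggregationClient above.

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncAggregationExample {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The AggregateImplementation coprocessor must be loaded on the region servers
    // (hbase.coprocessor.region.classes), as done in TestAsyncAggregationClient.
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf)) {
      RawAsyncTable table = conn.getRawTable(TableName.valueOf("example"));
      Scan scan = new Scan().addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CQ"));
      // Each call fans out to all regions covered by the scan range and merges
      // the per-region results on the client side.
      CompletableFuture<Long> rowCount =
          AsyncAggregationClient.rowCount(table, new LongColumnInterpreter(), scan);
      CompletableFuture<Long> max =
          AsyncAggregationClient.max(table, new LongColumnInterpreter(), scan);
      System.out.println("rows=" + rowCount.get() + ", max=" + max.get());
    }
  }
}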