HBASE-17346 Add coprocessor service support

This commit is contained in:
zhangduo 2017-01-30 16:23:04 +08:00
parent 2511cc8278
commit b7fc7bf246
9 changed files with 1233 additions and 87 deletions

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Client side rpc controller for coprocessor implementation. It is only used to pass error.
*/
@InterfaceAudience.Private
public class ClientCoprocessorRpcController implements RpcController {
private Throwable error;
@Override
public void reset() {
}
@Override
public boolean failed() {
return error != null;
}
@Override
public String errorText() {
return error != null ? error.getMessage() : null;
}
@Override
public void startCancel() {
throw new UnsupportedOperationException();
}
@Override
public void setFailed(String reason) {
throw new UnsupportedOperationException();
}
@Override
public boolean isCanceled() {
return false;
}
@Override
public void notifyOnCancel(RpcCallback<Object> callback) {
throw new UnsupportedOperationException();
}
public void setFailed(Throwable error) {
this.error = error;
}
public Throwable getFailed() {
return error;
}
}

View File

@ -17,6 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -60,4 +68,185 @@ public interface RawAsyncTable extends AsyncTableBase {
* @param consumer the consumer used to receive results.
*/
void scan(Scan scan, RawScanResultConsumer consumer);
/**
* Delegate to a protobuf rpc call.
* <p>
* Usually, it is just a simple lambda expression, like:
*
* <pre>
* <code>
* (stub, controller, rpcCallback) -> {
* XXXRequest request = ...; // prepare the request
* stub.xxx(controller, request, rpcCallback);
* }
* </code>
* </pre>
*
* And if you can prepare the {@code request} before calling the coprocessorService method, the
* lambda expression will be:
*
* <pre>
* <code>
* (stub, controller, rpcCallback) -> stub.xxx(controller, request, rpcCallback)
* </code>
* </pre>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@FunctionalInterface
interface CoprocessorCallable<S, R> {
/**
* Represent the actual protobuf rpc call.
* @param stub the asynchronous stub
* @param controller the rpc controller, has already been prepared for you
* @param rpcCallback the rpc callback, has already been prepared for you
*/
void call(S stub, RpcController controller, RpcCallback<R> rpcCallback);
}
/**
* Execute the given coprocessor call on the region which contains the given {@code row}.
* <p>
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
* one line lambda expression, like:
*
* <pre>
* <code>
* channel -> xxxService.newStub(channel)
* </code>
* </pre>
*
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link CoprocessorCallable} for more details.
* @param row The row key used to identify the remote region location
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return the return value of the protobuf rpc call, wrapped by a {@link CompletableFuture}.
* @see CoprocessorCallable
*/
<S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, byte[] row);
/**
* The callback when we want to execute a coprocessor call on a range of regions.
* <p>
* As the locating itself also takes some time, the implementation may want to send rpc calls on
* the fly, which means we do not know how many regions we have when we get the return value of
* the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
* passed all the return values to you(through the {@link #onRegionComplete(HRegionInfo, Object)}
* or {@link #onRegionError(HRegionInfo, Throwable)} calls), i.e, there will be no
* {@link #onRegionComplete(HRegionInfo, Object)} or
* {@link #onRegionError(HRegionInfo, Throwable)} calls in the future.
* <p>
* Here is a pseudo code to describe a typical implementation of a range coprocessor service
* method to help you better understand how the {@link CoprocessorCallback} will be called. The
* {@code callback} in the pseudo code is our {@link CoprocessorCallback}. And notice that the
* {@code whenComplete} is {@code CompletableFuture.whenComplete}.
*
* <pre>
* locateThenCall(byte[] row) {
* locate(row).whenComplete((location, locateError) -> {
* if (locateError != null) {
* callback.onError(locateError);
* return;
* }
* incPendingCall();
* region = location.getRegion();
* if (region.getEndKey() > endKey) {
* locateEnd = true;
* } else {
* locateThenCall(region.getEndKey());
* }
* sendCall().whenComplete((resp, error) -> {
* if (error != null) {
* callback.onRegionError(region, error);
* } else {
* callback.onRegionComplete(region, resp);
* }
* if (locateEnd && decPendingCallAndGet() == 0) {
* callback.onComplete();
* }
* });
* });
* }
* </pre>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
interface CoprocessorCallback<R> {
/**
* @param region the region that the response belongs to
* @param resp the response of the coprocessor call
*/
void onRegionComplete(HRegionInfo region, R resp);
/**
* @param region the region that the error belongs to
* @param error the response error of the coprocessor call
*/
void onRegionError(HRegionInfo region, Throwable error);
/**
* Indicate that all responses of the regions have been notified by calling
* {@link #onRegionComplete(HRegionInfo, Object)} or
* {@link #onRegionError(HRegionInfo, Throwable)}.
*/
void onComplete();
/**
* Indicate that we got an error which does not belong to any regions. Usually a locating error.
*/
void onError(Throwable error);
}
/**
* Execute the given coprocessor call on the regions which are covered by the range from
* {@code startKey} inclusive and {@code endKey} exclusive. See the comment of
* {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)}
* for more details.
* @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean,
* CoprocessorCallback)
*/
default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
CoprocessorCallback<R> callback) {
coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback);
}
/**
* Execute the given coprocessor call on the regions which are covered by the range from
* {@code startKey} and {@code endKey}. The inclusive of boundaries are specified by
* {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a
* delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda
* expression, like:
*
* <pre>
* <code>
* channel -> xxxService.newStub(channel)
* </code>
* </pre>
*
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link CoprocessorCallable} for more details.
* @param startKey start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
* @param startKeyInclusive whether to include the startKey
* @param endKey select regions up to and including the region containing this row. If
* {@code null}, selection will continue through the last table region.
* @param endKeyInclusive whether to include the endKey
* @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
* for more details.
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @see CoprocessorCallable
* @see CoprocessorCallback
*/
<S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
boolean endKeyInclusive, CoprocessorCallback<R> callback);
}

View File

@ -18,17 +18,26 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -436,4 +445,90 @@ class RawAsyncTableImpl implements RawAsyncTable {
return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
}
private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, HRegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs);
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
callable.call(stub, controller, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
});
return future;
}
@Override
public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, byte[] row) {
return coprocessorService(stubMaker, callable, null, row);
}
private boolean locateFinished(HRegionInfo region, byte[] endKey, boolean endKeyInclusive) {
if (isEmptyStopRow(endKey)) {
if (isEmptyStopRow(region.getEndKey())) {
return true;
}
return false;
} else {
if (isEmptyStopRow(region.getEndKey())) {
return true;
}
int c = Bytes.compareTo(endKey, region.getEndKey());
// 1. if the region contains endKey
// 2. endKey is equal to the region's endKey and we do not want to include endKey.
return c < 0 || c == 0 && !endKeyInclusive;
}
}
private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback,
List<HRegionLocation> locs, byte[] endKey, boolean endKeyInclusive,
AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, HRegionLocation loc,
Throwable error) {
if (error != null) {
callback.onError(error);
return;
}
unfinishedRequest.incrementAndGet();
HRegionInfo region = loc.getRegionInfo();
if (locateFinished(region, endKey, endKeyInclusive)) {
locateFinished.set(true);
} else {
conn.getLocator()
.getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
operationTimeoutNs)
.whenComplete((l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey,
endKeyInclusive, locateFinished, unfinishedRequest, l, e));
}
coprocessorService(stubMaker, callable, region, region.getStartKey()).whenComplete((r, e) -> {
if (e != null) {
callback.onRegionError(region, e);
} else {
callback.onRegionComplete(region, r);
}
if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
callback.onComplete();
}
});
}
@Override
public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
boolean endKeyInclusive, CoprocessorCallback<R> callback) {
byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
List<HRegionLocation> locs = new ArrayList<>();
conn.getLocator()
.getRegionLocation(tableName, nonNullStartKey,
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
.whenComplete(
(loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The implementation of a region based coprocessor rpc channel.
*/
@InterfaceAudience.Private
class RegionCoprocessorRpcChannelImpl implements RpcChannel {
private final AsyncConnectionImpl conn;
private final TableName tableName;
private final HRegionInfo region;
private final byte[] row;
private final long rpcTimeoutNs;
private final long operationTimeoutNs;
RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, HRegionInfo region,
byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
this.conn = conn;
this.tableName = tableName;
this.region = region;
this.row = row;
this.rpcTimeoutNs = rpcTimeoutNs;
this.operationTimeoutNs = operationTimeoutNs;
}
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
if (region != null
&& !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) {
future.completeExceptionally(new DoNotRetryIOException(
"Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
+ loc.getRegionInfo().getRegionNameAsString()));
return future;
}
CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
request, row, loc.getRegionInfo().getRegionName());
stub.execService(controller, csr,
new org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
}
});
return future;
}
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
conn.callerFactory.<Message> single().table(tableName).row(row)
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call()
.whenComplete((r, e) -> {
if (e != null) {
((ClientCoprocessorRpcController) controller).setFailed(e);
}
done.run(r);
});
}
}

View File

@ -19,12 +19,16 @@
package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -49,18 +53,12 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
/**
* This client class is for invoking the aggregate functions deployed on the
* Region Server side via the AggregateService. This class will implement the
@ -227,23 +225,7 @@ public class AggregationClient implements Closeable {
return aMaxCallBack.getMax();
}
/*
* @param scan
* @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
*/
private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
if (scan == null
|| (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
scan.getStartRow(), HConstants.EMPTY_START_ROW))
|| ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
} else if (!canFamilyBeAbsent) {
if (scan.getFamilyMap().size() != 1) {
throw new IOException("There must be only one family.");
}
}
}
/**
* It gives the minimum value of a column for a given column family for the
@ -846,22 +828,6 @@ public class AggregationClient implements Closeable {
return null;
}
<R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent)
throws IOException {
validateParameters(scan, canFamilyBeAbsent);
final AggregateRequest.Builder requestBuilder =
AggregateRequest.newBuilder();
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
P columnInterpreterSpecificData = null;
if ((columnInterpreterSpecificData = ci.getRequestData())
!= null) {
requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
}
requestBuilder.setScan(ProtobufUtil.toScan(scan));
return requestBuilder.build();
}
byte[] getBytesFromResponse(ByteString response) {
ByteBuffer bb = response.asReadOnlyByteBuffer();
bb.rewind();
@ -873,40 +839,4 @@ public class AggregationClient implements Closeable {
}
return bytes;
}
/**
* Get an instance of the argument type declared in a class's signature. The
* argument type is assumed to be a PB Message subclass, and the instance is
* created using parseFrom method on the passed ByteString.
* @param runtimeClass the runtime type of the class
* @param position the position of the argument in the class declaration
* @param b the ByteString which should be parsed to get the instance created
* @return the instance
* @throws IOException
*/
@SuppressWarnings("unchecked")
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
public static <T extends Message>
T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b)
throws IOException {
Type type = runtimeClass.getGenericSuperclass();
Type argType = ((ParameterizedType)type).getActualTypeArguments()[position];
Class<T> classType = (Class<T>)argType;
T inst;
try {
Method m = classType.getMethod("parseFrom", ByteString.class);
inst = (T)m.invoke(null, b);
return inst;
} catch (SecurityException e) {
throw new IOException(e);
} catch (NoSuchMethodException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new IOException(e);
} catch (InvocationTargetException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Helper class for constructing aggregation request and response.
*/
@InterfaceAudience.Private
public class AggregationHelper {
/**
* @param scan
* @param canFamilyBeAbsent whether column family can be absent in familyMap of scan
*/
private static void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException {
if (scan == null
|| (Bytes.equals(scan.getStartRow(), scan.getStopRow())
&& !Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW))
|| ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0)
&& !Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow");
} else if (!canFamilyBeAbsent) {
if (scan.getFamilyMap().size() != 1) {
throw new IOException("There must be only one family.");
}
}
}
static <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest
validateArgAndGetPB(Scan scan, ColumnInterpreter<R, S, P, Q, T> ci, boolean canFamilyBeAbsent)
throws IOException {
validateParameters(scan, canFamilyBeAbsent);
final AggregateRequest.Builder requestBuilder = AggregateRequest.newBuilder();
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
P columnInterpreterSpecificData = null;
if ((columnInterpreterSpecificData = ci.getRequestData()) != null) {
requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
}
requestBuilder.setScan(ProtobufUtil.toScan(scan));
return requestBuilder.build();
}
/**
* Get an instance of the argument type declared in a class's signature. The argument type is
* assumed to be a PB Message subclass, and the instance is created using parseFrom method on the
* passed ByteString.
* @param runtimeClass the runtime type of the class
* @param position the position of the argument in the class declaration
* @param b the ByteString which should be parsed to get the instance created
* @return the instance
* @throws IOException
*/
@SuppressWarnings("unchecked")
// Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO.
public static <T extends Message> T getParsedGenericInstance(Class<?> runtimeClass, int position,
ByteString b) throws IOException {
Type type = runtimeClass.getGenericSuperclass();
Type argType = ((ParameterizedType) type).getActualTypeArguments()[position];
Class<T> classType = (Class<T>) argType;
T inst;
try {
Method m = classType.getMethod("parseFrom", ByteString.class);
inst = (T) m.invoke(null, b);
return inst;
} catch (SecurityException e) {
throw new IOException(e);
} catch (NoSuchMethodException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new IOException(e);
} catch (InvocationTargetException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,464 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
import org.apache.hadoop.hbase.client.RawScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* This client class is for invoking the aggregate functions deployed on the Region Server side via
* the AggregateService. This class will implement the supporting functionality for
* summing/processing the individual results obtained from the AggregateService for each region.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class AsyncAggregationClient {
private static abstract class AbstractAggregationCallback<T>
implements CoprocessorCallback<AggregateResponse> {
private final CompletableFuture<T> future;
protected boolean finished = false;
private void completeExceptionally(Throwable error) {
if (finished) {
return;
}
finished = true;
future.completeExceptionally(error);
}
protected AbstractAggregationCallback(CompletableFuture<T> future) {
this.future = future;
}
@Override
public synchronized void onRegionError(HRegionInfo region, Throwable error) {
completeExceptionally(error);
}
@Override
public synchronized void onError(Throwable error) {
completeExceptionally(error);
}
protected abstract void aggregate(HRegionInfo region, AggregateResponse resp)
throws IOException;
@Override
public synchronized void onRegionComplete(HRegionInfo region, AggregateResponse resp) {
try {
aggregate(region, resp);
} catch (IOException e) {
completeExceptionally(e);
}
}
protected abstract T getFinalResult();
@Override
public synchronized void onComplete() {
if (finished) {
return;
}
finished = true;
future.complete(getFinalResult());
}
}
private static <R, S, P extends Message, Q extends Message, T extends Message> R
getCellValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp,
int firstPartIndex) throws IOException {
Q q = getParsedGenericInstance(ci.getClass(), 3, resp.getFirstPart(firstPartIndex));
return ci.getCellValueFromProto(q);
}
private static <R, S, P extends Message, Q extends Message, T extends Message> S
getPromotedValueFromProto(ColumnInterpreter<R, S, P, Q, T> ci, AggregateResponse resp,
int firstPartIndex) throws IOException {
T t = getParsedGenericInstance(ci.getClass(), 4, resp.getFirstPart(firstPartIndex));
return ci.getPromotedValueFromProto(t);
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, false);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
private R max;
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
R result = getCellValueFromProto(ci, resp, 0);
if (max == null || (result != null && ci.compare(max, result) < 0)) {
max = result;
}
}
}
@Override
protected R getFinalResult() {
return max;
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
min(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, false);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<R> callback = new AbstractAggregationCallback<R>(future) {
private R min;
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
R result = getCellValueFromProto(ci, resp, 0);
if (min == null || (result != null && ci.compare(min, result) > 0)) {
min = result;
}
}
}
@Override
protected R getFinalResult() {
return min;
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Long>
rowCount(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Long> future = new CompletableFuture<>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, true);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<Long> callback = new AbstractAggregationCallback<Long>(future) {
private long count;
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
count += resp.getFirstPart(0).asReadOnlyByteBuffer().getLong();
}
@Override
protected Long getFinalResult() {
return count;
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<S>
sum(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<S> future = new CompletableFuture<>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, false);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<S> callback = new AbstractAggregationCallback<S>(future) {
private S sum;
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
S s = getPromotedValueFromProto(ci, resp, 0);
sum = ci.add(sum, s);
}
}
@Override
protected S getFinalResult() {
return sum;
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Double>
avg(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Double> future = new CompletableFuture<>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, false);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
private S sum;
long count = 0L;
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
}
}
@Override
protected Double getFinalResult() {
return ci.divideForAvg(sum, count);
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
public static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<Double>
std(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<Double> future = new CompletableFuture<>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, false);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
AbstractAggregationCallback<Double> callback = new AbstractAggregationCallback<Double>(future) {
private S sum;
private S sumSq;
private long count;
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
sum = ci.add(sum, getPromotedValueFromProto(ci, resp, 0));
sumSq = ci.add(sumSq, getPromotedValueFromProto(ci, resp, 1));
count += resp.getSecondPart().asReadOnlyByteBuffer().getLong();
}
}
@Override
protected Double getFinalResult() {
double avg = ci.divideForAvg(sum, count);
double avgSq = ci.divideForAvg(sumSq, count);
return Math.sqrt(avgSq - avg * avg);
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
// the map key is the startRow of the region
private static <R, S, P extends Message, Q extends Message, T extends Message>
CompletableFuture<NavigableMap<byte[], S>>
sumByRegion(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<NavigableMap<byte[], S>> future =
new CompletableFuture<NavigableMap<byte[], S>>();
AggregateRequest req;
try {
req = validateArgAndGetPB(scan, ci, false);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
int firstPartIndex = scan.getFamilyMap().get(scan.getFamilies()[0]).size() - 1;
AbstractAggregationCallback<NavigableMap<byte[], S>> callback =
new AbstractAggregationCallback<NavigableMap<byte[], S>>(future) {
private final NavigableMap<byte[], S> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@Override
protected void aggregate(HRegionInfo region, AggregateResponse resp) throws IOException {
if (resp.getFirstPartCount() > 0) {
map.put(region.getStartKey(), getPromotedValueFromProto(ci, resp, firstPartIndex));
}
}
@Override
protected NavigableMap<byte[], S> getFinalResult() {
return map;
}
};
table.coprocessorService(channel -> AggregateService.newStub(channel),
(stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback),
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
callback);
return future;
}
private static <R, S, P extends Message, Q extends Message, T extends Message> void findMedian(
CompletableFuture<R> future, RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci,
Scan scan, NavigableMap<byte[], S> sumByRegion) {
double halfSum = ci.divideForAvg(sumByRegion.values().stream().reduce(ci::add).get(), 2L);
S movingSum = null;
byte[] startRow = null;
for (Map.Entry<byte[], S> entry : sumByRegion.entrySet()) {
startRow = entry.getKey();
S newMovingSum = ci.add(movingSum, entry.getValue());
if (ci.divideForAvg(newMovingSum, 1L) > halfSum) {
break;
}
movingSum = newMovingSum;
}
if (startRow != null) {
scan.withStartRow(startRow);
}
// we can not pass movingSum directly to an anonymous class as it is not final.
S baseSum = movingSum;
byte[] family = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
byte[] weightQualifier = qualifiers.last();
byte[] valueQualifier = qualifiers.first();
table.scan(scan, new RawScanResultConsumer() {
private S sum = baseSum;
private R value = null;
@Override
public boolean onNext(Result[] results) {
try {
for (Result result : results) {
Cell weightCell = result.getColumnLatestCell(family, weightQualifier);
R weight = ci.getValue(family, weightQualifier, weightCell);
sum = ci.add(sum, ci.castToReturnType(weight));
if (ci.divideForAvg(sum, 1L) > halfSum) {
if (value != null) {
future.complete(value);
} else {
future.completeExceptionally(new NoSuchElementException());
}
return false;
}
Cell valueCell = result.getColumnLatestCell(family, valueQualifier);
value = ci.getValue(family, valueQualifier, valueCell);
}
return true;
} catch (IOException e) {
future.completeExceptionally(e);
return false;
}
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onComplete() {
if (!future.isDone()) {
// we should not reach here as the future should be completed in onNext.
future.completeExceptionally(new NoSuchElementException());
}
}
});
}
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
median(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
CompletableFuture<R> future = new CompletableFuture<>();
sumByRegion(table, ci, scan).whenComplete((sumByRegion, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (sumByRegion.isEmpty()) {
future.completeExceptionally(new NoSuchElementException());
} else {
findMedian(future, table, ci, ReflectionUtils.newInstance(scan.getClass(), scan),
sumByRegion);
}
});
return future;
}
}

View File

@ -18,6 +18,14 @@
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -31,7 +39,6 @@ import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -40,12 +47,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRespo
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* A concrete AggregateProtocol implementation. Its system level coprocessor
* that computes the aggregate function at a region level.
@ -485,7 +486,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
if (request.hasInterpreterSpecificBytes()) {
ByteString b = request.getInterpreterSpecificBytes();
P initMsg = AggregationClient.getParsedGenericInstance(ci.getClass(), 2, b);
P initMsg = getParsedGenericInstance(ci.getClass(), 2, b);
ci.initialize(initMsg);
}
return ci;

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.AsyncAggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.AggregateImplementation;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, CoprocessorTests.class })
public class TestAsyncAggregationClient {
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("TestAsyncAggregationClient");
private static byte[] CF = Bytes.toBytes("CF");
private static byte[] CQ = Bytes.toBytes("CQ");
private static byte[] CQ2 = Bytes.toBytes("CQ2");
private static int COUNT = 1000;
private static AsyncConnection CONN;
private static RawAsyncTable TABLE;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
AggregateImplementation.class.getName());
UTIL.startMiniCluster(3);
byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
}
UTIL.createTable(TABLE_NAME, CF, splitKeys);
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration());
TABLE = CONN.getRawTable(TABLE_NAME);
TABLE.putAll(LongStream.range(0, COUNT)
.mapToObj(l -> new Put(Bytes.toBytes(String.format("%03d", l)))
.addColumn(CF, CQ, Bytes.toBytes(l)).addColumn(CF, CQ2, Bytes.toBytes(l * l)))
.collect(Collectors.toList())).get();
}
@AfterClass
public static void tearDown() throws Exception {
CONN.close();
UTIL.shutdownMiniCluster();
}
@Test
public void testMax() throws InterruptedException, ExecutionException {
assertEquals(COUNT - 1, AsyncAggregationClient
.max(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
}
@Test
public void testMin() throws InterruptedException, ExecutionException {
assertEquals(0, AsyncAggregationClient
.min(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
}
@Test
public void testRowCount() throws InterruptedException, ExecutionException {
assertEquals(COUNT,
AsyncAggregationClient
.rowCount(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get()
.longValue());
}
@Test
public void testSum() throws InterruptedException, ExecutionException {
assertEquals(COUNT * (COUNT - 1) / 2, AsyncAggregationClient
.sum(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().longValue());
}
private static final double DELTA = 1E-3;
@Test
public void testAvg() throws InterruptedException, ExecutionException {
assertEquals((COUNT - 1) / 2.0, AsyncAggregationClient
.avg(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(),
DELTA);
}
@Test
public void testStd() throws InterruptedException, ExecutionException {
double avgSq =
LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong()
/ (double) COUNT;
double avg = (COUNT - 1) / 2.0;
double std = Math.sqrt(avgSq - avg * avg);
assertEquals(std, AsyncAggregationClient
.std(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get().doubleValue(),
DELTA);
}
@Test
public void testMedian() throws InterruptedException, ExecutionException {
long halfSum = COUNT * (COUNT - 1) / 4;
long median = 0L;
long sum = 0L;
for (int i = 0; i < COUNT; i++) {
sum += i;
if (sum > halfSum) {
median = i - 1;
break;
}
}
assertEquals(median,
AsyncAggregationClient
.median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ)).get()
.longValue());
}
@Test
public void testMedianWithWeight() throws InterruptedException, ExecutionException {
long halfSum =
LongStream.range(0, COUNT).map(l -> l * l).reduce((l1, l2) -> l1 + l2).getAsLong() / 2;
long median = 0L;
long sum = 0L;
for (int i = 0; i < COUNT; i++) {
sum += i * i;
if (sum > halfSum) {
median = i - 1;
break;
}
}
assertEquals(median, AsyncAggregationClient
.median(TABLE, new LongColumnInterpreter(), new Scan().addColumn(CF, CQ).addColumn(CF, CQ2))
.get().longValue());
}
}