HBASE-18972 Use Builder pattern to remove nullable parameters for coprocessor methods in RawAsyncTable interface
This commit is contained in:
parent
49abc2e1c2
commit
fad7d01d8f
|
@ -38,11 +38,6 @@ import com.google.protobuf.RpcController;
|
||||||
* <p>
|
* <p>
|
||||||
* So, only experts that want to build high performance service should use this interface directly,
|
* So, only experts that want to build high performance service should use this interface directly,
|
||||||
* especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
|
* especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
|
||||||
* <p>
|
|
||||||
* TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
|
|
||||||
* method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat)
|
|
||||||
* so it is not suitable for a normal user. If it is still the only difference after we implement
|
|
||||||
* most features of AsyncTable, we can think about merge these two interfaces.
|
|
||||||
* @since 2.0.0
|
* @since 2.0.0
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
|
@ -135,8 +130,8 @@ public interface RawAsyncTable extends AsyncTableBase {
|
||||||
* the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
|
* the rpc calls, so we need an {@link #onComplete()} which is used to tell you that we have
|
||||||
* passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)}
|
* passed all the return values to you(through the {@link #onRegionComplete(RegionInfo, Object)}
|
||||||
* or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no
|
* or {@link #onRegionError(RegionInfo, Throwable)} calls), i.e, there will be no
|
||||||
* {@link #onRegionComplete(RegionInfo, Object)} or
|
* {@link #onRegionComplete(RegionInfo, Object)} or {@link #onRegionError(RegionInfo, Throwable)}
|
||||||
* {@link #onRegionError(RegionInfo, Throwable)} calls in the future.
|
* calls in the future.
|
||||||
* <p>
|
* <p>
|
||||||
* Here is a pseudo code to describe a typical implementation of a range coprocessor service
|
* Here is a pseudo code to describe a typical implementation of a range coprocessor service
|
||||||
* method to help you better understand how the {@link CoprocessorCallback} will be called. The
|
* method to help you better understand how the {@link CoprocessorCallback} will be called. The
|
||||||
|
@ -200,25 +195,56 @@ public interface RawAsyncTable extends AsyncTableBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the given coprocessor call on the regions which are covered by the range from
|
* Helper class for sending coprocessorService request that executes a coprocessor call on regions
|
||||||
* {@code startKey} inclusive and {@code endKey} exclusive. See the comment of
|
* which are covered by a range.
|
||||||
* {@link #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean, CoprocessorCallback)}
|
* <p>
|
||||||
* for more details.
|
* If {@code fromRow} is not specified the selection will start with the first table region. If
|
||||||
* @see #coprocessorService(Function, CoprocessorCallable, byte[], boolean, byte[], boolean,
|
* {@code toRow} is not specified the selection will continue through the last table region.
|
||||||
* CoprocessorCallback)
|
* @param <S> the type of the protobuf Service you want to call.
|
||||||
|
* @param <R> the type of the return value.
|
||||||
*/
|
*/
|
||||||
default <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
|
interface CoprocessorServiceBuilder<S, R> {
|
||||||
CoprocessorCallable<S, R> callable, byte[] startKey, byte[] endKey,
|
|
||||||
CoprocessorCallback<R> callback) {
|
/**
|
||||||
coprocessorService(stubMaker, callable, startKey, true, endKey, false, callback);
|
* @param startKey start region selection with region containing this row, inclusive.
|
||||||
|
*/
|
||||||
|
default CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey) {
|
||||||
|
return fromRow(startKey, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param startKey start region selection with region containing this row
|
||||||
|
* @param inclusive whether to include the startKey
|
||||||
|
*/
|
||||||
|
CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param endKey select regions up to and including the region containing this row, exclusive.
|
||||||
|
*/
|
||||||
|
default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
|
||||||
|
return toRow(endKey, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param endKey select regions up to and including the region containing this row
|
||||||
|
* @param inclusive whether to include the endKey
|
||||||
|
*/
|
||||||
|
CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the coprocessorService request. You can get the response through the
|
||||||
|
* {@link CoprocessorCallback}.
|
||||||
|
*/
|
||||||
|
void execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute the given coprocessor call on the regions which are covered by the range from
|
* Execute a coprocessor call on the regions which are covered by a range.
|
||||||
* {@code startKey} and {@code endKey}. The inclusive of boundaries are specified by
|
* <p>
|
||||||
* {@code startKeyInclusive} and {@code endKeyInclusive}. The {@code stubMaker} is just a
|
* Use the returned {@link CoprocessorServiceBuilder} construct your request and then execute it.
|
||||||
* delegation to the {@code xxxService.newStub} call. Usually it is only a one line lambda
|
* <p>
|
||||||
* expression, like:
|
* The {@code stubMaker} is just a delegation to the {@code xxxService.newStub} call. Usually it
|
||||||
|
* is only a one line lambda expression, like:
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* <code>
|
* <code>
|
||||||
|
@ -229,20 +255,9 @@ public interface RawAsyncTable extends AsyncTableBase {
|
||||||
* @param stubMaker a delegation to the actual {@code newStub} call.
|
* @param stubMaker a delegation to the actual {@code newStub} call.
|
||||||
* @param callable a delegation to the actual protobuf rpc call. See the comment of
|
* @param callable a delegation to the actual protobuf rpc call. See the comment of
|
||||||
* {@link CoprocessorCallable} for more details.
|
* {@link CoprocessorCallable} for more details.
|
||||||
* @param startKey start region selection with region containing this row. If {@code null}, the
|
|
||||||
* selection will start with the first table region.
|
|
||||||
* @param startKeyInclusive whether to include the startKey
|
|
||||||
* @param endKey select regions up to and including the region containing this row. If
|
|
||||||
* {@code null}, selection will continue through the last table region.
|
|
||||||
* @param endKeyInclusive whether to include the endKey
|
|
||||||
* @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
|
* @param callback callback to get the response. See the comment of {@link CoprocessorCallback}
|
||||||
* for more details.
|
* for more details.
|
||||||
* @param <S> the type of the asynchronous stub
|
|
||||||
* @param <R> the type of the return value
|
|
||||||
* @see CoprocessorCallable
|
|
||||||
* @see CoprocessorCallback
|
|
||||||
*/
|
*/
|
||||||
<S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
|
<S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
|
||||||
CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
|
CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback);
|
||||||
boolean endKeyInclusive, CoprocessorCallback<R> callback);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,8 +18,6 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import static java.util.stream.Collectors.toList;
|
import static java.util.stream.Collectors.toList;
|
||||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
|
|
||||||
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
|
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
|
||||||
|
|
||||||
|
@ -29,7 +27,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -38,6 +35,7 @@ import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CompareOperator;
|
import org.apache.hadoop.hbase.CompareOperator;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
|
||||||
|
@ -560,19 +558,64 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private final class CoprocessorServiceBuilderImpl<S, R>
|
||||||
public <S, R> void coprocessorService(Function<RpcChannel, S> stubMaker,
|
implements CoprocessorServiceBuilder<S, R> {
|
||||||
CoprocessorCallable<S, R> callable, byte[] startKey, boolean startKeyInclusive, byte[] endKey,
|
|
||||||
boolean endKeyInclusive, CoprocessorCallback<R> callback) {
|
private final Function<RpcChannel, S> stubMaker;
|
||||||
byte[] nonNullStartKey = Optional.ofNullable(startKey).orElse(EMPTY_START_ROW);
|
|
||||||
byte[] nonNullEndKey = Optional.ofNullable(endKey).orElse(EMPTY_END_ROW);
|
private final CoprocessorCallable<S, R> callable;
|
||||||
List<HRegionLocation> locs = new ArrayList<>();
|
|
||||||
conn.getLocator()
|
private final CoprocessorCallback<R> callback;
|
||||||
.getRegionLocation(tableName, nonNullStartKey,
|
|
||||||
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
|
private byte[] startKey = HConstants.EMPTY_START_ROW;
|
||||||
.whenComplete(
|
|
||||||
(loc, error) -> onLocateComplete(stubMaker, callable, callback, locs, nonNullEndKey,
|
private boolean startKeyInclusive;
|
||||||
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
|
|
||||||
|
private byte[] endKey = HConstants.EMPTY_END_ROW;
|
||||||
|
|
||||||
|
private boolean endKeyInclusive;
|
||||||
|
|
||||||
|
public CoprocessorServiceBuilderImpl(Function<RpcChannel, S> stubMaker,
|
||||||
|
CoprocessorCallable<S, R> callable, CoprocessorCallback<R> callback) {
|
||||||
|
this.stubMaker = Preconditions.checkNotNull(stubMaker, "stubMaker is null");
|
||||||
|
this.callable = Preconditions.checkNotNull(callable, "callable is null");
|
||||||
|
this.callback = Preconditions.checkNotNull(callback, "callback is null");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CoprocessorServiceBuilderImpl<S, R> fromRow(byte[] startKey, boolean inclusive) {
|
||||||
|
this.startKey = Preconditions.checkNotNull(startKey,
|
||||||
|
"startKey is null. Consider using" +
|
||||||
|
" an empty byte array, or just do not call this method if you want to start selection" +
|
||||||
|
" from the first region");
|
||||||
|
this.startKeyInclusive = inclusive;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusive) {
|
||||||
|
this.endKey = Preconditions.checkNotNull(endKey,
|
||||||
|
"endKey is null. Consider using" +
|
||||||
|
" an empty byte array, or just do not call this method if you want to continue" +
|
||||||
|
" selection to the last region");
|
||||||
|
this.endKeyInclusive = inclusive;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute() {
|
||||||
|
conn.getLocator().getRegionLocation(tableName, startKey,
|
||||||
|
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs)
|
||||||
|
.whenComplete(
|
||||||
|
(loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(),
|
||||||
|
endKey, endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
|
||||||
|
Function<RpcChannel, S> stubMaker, CoprocessorCallable<S, R> callable,
|
||||||
|
CoprocessorCallback<R> callback) {
|
||||||
|
return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client.coprocessor;
|
||||||
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
|
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
|
||||||
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
|
import static org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.validateArgAndGetPB;
|
||||||
|
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
|
@ -29,6 +31,7 @@ import java.util.TreeMap;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.RawAsyncTable;
|
import org.apache.hadoop.hbase.client.RawAsyncTable;
|
||||||
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
|
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallback;
|
||||||
import org.apache.hadoop.hbase.client.RawScanResultConsumer;
|
import org.apache.hadoop.hbase.client.RawScanResultConsumer;
|
||||||
|
@ -43,8 +46,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This client class is for invoking the aggregate functions deployed on the Region Server side via
|
* This client class is for invoking the aggregate functions deployed on the Region Server side via
|
||||||
* the AggregateService. This class will implement the supporting functionality for
|
* the AggregateService. This class will implement the supporting functionality for
|
||||||
|
@ -120,6 +121,10 @@ public class AsyncAggregationClient {
|
||||||
return ci.getPromotedValueFromProto(t);
|
return ci.getPromotedValueFromProto(t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static byte[] nullToEmpty(byte[] b) {
|
||||||
|
return b != null ? b : HConstants.EMPTY_BYTE_ARRAY;
|
||||||
|
}
|
||||||
|
|
||||||
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
|
public static <R, S, P extends Message, Q extends Message, T extends Message> CompletableFuture<R>
|
||||||
max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
max(RawAsyncTable table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) {
|
||||||
CompletableFuture<R> future = new CompletableFuture<>();
|
CompletableFuture<R> future = new CompletableFuture<>();
|
||||||
|
@ -149,10 +154,11 @@ public class AsyncAggregationClient {
|
||||||
return max;
|
return max;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getMax(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,10 +191,11 @@ public class AsyncAggregationClient {
|
||||||
return min;
|
return min;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getMin(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,10 +224,11 @@ public class AsyncAggregationClient {
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getRowNum(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -251,10 +259,11 @@ public class AsyncAggregationClient {
|
||||||
return sum;
|
return sum;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getSum(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,10 +297,11 @@ public class AsyncAggregationClient {
|
||||||
return ci.divideForAvg(sum, count);
|
return ci.divideForAvg(sum, count);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getAvg(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,10 +340,11 @@ public class AsyncAggregationClient {
|
||||||
return Math.sqrt(avgSq - avg * avg);
|
return Math.sqrt(avgSq - avg * avg);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getStd(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -368,10 +379,11 @@ public class AsyncAggregationClient {
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
table.coprocessorService(channel -> AggregateService.newStub(channel),
|
table
|
||||||
(stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback),
|
.<AggregateService, AggregateResponse> coprocessorService(AggregateService::newStub,
|
||||||
scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(),
|
(stub, controller, rpcCallback) -> stub.getMedian(controller, req, rpcCallback), callback)
|
||||||
callback);
|
.fromRow(nullToEmpty(scan.getStartRow()), scan.includeStartRow())
|
||||||
|
.toRow(nullToEmpty(scan.getStopRow()), scan.includeStopRow()).execute();
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue