HBASE-17142 Implement multi get
This commit is contained in:
parent
db5953c6fe
commit
a2e967d92f
|
@ -0,0 +1,406 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
|
||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||
|
||||
import io.netty.util.HashedWheelTimer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* Retry caller for multi get.
|
||||
* <p>
|
||||
* Notice that, the {@link #operationTimeoutNs} is the total time limit now which is the same with
|
||||
* other single operations
|
||||
* <p>
|
||||
* And the {@link #maxAttempts} is a limit for each single get in the batch logically. In the
|
||||
* implementation, we will record a {@code tries} parameter for each operation group, and if it is
|
||||
* split to several groups when retrying, the sub groups will inherit {@code tries}. You can imagine
|
||||
* that the whole retrying process is a tree, and the {@link #maxAttempts} is the limit of the depth
|
||||
* of the tree.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class AsyncMultiGetRpcRetryingCaller {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AsyncMultiGetRpcRetryingCaller.class);
|
||||
|
||||
private final HashedWheelTimer retryTimer;
|
||||
|
||||
private final AsyncConnectionImpl conn;
|
||||
|
||||
private final TableName tableName;
|
||||
|
||||
private final List<Get> gets;
|
||||
|
||||
private final List<CompletableFuture<Result>> futures;
|
||||
|
||||
private final IdentityHashMap<Get, CompletableFuture<Result>> get2Future;
|
||||
|
||||
private final IdentityHashMap<Get, List<ThrowableWithExtraContext>> get2Errors;
|
||||
|
||||
private final long pauseNs;
|
||||
|
||||
private final int maxAttempts;
|
||||
|
||||
private final long operationTimeoutNs;
|
||||
|
||||
private final long rpcTimeoutNs;
|
||||
|
||||
private final int startLogErrorsCnt;
|
||||
|
||||
private final long startNs;
|
||||
|
||||
// we can not use HRegionLocation as the map key because the hashCode and equals method of
|
||||
// HRegionLocation only consider serverName.
|
||||
private static final class RegionRequest {
|
||||
|
||||
public final HRegionLocation loc;
|
||||
|
||||
public final ConcurrentLinkedQueue<Get> gets = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public RegionRequest(HRegionLocation loc) {
|
||||
this.loc = loc;
|
||||
}
|
||||
}
|
||||
|
||||
public AsyncMultiGetRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
|
||||
TableName tableName, List<Get> gets, long pauseNs, int maxRetries, long operationTimeoutNs,
|
||||
long rpcTimeoutNs, int startLogErrorsCnt) {
|
||||
this.retryTimer = retryTimer;
|
||||
this.conn = conn;
|
||||
this.tableName = tableName;
|
||||
this.gets = gets;
|
||||
this.pauseNs = pauseNs;
|
||||
this.maxAttempts = retries2Attempts(maxRetries);
|
||||
this.operationTimeoutNs = operationTimeoutNs;
|
||||
this.rpcTimeoutNs = rpcTimeoutNs;
|
||||
this.startLogErrorsCnt = startLogErrorsCnt;
|
||||
|
||||
this.futures = new ArrayList<>(gets.size());
|
||||
this.get2Future = new IdentityHashMap<>(gets.size());
|
||||
gets.forEach(
|
||||
get -> futures.add(get2Future.computeIfAbsent(get, k -> new CompletableFuture<>())));
|
||||
this.get2Errors = new IdentityHashMap<>();
|
||||
this.startNs = System.nanoTime();
|
||||
}
|
||||
|
||||
private long remainingTimeNs() {
|
||||
return operationTimeoutNs - (System.nanoTime() - startNs);
|
||||
}
|
||||
|
||||
private List<ThrowableWithExtraContext> removeErrors(Get get) {
|
||||
synchronized (get2Errors) {
|
||||
return get2Errors.remove(get);
|
||||
}
|
||||
}
|
||||
|
||||
private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
|
||||
Throwable error, ServerName serverName) {
|
||||
if (tries > startLogErrorsCnt) {
|
||||
String regions =
|
||||
regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'")
|
||||
.collect(Collectors.joining(",", "[", "]"));
|
||||
LOG.warn("Get data for " + regions + " in " + tableName + " from " + serverName
|
||||
+ " failed, tries=" + tries,
|
||||
error);
|
||||
}
|
||||
}
|
||||
|
||||
private String getExtras(ServerName serverName) {
|
||||
return serverName != null ? serverName.getServerName() : "";
|
||||
}
|
||||
|
||||
private void addError(Get get, Throwable error, ServerName serverName) {
|
||||
List<ThrowableWithExtraContext> errors;
|
||||
synchronized (get2Errors) {
|
||||
errors = get2Errors.computeIfAbsent(get, k -> new ArrayList<>());
|
||||
}
|
||||
errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
|
||||
serverName != null ? serverName.toString() : ""));
|
||||
}
|
||||
|
||||
private void addError(Iterable<Get> gets, Throwable error, ServerName serverName) {
|
||||
gets.forEach(get -> addError(get, error, serverName));
|
||||
}
|
||||
|
||||
private void failOne(Get get, int tries, Throwable error, long currentTime, String extras) {
|
||||
CompletableFuture<Result> future = get2Future.get(get);
|
||||
if (future.isDone()) {
|
||||
return;
|
||||
}
|
||||
ThrowableWithExtraContext errorWithCtx =
|
||||
new ThrowableWithExtraContext(error, currentTime, extras);
|
||||
List<ThrowableWithExtraContext> errors = removeErrors(get);
|
||||
if (errors == null) {
|
||||
errors = Collections.singletonList(errorWithCtx);
|
||||
} else {
|
||||
errors.add(errorWithCtx);
|
||||
}
|
||||
future.completeExceptionally(new RetriesExhaustedException(tries, errors));
|
||||
}
|
||||
|
||||
private void failAll(Stream<Get> gets, int tries, Throwable error, ServerName serverName) {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
String extras = getExtras(serverName);
|
||||
gets.forEach(get -> failOne(get, tries, error, currentTime, extras));
|
||||
}
|
||||
|
||||
private void failAll(Stream<Get> gets, int tries) {
|
||||
gets.forEach(get -> {
|
||||
CompletableFuture<Result> future = get2Future.get(get);
|
||||
if (future.isDone()) {
|
||||
return;
|
||||
}
|
||||
future.completeExceptionally(new RetriesExhaustedException(tries,
|
||||
Optional.ofNullable(removeErrors(get)).orElse(Collections.emptyList())));
|
||||
});
|
||||
}
|
||||
|
||||
private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> getsByRegion)
|
||||
throws IOException {
|
||||
ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
|
||||
for (Map.Entry<byte[], RegionRequest> entry : getsByRegion.entrySet()) {
|
||||
ClientProtos.RegionAction.Builder regionActionBuilder =
|
||||
ClientProtos.RegionAction.newBuilder().setRegion(
|
||||
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
|
||||
int index = 0;
|
||||
for (Get get : entry.getValue().gets) {
|
||||
regionActionBuilder.addAction(
|
||||
ClientProtos.Action.newBuilder().setIndex(index).setGet(ProtobufUtil.toGet(get)));
|
||||
index++;
|
||||
}
|
||||
multiRequestBuilder.addRegionAction(regionActionBuilder);
|
||||
}
|
||||
return multiRequestBuilder.build();
|
||||
}
|
||||
|
||||
private void onComplete(Map<byte[], RegionRequest> getsByRegion, int tries, ServerName serverName,
|
||||
MultiResponse resp) {
|
||||
List<Get> failedGets = new ArrayList<>();
|
||||
getsByRegion.forEach((rn, regionReq) -> {
|
||||
RegionResult regionResult = resp.getResults().get(rn);
|
||||
if (regionResult != null) {
|
||||
int index = 0;
|
||||
for (Get get : regionReq.gets) {
|
||||
Object result = regionResult.result.get(index);
|
||||
if (result == null) {
|
||||
LOG.error("Server sent us neither result nor exception for row '"
|
||||
+ Bytes.toStringBinary(get.getRow()) + "' of " + Bytes.toStringBinary(rn));
|
||||
addError(get, new RuntimeException("Invalid response"), serverName);
|
||||
failedGets.add(get);
|
||||
} else if (result instanceof Throwable) {
|
||||
Throwable error = translateException((Throwable) result);
|
||||
logException(tries, () -> Stream.of(regionReq), error, serverName);
|
||||
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
|
||||
failOne(get, tries, error, EnvironmentEdgeManager.currentTime(),
|
||||
getExtras(serverName));
|
||||
} else {
|
||||
failedGets.add(get);
|
||||
}
|
||||
} else {
|
||||
get2Future.get(get).complete((Result) result);
|
||||
}
|
||||
index++;
|
||||
}
|
||||
} else {
|
||||
Throwable t = resp.getException(rn);
|
||||
Throwable error;
|
||||
if (t == null) {
|
||||
LOG.error(
|
||||
"Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));
|
||||
error = new RuntimeException("Invalid response");
|
||||
} else {
|
||||
error = translateException(t);
|
||||
logException(tries, () -> Stream.of(regionReq), error, serverName);
|
||||
conn.getLocator().updateCachedLocation(regionReq.loc, error);
|
||||
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
|
||||
failAll(regionReq.gets.stream(), tries, error, serverName);
|
||||
return;
|
||||
}
|
||||
addError(regionReq.gets, error, serverName);
|
||||
failedGets.addAll(regionReq.gets);
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!failedGets.isEmpty()) {
|
||||
tryResubmit(failedGets.stream(), tries);
|
||||
}
|
||||
}
|
||||
|
||||
private void send(Map<ServerName, ? extends Map<byte[], RegionRequest>> getsByServer, int tries) {
|
||||
long callTimeoutNs;
|
||||
if (operationTimeoutNs > 0) {
|
||||
long remainingNs = remainingTimeNs();
|
||||
if (remainingNs <= 0) {
|
||||
failAll(getsByServer.values().stream().flatMap(m -> m.values().stream())
|
||||
.flatMap(r -> r.gets.stream()),
|
||||
tries);
|
||||
return;
|
||||
}
|
||||
callTimeoutNs = Math.min(remainingNs, rpcTimeoutNs);
|
||||
} else {
|
||||
callTimeoutNs = rpcTimeoutNs;
|
||||
}
|
||||
getsByServer.forEach((sn, getsByRegion) -> {
|
||||
ClientService.Interface stub;
|
||||
try {
|
||||
stub = conn.getRegionServerStub(sn);
|
||||
} catch (IOException e) {
|
||||
onError(getsByRegion, tries, e, sn);
|
||||
return;
|
||||
}
|
||||
ClientProtos.MultiRequest req;
|
||||
try {
|
||||
req = buildReq(getsByRegion);
|
||||
} catch (IOException e) {
|
||||
onError(getsByRegion, tries, e, sn);
|
||||
return;
|
||||
}
|
||||
HBaseRpcController controller = conn.rpcControllerFactory.newController();
|
||||
resetController(controller, callTimeoutNs);
|
||||
stub.multi(controller, req, resp -> {
|
||||
if (controller.failed()) {
|
||||
onError(getsByRegion, tries, controller.getFailed(), sn);
|
||||
} else {
|
||||
try {
|
||||
onComplete(getsByRegion, tries, sn,
|
||||
ResponseConverter.getResults(req, resp, controller.cellScanner()));
|
||||
} catch (Exception e) {
|
||||
onError(getsByRegion, tries, e, sn);
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private void onError(Map<byte[], RegionRequest> getsByRegion, int tries, Throwable t,
|
||||
ServerName serverName) {
|
||||
Throwable error = translateException(t);
|
||||
logException(tries, () -> getsByRegion.values().stream(), error, serverName);
|
||||
if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
|
||||
failAll(getsByRegion.values().stream().flatMap(r -> r.gets.stream()), tries, error,
|
||||
serverName);
|
||||
return;
|
||||
}
|
||||
List<Get> copiedGets =
|
||||
getsByRegion.values().stream().flatMap(r -> r.gets.stream()).collect(Collectors.toList());
|
||||
addError(copiedGets, error, serverName);
|
||||
tryResubmit(copiedGets.stream(), tries);
|
||||
}
|
||||
|
||||
private void tryResubmit(Stream<Get> gets, int tries) {
|
||||
long delayNs;
|
||||
if (operationTimeoutNs > 0) {
|
||||
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
|
||||
if (maxDelayNs <= 0) {
|
||||
failAll(gets, tries);
|
||||
return;
|
||||
}
|
||||
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
|
||||
} else {
|
||||
delayNs = getPauseTime(pauseNs, tries - 1);
|
||||
}
|
||||
retryTimer.newTimeout(t -> groupAndSend(gets, tries + 1), delayNs, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
private void groupAndSend(Stream<Get> gets, int tries) {
|
||||
long locateTimeoutNs;
|
||||
if (operationTimeoutNs > 0) {
|
||||
locateTimeoutNs = remainingTimeNs();
|
||||
if (locateTimeoutNs <= 0) {
|
||||
failAll(gets, tries);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
locateTimeoutNs = -1L;
|
||||
}
|
||||
ConcurrentMap<ServerName, ConcurrentMap<byte[], RegionRequest>> getsByServer =
|
||||
new ConcurrentHashMap<>();
|
||||
ConcurrentLinkedQueue<Get> locateFailed = new ConcurrentLinkedQueue<>();
|
||||
CompletableFuture.allOf(gets.map(get -> conn.getLocator()
|
||||
.getRegionLocation(tableName, get.getRow(), locateTimeoutNs).whenComplete((loc, error) -> {
|
||||
if (error != null) {
|
||||
error = translateException(error);
|
||||
if (error instanceof DoNotRetryIOException) {
|
||||
failOne(get, tries, error, EnvironmentEdgeManager.currentTime(), "");
|
||||
return;
|
||||
}
|
||||
addError(get, error, null);
|
||||
locateFailed.add(get);
|
||||
} else {
|
||||
ConcurrentMap<byte[], RegionRequest> getsByRegion = computeIfAbsent(getsByServer,
|
||||
loc.getServerName(), () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
|
||||
computeIfAbsent(getsByRegion, loc.getRegionInfo().getRegionName(),
|
||||
() -> new RegionRequest(loc)).gets.add(get);
|
||||
}
|
||||
})).toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
|
||||
if (!getsByServer.isEmpty()) {
|
||||
send(getsByServer, tries);
|
||||
}
|
||||
if (!locateFailed.isEmpty()) {
|
||||
tryResubmit(locateFailed.stream(), tries);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<CompletableFuture<Result>> call() {
|
||||
groupAndSend(gets.stream(), 1);
|
||||
return futures;
|
||||
}
|
||||
}
|
|
@ -257,4 +257,49 @@ class AsyncRpcRetryingCallerFactory {
|
|||
public ScanSingleRegionCallerBuilder scanSingleRegion() {
|
||||
return new ScanSingleRegionCallerBuilder();
|
||||
}
|
||||
|
||||
public class MultiGetCallerBuilder {
|
||||
|
||||
private TableName tableName;
|
||||
|
||||
private List<Get> gets;
|
||||
|
||||
private long operationTimeoutNs = -1L;
|
||||
|
||||
private long rpcTimeoutNs = -1L;
|
||||
|
||||
public MultiGetCallerBuilder table(TableName tableName) {
|
||||
this.tableName = tableName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MultiGetCallerBuilder gets(List<Get> gets) {
|
||||
this.gets = gets;
|
||||
return this;
|
||||
}
|
||||
|
||||
public MultiGetCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) {
|
||||
this.operationTimeoutNs = unit.toNanos(operationTimeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MultiGetCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
|
||||
this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AsyncMultiGetRpcRetryingCaller build() {
|
||||
return new AsyncMultiGetRpcRetryingCaller(retryTimer, conn, tableName, gets,
|
||||
conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
|
||||
rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
|
||||
}
|
||||
|
||||
public List<CompletableFuture<Result>> call() {
|
||||
return build().call();
|
||||
}
|
||||
}
|
||||
|
||||
public MultiGetCallerBuilder multiGet() {
|
||||
return new MultiGetCallerBuilder();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
|
||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
|
||||
|
@ -52,9 +53,6 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(AsyncSingleRequestRpcRetryingCaller.class);
|
||||
|
||||
// Add a delta to avoid timeout immediately after a retry sleeping.
|
||||
private static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
|
||||
@FunctionalInterface
|
||||
public interface Callable<T> {
|
||||
CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
|
||||
|
@ -146,7 +144,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
|
|||
}
|
||||
long delayNs;
|
||||
if (operationTimeoutNs > 0) {
|
||||
long maxDelayNs = operationTimeoutNs - (System.nanoTime() - startNs) - SLEEP_DELTA_NS;
|
||||
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
|
||||
if (maxDelayNs <= 0) {
|
||||
completeExceptionally();
|
||||
return;
|
||||
|
|
|
@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -351,4 +352,27 @@ public interface AsyncTableBase {
|
|||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
|
||||
|
||||
/**
|
||||
* Extracts certain cells from the given rows, in batch.
|
||||
* <p>
|
||||
* Notice that you may not get all the results with this function, which means some of the
|
||||
* returned {@link CompletableFuture}s may succeed while some of the other returned
|
||||
* {@link CompletableFuture}s may fail.
|
||||
* @param gets The objects that specify what data to fetch and from which rows.
|
||||
* @return A list of {@link CompletableFuture}s that represent the result for each get.
|
||||
*/
|
||||
List<CompletableFuture<Result>> get(List<Get> gets);
|
||||
|
||||
/**
|
||||
* A simple version for batch get. It will fail if there are any failures and you will get the
|
||||
* whole result list at once if the operation is succeeded.
|
||||
* @param gets The objects that specify what data to fetch and from which rows.
|
||||
* @return A {@link CompletableFuture} that wrapper the result list.
|
||||
*/
|
||||
default CompletableFuture<List<Result>> getAll(List<Get> gets) {
|
||||
List<CompletableFuture<Result>> futures = get(gets);
|
||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
|
||||
.thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -191,4 +192,9 @@ class AsyncTableImpl implements AsyncTable {
|
|||
public void scan(Scan scan, ScanResultConsumer consumer) {
|
||||
pool.execute(() -> scan0(scan, consumer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Result>> get(List<Get> gets) {
|
||||
return rawTable.get(gets).stream().map(this::wrap).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -324,4 +324,7 @@ public final class ConnectionUtils {
|
|||
return Result.create(Arrays.copyOfRange(rawCells, index, rawCells.length), null,
|
||||
result.isStale(), true);
|
||||
}
|
||||
|
||||
// Add a delta to avoid timeout immediately after a retry sleeping.
|
||||
static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
|
||||
}
|
||||
|
|
|
@ -32,9 +32,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
* especially for the {@link #scan(Scan, RawScanResultConsumer)} below.
|
||||
* <p>
|
||||
* TODO: For now the only difference between this interface and {@link AsyncTable} is the scan
|
||||
* method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat) so
|
||||
* it is not suitable for a normal user. If it is still the only difference after we implement most
|
||||
* features of AsyncTable, we can think about merge these two interfaces.
|
||||
* method. The {@link RawScanResultConsumer} exposes the implementation details of a scan(heartbeat)
|
||||
* so it is not suitable for a normal user. If it is still the only difference after we implement
|
||||
* most features of AsyncTable, we can think about merge these two interfaces.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
|
@ -42,13 +42,13 @@ public interface RawAsyncTable extends AsyncTableBase {
|
|||
|
||||
/**
|
||||
* The basic scan API uses the observer pattern. All results that match the given scan object will
|
||||
* be passed to the given {@code consumer} by calling {@link RawScanResultConsumer#onNext(Result[])}.
|
||||
* {@link RawScanResultConsumer#onComplete()} means the scan is finished, and
|
||||
* {@link RawScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan
|
||||
* is terminated. {@link RawScanResultConsumer#onHeartbeat()} means the RS is still working but we
|
||||
* can not get a valid result to call {@link RawScanResultConsumer#onNext(Result[])}. This is usually
|
||||
* because the matched results are too sparse, for example, a filter which almost filters out
|
||||
* everything is specified.
|
||||
* be passed to the given {@code consumer} by calling
|
||||
* {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()}
|
||||
* means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit
|
||||
* an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()}
|
||||
* means the RS is still working but we can not get a valid result to call
|
||||
* {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are
|
||||
* too sparse, for example, a filter which almost filters out everything is specified.
|
||||
* <p>
|
||||
* Notice that, the methods of the given {@code consumer} will be called directly in the rpc
|
||||
* framework's callback thread, so typically you should not do any time consuming work inside
|
||||
|
|
|
@ -405,4 +405,11 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
public long getScanTimeout(TimeUnit unit) {
|
||||
return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CompletableFuture<Result>> get(List<Get> gets) {
|
||||
return conn.callerFactory.multiGet().table(tableName).gets(gets)
|
||||
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
|
||||
.rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncTableMultiGet {
|
||||
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static TableName TABLE_NAME = TableName.valueOf("async");
|
||||
|
||||
private static byte[] FAMILY = Bytes.toBytes("cf");
|
||||
|
||||
private static byte[] CQ = Bytes.toBytes("cq");
|
||||
|
||||
private static int COUNT = 100;
|
||||
|
||||
private static AsyncConnection ASYNC_CONN;
|
||||
|
||||
@Parameter
|
||||
public Supplier<AsyncTableBase> getTable;
|
||||
|
||||
private static RawAsyncTable getRawTable() {
|
||||
return ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
}
|
||||
|
||||
private static AsyncTable getTable() {
|
||||
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static List<Object[]> params() {
|
||||
return Arrays.asList(new Supplier<?>[] { TestAsyncTableMultiGet::getRawTable },
|
||||
new Supplier<?>[] { TestAsyncTableMultiGet::getTable });
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
byte[][] splitKeys = new byte[8][];
|
||||
for (int i = 11; i < 99; i += 11) {
|
||||
splitKeys[i / 11 - 1] = Bytes.toBytes(String.format("%02d", i));
|
||||
}
|
||||
TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
|
||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
|
||||
RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
|
||||
List<CompletableFuture<?>> futures = new ArrayList<>();
|
||||
IntStream.range(0, COUNT).forEach(i -> futures.add(table.put(
|
||||
new Put(Bytes.toBytes(String.format("%02d", i))).addColumn(FAMILY, CQ, Bytes.toBytes(i)))));
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
ASYNC_CONN.close();
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private void move() throws IOException, InterruptedException {
|
||||
HRegionServer src = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME);
|
||||
HRegionServer dst = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
|
||||
.map(t -> t.getRegionServer()).filter(r -> r != src).findAny().get();
|
||||
Region region = src.getOnlineRegions(TABLE_NAME).stream().findAny().get();
|
||||
TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(dst.getServerName().getServerName()));
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
private void test(BiFunction<AsyncTableBase, List<Get>, List<Result>> getFunc)
|
||||
throws IOException, InterruptedException {
|
||||
AsyncTableBase table = getTable.get();
|
||||
List<Get> gets =
|
||||
IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(String.format("%02d", i))))
|
||||
.collect(Collectors.toList());
|
||||
List<Result> results = getFunc.apply(table, gets);
|
||||
assertEquals(COUNT, results.size());
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
Result result = results.get(i);
|
||||
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
|
||||
}
|
||||
// test basic failure recovery
|
||||
move();
|
||||
results = getFunc.apply(table, gets);
|
||||
assertEquals(COUNT, results.size());
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
Result result = results.get(i);
|
||||
assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGet() throws InterruptedException, IOException {
|
||||
test((table, gets) -> {
|
||||
return table.get(gets).stream().map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAll() throws InterruptedException, IOException {
|
||||
test((table, gets) -> {
|
||||
try {
|
||||
return table.getAll(gets).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue