HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry … (#1593)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
a9a1b9524d
commit
c1cb22f0b3
|
@ -18,9 +18,9 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
|
||||
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
|
||||
import static org.apache.hadoop.hbase.util.DNS.getHostname;
|
||||
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -29,7 +29,9 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Predicate;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -44,11 +46,15 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
|||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.DNS.ServerType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
|
@ -61,53 +67,79 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaReg
|
|||
/**
|
||||
* Master based registry implementation. Makes RPCs to the configured master addresses from config
|
||||
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
|
||||
*
|
||||
* It supports hedged reads, which can be enabled by setting
|
||||
* {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
|
||||
* out the requests batch is controlled by
|
||||
* {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
|
||||
*
|
||||
* <p/>
|
||||
* It supports hedged reads, set the fan out of the requests batch by
|
||||
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
|
||||
* it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
|
||||
* <p/>
|
||||
* TODO: Handle changes to the configuration dynamically without having to restart the client.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterRegistry implements ConnectionRegistry {
|
||||
|
||||
/** Configuration key that controls the fan out of requests **/
|
||||
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
|
||||
"hbase.client.master_registry.hedged.fanout";
|
||||
|
||||
/** Default value for the fan out of hedged requests. **/
|
||||
public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
|
||||
|
||||
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
|
||||
|
||||
private final int hedgedReadFanOut;
|
||||
|
||||
// Configured list of masters to probe the meta information from.
|
||||
private final Set<ServerName> masterServers;
|
||||
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
|
||||
|
||||
// RPC client used to talk to the masters.
|
||||
private final RpcClient rpcClient;
|
||||
private final RpcControllerFactory rpcControllerFactory;
|
||||
private final int rpcTimeoutMs;
|
||||
|
||||
MasterRegistry(Configuration conf) throws UnknownHostException {
|
||||
boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
|
||||
MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
|
||||
Configuration finalConf;
|
||||
if (!hedgedReadsEnabled) {
|
||||
// If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
|
||||
// the configuration so that other places reusing this reference is not affected.
|
||||
finalConf = new Configuration(conf);
|
||||
finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
|
||||
} else {
|
||||
finalConf = conf;
|
||||
/**
|
||||
* Parses the list of master addresses from the provided configuration. Supported format is comma
|
||||
* separated host[:port] values. If no port number if specified, default master port is assumed.
|
||||
* @param conf Configuration to parse from.
|
||||
*/
|
||||
private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
|
||||
Set<ServerName> masterAddrs = new HashSet<>();
|
||||
String configuredMasters = getMasterAddr(conf);
|
||||
for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
|
||||
HostAndPort masterHostPort =
|
||||
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
|
||||
masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
|
||||
}
|
||||
if (conf.get(MASTER_ADDRS_KEY) != null) {
|
||||
finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY));
|
||||
Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
|
||||
return masterAddrs;
|
||||
}
|
||||
|
||||
MasterRegistry(Configuration conf) throws IOException {
|
||||
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
|
||||
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
|
||||
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
|
||||
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
|
||||
// this through the master registry...
|
||||
// This is a problem as we will use the cluster id to determine the authentication method
|
||||
rpcClient = RpcClientFactory.createClient(conf, null);
|
||||
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
|
||||
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
|
||||
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
|
||||
User user = User.getCurrent();
|
||||
for (ServerName masterAddr : masterAddrs) {
|
||||
builder.put(masterAddr,
|
||||
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
|
||||
}
|
||||
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
|
||||
masterServers = new HashSet<>();
|
||||
parseMasterAddrs(finalConf);
|
||||
rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
|
||||
rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
|
||||
masterAddr2Stub = builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the default master address end point if it is not specified in the configuration.
|
||||
* <p/>
|
||||
* Will be called in {@code HBaseTestingUtility}.
|
||||
*/
|
||||
public static String getMasterAddr(Configuration conf) throws UnknownHostException {
|
||||
@VisibleForTesting
|
||||
public static String getMasterAddr(Configuration conf) throws UnknownHostException {
|
||||
String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
|
||||
if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
|
||||
return masterAddrFromConf;
|
||||
|
@ -118,63 +150,87 @@ public class MasterRegistry implements ConnectionRegistry {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return Stub needed to make RPC using a hedged channel to the master end points.
|
||||
* For describing the actual asynchronous rpc call.
|
||||
* <p/>
|
||||
* Typically, you can use lambda expression to implement this interface as
|
||||
*
|
||||
* <pre>
|
||||
* (c, s, d) -> s.xxx(c, your request here, d)
|
||||
* </pre>
|
||||
*/
|
||||
private ClientMetaService.Interface getMasterStub() throws IOException {
|
||||
return ClientMetaService.newStub(
|
||||
rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
|
||||
@FunctionalInterface
|
||||
private interface Callable<T> {
|
||||
void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
|
||||
}
|
||||
|
||||
private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
|
||||
Callable<T> callable) {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
callable.call(controller, stub, resp -> {
|
||||
if (controller.failed()) {
|
||||
future.completeExceptionally(controller.getFailed());
|
||||
} else {
|
||||
future.complete(resp);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
private IOException badResponse(String debug) {
|
||||
return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the list of master addresses from the provided configuration. Supported format is
|
||||
* comma separated host[:port] values. If no port number if specified, default master port is
|
||||
* assumed.
|
||||
* @param conf Configuration to parse from.
|
||||
* send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
|
||||
* will complete the future and quit. If all the requests in one round are failed, we will start
|
||||
* another round to send requests concurrently tohedgedReadsFanout masters. If all masters have
|
||||
* been tried and all of them are failed, we will fail the future.
|
||||
*/
|
||||
private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
|
||||
String configuredMasters = getMasterAddr(conf);
|
||||
for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
|
||||
HostAndPort masterHostPort =
|
||||
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
|
||||
masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
|
||||
private <T extends Message> void groupCall(CompletableFuture<T> future,
|
||||
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
|
||||
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
|
||||
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
|
||||
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
|
||||
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
|
||||
addListener(call(masterStubs.get(i), callable), (r, e) -> {
|
||||
// a simple check to skip all the later operations earlier
|
||||
if (future.isDone()) {
|
||||
return;
|
||||
}
|
||||
if (e == null && !isValidResp.test(r)) {
|
||||
e = badResponse(debug);
|
||||
}
|
||||
if (e != null) {
|
||||
// make sure when remaining reaches 0 we have all exceptions in the errors queue
|
||||
errors.add(e);
|
||||
if (remaining.decrementAndGet() == 0) {
|
||||
if (endIndexExclusive == masterStubs.size()) {
|
||||
// we are done, complete the future with exception
|
||||
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
|
||||
masterStubs.size(), new ArrayList<>(errors));
|
||||
future.completeExceptionally(
|
||||
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
|
||||
} else {
|
||||
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
|
||||
errors);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// do not need to decrement the counter any more as we have already finished the future.
|
||||
future.complete(r);
|
||||
}
|
||||
});
|
||||
}
|
||||
Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Set<ServerName> getParsedMasterServers() {
|
||||
return Collections.unmodifiableSet(masterServers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
|
||||
* the rpc finishes and the response is propagated to the passed future.
|
||||
* @param future Result future to which the rpc response is propagated.
|
||||
* @param isValidResp Checks if the rpc response has a valid result.
|
||||
* @param transformResult Transforms the result to a different form as expected by callers.
|
||||
* @param hrc RpcController instance for this rpc.
|
||||
* @param debug Debug message passed along to the caller in case of exceptions.
|
||||
* @param <T> RPC result type.
|
||||
* @param <R> Transformed type of the result.
|
||||
* @return A call back that can be embedded in the non-blocking rpc call.
|
||||
*/
|
||||
private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
|
||||
Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
|
||||
final String debug) {
|
||||
return rpcResult -> {
|
||||
if (rpcResult == null) {
|
||||
future.completeExceptionally(
|
||||
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
|
||||
return;
|
||||
}
|
||||
if (!isValidResp.test(rpcResult)) {
|
||||
// Rpc returned ok, but result was malformed.
|
||||
future.completeExceptionally(new IOException(
|
||||
String.format("Invalid result for request %s. Will be retried", debug)));
|
||||
return;
|
||||
}
|
||||
future.complete(transformResult.apply(rpcResult));
|
||||
};
|
||||
private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
|
||||
Predicate<T> isValidResp, String debug) {
|
||||
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
|
||||
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -182,40 +238,25 @@ public class MasterRegistry implements ConnectionRegistry {
|
|||
*/
|
||||
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
|
||||
List<HRegionLocation> regionLocations = new ArrayList<>();
|
||||
resp.getMetaLocationsList().forEach(
|
||||
location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
|
||||
resp.getMetaLocationsList()
|
||||
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
|
||||
return new RegionLocations(regionLocations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
|
||||
CompletableFuture<RegionLocations> result = new CompletableFuture<>();
|
||||
HBaseRpcController hrc = rpcControllerFactory.newController();
|
||||
RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
|
||||
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
|
||||
"getMetaRegionLocations()");
|
||||
try {
|
||||
getMasterStub().getMetaRegionLocations(
|
||||
hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
|
||||
} catch (IOException e) {
|
||||
result.completeExceptionally(e);
|
||||
}
|
||||
return result;
|
||||
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
|
||||
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
|
||||
"getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<String> getClusterId() {
|
||||
CompletableFuture<String> result = new CompletableFuture<>();
|
||||
HBaseRpcController hrc = rpcControllerFactory.newController();
|
||||
RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
|
||||
GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
|
||||
"getClusterId()");
|
||||
try {
|
||||
getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
|
||||
} catch (IOException e) {
|
||||
result.completeExceptionally(e);
|
||||
}
|
||||
return result;
|
||||
return this
|
||||
.<GetClusterIdResponse> call(
|
||||
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
|
||||
GetClusterIdResponse::hasClusterId, "getClusterId()")
|
||||
.thenApply(GetClusterIdResponse::getClusterId);
|
||||
}
|
||||
|
||||
private ServerName transformServerName(GetActiveMasterResponse resp) {
|
||||
|
@ -224,17 +265,16 @@ public class MasterRegistry implements ConnectionRegistry {
|
|||
|
||||
@Override
|
||||
public CompletableFuture<ServerName> getActiveMaster() {
|
||||
CompletableFuture<ServerName> result = new CompletableFuture<>();
|
||||
HBaseRpcController hrc = rpcControllerFactory.newController();
|
||||
RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
|
||||
GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
|
||||
"getActiveMaster()");
|
||||
try {
|
||||
getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
|
||||
} catch (IOException e) {
|
||||
result.completeExceptionally(e);
|
||||
}
|
||||
return result;
|
||||
return this
|
||||
.<GetActiveMasterResponse> call(
|
||||
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
|
||||
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
|
||||
.thenApply(this::transformServerName);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Set<ServerName> getParsedMasterServers() {
|
||||
return masterAddr2Stub.keySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -30,6 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterRegistryFetchException extends HBaseIOException {
|
||||
|
||||
private static final long serialVersionUID = 6992134872168185171L;
|
||||
|
||||
public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
|
||||
super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
|
||||
failure);
|
||||
|
|
|
@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
|
||||
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
@ -47,6 +47,7 @@ import org.apache.hadoop.ipc.RemoteException;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||
|
@ -60,6 +61,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
|
|||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
||||
/**
|
||||
|
@ -512,13 +514,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
|
|||
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
|
||||
throws UnknownHostException {
|
||||
// Check HedgedRpcChannel implementation for detailed comments.
|
||||
throw new UnsupportedOperationException("Hedging not supported for this implementation.");
|
||||
}
|
||||
|
||||
private static class AbstractRpcChannel {
|
||||
|
||||
protected final InetSocketAddress addr;
|
||||
|
|
|
@ -1,274 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.PrettyPrinter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
|
||||
/**
|
||||
* A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
|
||||
* First received response is returned to the caller. This abstracts out the logic needed to batch
|
||||
* requests to multiple end points underneath and presents itself as a single logical RpcChannel to
|
||||
* the client.
|
||||
*
|
||||
* Hedging Details:
|
||||
* ---------------
|
||||
* - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
|
||||
* end points to make the call to. We do multiple iterations until we get a proper response to the
|
||||
* rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
|
||||
* configurable and is also known as 'fanOutSize'.
|
||||
*
|
||||
* - We randomize the addresses up front so that the batch order per client is non deterministic.
|
||||
* This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
|
||||
* Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
|
||||
* load on the client and server side when configuring the fan out.
|
||||
*
|
||||
* - In a happy case, once we receive a response from one end point, we cancel all the
|
||||
* other inflight rpcs in the same batch and return the response to the caller. If we do not get a
|
||||
* valid response from any address end point, we propagate the error back to the caller.
|
||||
*
|
||||
* - Rpc timeouts are applied to every hedged rpc.
|
||||
*
|
||||
* - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
|
||||
* be hedged (for example: cluster state changing rpcs).
|
||||
*
|
||||
* (TODO) Retries and Adaptive hedging policy:
|
||||
* ------------------------------------------
|
||||
*
|
||||
* - No retries are handled at the channel level. Retries can be built in upper layers. However the
|
||||
* question is, do we even need retries? Hedging in fact is a substitute for retries.
|
||||
*
|
||||
* - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
|
||||
* policy more adaptive. In most happy cases, the rpcs from the first few end points should return
|
||||
* right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
|
||||
* is not needed. So, the idea is to make this request pattern pluggable so that the requests are
|
||||
* hedged only when needed.
|
||||
*/
|
||||
class HedgedRpcChannel implements RpcChannel {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
|
||||
|
||||
/**
|
||||
* Currently hedging is only supported for non-blocking connection implementation types because
|
||||
* the channel implementation inherently relies on the connection implementation being async.
|
||||
* Refer to the comments in doCallMethod().
|
||||
*/
|
||||
private final NettyRpcClient rpcClient;
|
||||
// List of service addresses to hedge the requests to.
|
||||
private final List<InetSocketAddress> addrs;
|
||||
private final User ticket;
|
||||
private final int rpcTimeout;
|
||||
// Controls the size of request fan out (number of rpcs per a single batch).
|
||||
private final int fanOutSize;
|
||||
|
||||
/**
|
||||
* A simple rpc call back implementation to notify the batch context if any rpc is successful.
|
||||
*/
|
||||
private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
|
||||
private final BatchRpcCtx batchRpcCtx;
|
||||
private final HBaseRpcController rpcController;
|
||||
BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
|
||||
this.batchRpcCtx = batchRpcCtx;
|
||||
this.rpcController = rpcController;
|
||||
}
|
||||
@Override
|
||||
public void run(Message result) {
|
||||
batchRpcCtx.setResultIfNotSet(result, rpcController);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
|
||||
* synchronize on multiple RPCs to different end points fetching the result. All the methods are
|
||||
* thread-safe.
|
||||
*/
|
||||
private static class BatchRpcCtx {
|
||||
// Result set by the thread finishing first. Set only once.
|
||||
private final AtomicReference<Message> result = new AtomicReference<>();
|
||||
// Caller waits on this latch being set.
|
||||
// We set this to 1, so that the first successful RPC result is returned to the client.
|
||||
private CountDownLatch resultsReady = new CountDownLatch(1);
|
||||
// Failed rpc book-keeping.
|
||||
private AtomicInteger failedRpcCount = new AtomicInteger();
|
||||
// All the call handles for this batch.
|
||||
private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
// Target addresses.
|
||||
private final List<InetSocketAddress> addresses;
|
||||
// Called when the result is ready.
|
||||
private final RpcCallback<Message> callBack;
|
||||
// Last failed rpc's exception. Used to propagate the reason to the controller.
|
||||
private IOException lastFailedRpcReason;
|
||||
|
||||
|
||||
BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
|
||||
this.addresses = addresses;
|
||||
this.callBack = Preconditions.checkNotNull(callBack);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the result only if it is not already set by another thread. Thread that successfully
|
||||
* sets the result also count downs the latch.
|
||||
* @param result Result to be set.
|
||||
*/
|
||||
public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
|
||||
if (rpcController.failed()) {
|
||||
incrementFailedRpcs(rpcController.getFailed());
|
||||
return;
|
||||
}
|
||||
if (this.result.compareAndSet(null, result)) {
|
||||
resultsReady.countDown();
|
||||
// Cancel all pending in flight calls.
|
||||
for (Call call: callsInFlight) {
|
||||
// It is ok to do it for all calls as it is a no-op if the call is already done.
|
||||
final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
|
||||
"for the same rpc already succeeded. This is not needed anymore.", call);
|
||||
call.setException(new CallCancelledException(exceptionMsg));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until the results are populated and calls the callback if the call is successful.
|
||||
* @return true for successful rpc and false otherwise.
|
||||
*/
|
||||
public boolean waitForResults() {
|
||||
try {
|
||||
// We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
|
||||
// something on the remote is broken. Worst case we should wait for rpc time out to kick in.
|
||||
resultsReady.await();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
|
||||
}
|
||||
Message message = result.get();
|
||||
if (message != null) {
|
||||
callBack.run(message);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void addCallInFlight(Call c) {
|
||||
callsInFlight.add(c);
|
||||
}
|
||||
|
||||
public void incrementFailedRpcs(IOException reason) {
|
||||
if (failedRpcCount.incrementAndGet() == addresses.size()) {
|
||||
lastFailedRpcReason = reason;
|
||||
// All the rpcs in this batch have failed. Invoke the waiting threads.
|
||||
resultsReady.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public IOException getLastFailedRpcReason() {
|
||||
return lastFailedRpcReason;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
|
||||
}
|
||||
}
|
||||
|
||||
public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
|
||||
User ticket, int rpcTimeout, int fanOutSize) {
|
||||
this.rpcClient = rpcClient;
|
||||
this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
|
||||
Preconditions.checkArgument(this.addrs.size() >= 1);
|
||||
// For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
|
||||
// order, creating hot spots on the service end points.
|
||||
Collections.shuffle(this.addrs);
|
||||
this.ticket = ticket;
|
||||
this.rpcTimeout = rpcTimeout;
|
||||
// fanOutSize controls the number of hedged RPCs per batch.
|
||||
this.fanOutSize = fanOutSize;
|
||||
}
|
||||
|
||||
private HBaseRpcController applyRpcTimeout(RpcController controller) {
|
||||
HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
|
||||
int rpcTimeoutToSet =
|
||||
hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
|
||||
HBaseRpcController response = new HBaseRpcControllerImpl();
|
||||
response.setCallTimeout(rpcTimeoutToSet);
|
||||
return response;
|
||||
}
|
||||
|
||||
private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
|
||||
Message request, Message responsePrototype, RpcCallback<Message> done) {
|
||||
int i = 0;
|
||||
BatchRpcCtx lastBatchCtx = null;
|
||||
while (i < addrs.size()) {
|
||||
// Each iteration picks fanOutSize addresses to run as batch.
|
||||
int batchEnd = Math.min(addrs.size(), i + fanOutSize);
|
||||
List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
|
||||
BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
|
||||
lastBatchCtx = batchRpcCtx;
|
||||
LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
|
||||
for (InetSocketAddress address : addrSubList) {
|
||||
HBaseRpcController rpcController = applyRpcTimeout(controller);
|
||||
// ** WARN ** This is a blocking call if the underlying connection for the rpc client is
|
||||
// a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
|
||||
// the write calls. Handling blocking connection means that this should be run in a separate
|
||||
// thread and hence more code complexity. Is it ok to handle only non-blocking connections?
|
||||
batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
|
||||
responsePrototype, ticket, address,
|
||||
new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
|
||||
}
|
||||
if (batchRpcCtx.waitForResults()) {
|
||||
return;
|
||||
}
|
||||
// Entire batch has failed, lets try the next batch.
|
||||
LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
|
||||
i = batchEnd;
|
||||
}
|
||||
Preconditions.checkNotNull(lastBatchCtx);
|
||||
// All the batches failed, mark it a failed rpc.
|
||||
// Propagate the failure reason. We propagate the last batch's last failing rpc reason.
|
||||
// Can we do something better?
|
||||
controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
|
||||
done.run(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
|
||||
Message request, Message responsePrototype, RpcCallback<Message> done) {
|
||||
// There is no reason to use any other implementation of RpcController.
|
||||
Preconditions.checkState(controller instanceof HBaseRpcController);
|
||||
// To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
|
||||
// called once the hedging finishes.
|
||||
CompletableFuture.runAsync(
|
||||
() -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
|
||||
}
|
||||
}
|
|
@ -18,20 +18,14 @@
|
|||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
|
||||
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
|
||||
|
@ -82,19 +76,6 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
|
|||
return new NettyRpcConnection(this, remoteId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
|
||||
throws UnknownHostException {
|
||||
final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
|
||||
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
|
||||
Set<InetSocketAddress> addresses = new HashSet<>();
|
||||
for (ServerName sn: sns) {
|
||||
addresses.add(createAddr(sn));
|
||||
}
|
||||
return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
|
||||
hedgedRpcFanOut);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeInternal() {
|
||||
if (shutdownGroupWhenClose) {
|
||||
|
|
|
@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.ipc;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
|
||||
|
||||
|
@ -82,16 +82,6 @@ public interface RpcClient extends Closeable {
|
|||
RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Creates a channel that can hedge request to multiple underlying channels.
|
||||
* @param sns Set of servers for underlying channels.
|
||||
* @param user user for the connection.
|
||||
* @param rpcTimeout rpc timeout to use.
|
||||
* @return A hedging rpc channel for this rpc client instance.
|
||||
*/
|
||||
RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Interrupt the connections to the given server. This should be called if the server
|
||||
* is known as actually dead. This will not prevent current operation to be retried, and,
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClient;
|
||||
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
|
||||
@Category({ ClientTests.class, SmallTests.class })
|
||||
public class TestMasterRegistryHedgedReads {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
|
||||
|
||||
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
|
||||
|
||||
private static final ExecutorService EXECUTOR =
|
||||
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
|
||||
|
||||
private static AtomicInteger CALLED = new AtomicInteger(0);
|
||||
|
||||
private static volatile int BAD_RESP_INDEX;
|
||||
|
||||
private static volatile Set<Integer> GOOD_RESP_INDEXS;
|
||||
|
||||
private static GetClusterIdResponse RESP =
|
||||
GetClusterIdResponse.newBuilder().setClusterId("id").build();
|
||||
|
||||
public static final class RpcClientImpl implements RpcClient {
|
||||
|
||||
public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
|
||||
MetricsConnection metrics) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
|
||||
throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
|
||||
throws IOException {
|
||||
return new RpcChannelImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelConnections(ServerName sn) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCellBlockSupport() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class RpcChannelImpl implements RpcChannel {
|
||||
|
||||
@Override
|
||||
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
|
||||
Message responsePrototype, RpcCallback<Message> done) {
|
||||
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
|
||||
EXECUTOR.execute(() -> {
|
||||
int index = CALLED.getAndIncrement();
|
||||
if (index == BAD_RESP_INDEX) {
|
||||
done.run(GetClusterIdResponse.getDefaultInstance());
|
||||
} else if (GOOD_RESP_INDEXS.contains(index)) {
|
||||
done.run(RESP);
|
||||
} else {
|
||||
((HBaseRpcController) controller).setFailed("inject error");
|
||||
done.run(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
|
||||
RpcClient.class);
|
||||
String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
|
||||
.collect(Collectors.joining(","));
|
||||
conf.set(HConstants.MASTER_ADDRS_KEY, masters);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() {
|
||||
EXECUTOR.shutdownNow();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
CALLED.set(0);
|
||||
BAD_RESP_INDEX = -1;
|
||||
GOOD_RESP_INDEXS = Collections.emptySet();
|
||||
}
|
||||
|
||||
private <T> T logIfError(CompletableFuture<T> future) throws IOException {
|
||||
try {
|
||||
return FutureUtils.get(future);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("", t);
|
||||
throw t;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllFailNoHedged() throws IOException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
|
||||
assertEquals(10, CALLED.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllFailHedged3() throws IOException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
|
||||
BAD_RESP_INDEX = 5;
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
|
||||
assertEquals(10, CALLED.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFirstSucceededNoHedge() throws IOException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
// will be set to 1
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
|
||||
GOOD_RESP_INDEXS =
|
||||
IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
String clusterId = logIfError(registry.getClusterId());
|
||||
assertEquals(RESP.getClusterId(), clusterId);
|
||||
assertEquals(1, CALLED.get());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondRoundSucceededHedge4() throws IOException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
|
||||
GOOD_RESP_INDEXS = Collections.singleton(6);
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
String clusterId = logIfError(registry.getClusterId());
|
||||
assertEquals(RESP.getClusterId(), clusterId);
|
||||
UTIL.waitFor(5000, () -> CALLED.get() == 8);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
|
||||
GOOD_RESP_INDEXS = Collections.singleton(5);
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
String clusterId = logIfError(registry.getClusterId());
|
||||
assertEquals(RESP.getClusterId(), clusterId);
|
||||
UTIL.waitFor(5000, () -> CALLED.get() == 10);
|
||||
Thread.sleep(1000);
|
||||
// make sure we do not send more
|
||||
assertEquals(10, CALLED.get());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -185,13 +185,6 @@ public final class HConstants {
|
|||
public static final String ZK_CONNECTION_REGISTRY_CLASS =
|
||||
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";
|
||||
|
||||
/** Configuration to enable hedged reads on master registry **/
|
||||
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
|
||||
"hbase.client.master_registry.enable_hedged_reads";
|
||||
|
||||
/** Default value for enabling hedging reads on master registry **/
|
||||
public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
|
||||
|
||||
/** Parameter name for the master type being backup (waits for primary to go inactive). */
|
||||
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
|
||||
|
||||
|
@ -917,12 +910,6 @@ public final class HConstants {
|
|||
*/
|
||||
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
|
||||
|
||||
/** Configuration key that controls the fan out of requests in hedged channel implementation. **/
|
||||
public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
|
||||
|
||||
/** Default value for the fan out of hedged requests. **/
|
||||
public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
|
||||
|
||||
/**
|
||||
* timeout for each read RPC
|
||||
*/
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
@ -53,6 +54,7 @@ import org.junit.runner.RunWith;
|
|||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
|
@ -86,20 +88,20 @@ class FromClientSideBase {
|
|||
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
|
||||
* test and de-flakes it.
|
||||
*/
|
||||
protected static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
|
||||
protected static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
|
||||
if (TEST_UTIL == null) {
|
||||
return false;
|
||||
}
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
Class confClass = conf.getClass(
|
||||
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
|
||||
int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
|
||||
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
|
||||
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
|
||||
ZKConnectionRegistry.class);
|
||||
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
|
||||
MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
|
||||
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
|
||||
}
|
||||
|
||||
protected static final void initialize(Class registryImpl, int numHedgedReqs, Class<?>... cps)
|
||||
throws Exception {
|
||||
protected static final void initialize(Class<?> registryImpl, int numHedgedReqs, Class<?>... cps)
|
||||
throws Exception {
|
||||
// initialize() is called for every unit test, however we only want to reset the cluster state
|
||||
// at the end of every parameterized run.
|
||||
if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
|
||||
|
@ -123,13 +125,8 @@ class FromClientSideBase {
|
|||
conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests
|
||||
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
|
||||
ConnectionRegistry.class);
|
||||
if (numHedgedReqs == 1) {
|
||||
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
|
||||
} else {
|
||||
Preconditions.checkArgument(numHedgedReqs > 1);
|
||||
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
|
||||
}
|
||||
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
Preconditions.checkArgument(numHedgedReqs > 0);
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
|
||||
// Multiple masters needed only when hedged reads for master registry are enabled.
|
||||
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES);
|
||||
|
|
|
@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -45,7 +45,7 @@ import org.junit.BeforeClass;
|
|||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
@ -146,14 +146,6 @@ public class TestClientTimeouts {
|
|||
throws UnknownHostException {
|
||||
return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
|
||||
throws UnknownHostException {
|
||||
Preconditions.checkArgument(sns != null && sns.size() == 1);
|
||||
return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static AtomicInteger invokations = new AtomicInteger();
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
|
|
@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -77,14 +78,15 @@ public class TestMasterRegistry {
|
|||
/**
|
||||
* Makes sure the master registry parses the master end points in the configuration correctly.
|
||||
*/
|
||||
@Test public void testMasterAddressParsing() throws UnknownHostException {
|
||||
@Test
|
||||
public void testMasterAddressParsing() throws IOException {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
int numMasters = 10;
|
||||
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
|
||||
// Half of them would be without a port, duplicates are removed.
|
||||
assertEquals(numMasters/2 + 1, parsedMasters.size());
|
||||
assertEquals(numMasters / 2 + 1, parsedMasters.size());
|
||||
// Sort in the increasing order of port numbers.
|
||||
Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
|
||||
for (int i = 0; i < parsedMasters.size(); i++) {
|
||||
|
@ -100,18 +102,14 @@ public class TestMasterRegistry {
|
|||
}
|
||||
}
|
||||
|
||||
@Test public void testRegistryRPCs() throws Exception {
|
||||
@Test
|
||||
public void testRegistryRPCs() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
final int size = activeMaster.getMetaRegionLocationCache().
|
||||
getMetaRegionLocations().get().size();
|
||||
for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) {
|
||||
if (numHedgedReqs == 1) {
|
||||
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
|
||||
} else {
|
||||
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
|
||||
}
|
||||
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
final int size =
|
||||
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
|
||||
for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
try (MasterRegistry registry = new MasterRegistry(conf)) {
|
||||
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
|
||||
// because not all replicas had made it up before test started.
|
||||
|
@ -119,9 +117,9 @@ public class TestMasterRegistry {
|
|||
assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
|
||||
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
|
||||
List<HRegionLocation> metaLocations =
|
||||
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
|
||||
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaRegionLocationCache()
|
||||
.getMetaRegionLocations().get();
|
||||
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
|
||||
List<HRegionLocation> actualMetaLocations =
|
||||
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
|
||||
Collections.sort(metaLocations);
|
||||
Collections.sort(actualMetaLocations);
|
||||
assertEquals(actualMetaLocations, metaLocations);
|
||||
|
|
|
@ -29,6 +29,7 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -68,6 +69,7 @@ import org.junit.runner.RunWith;
|
|||
import org.junit.runners.Parameterized;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
|
@ -101,7 +103,7 @@ public class TestScannersFromClientSide {
|
|||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection parameters() {
|
||||
public static Collection<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{ MasterRegistry.class, 1},
|
||||
{ MasterRegistry.class, 2},
|
||||
|
@ -120,21 +122,21 @@ public class TestScannersFromClientSide {
|
|||
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
|
||||
* test and de-flakes it.
|
||||
*/
|
||||
private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
|
||||
private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
|
||||
// initialize() is called for every unit test, however we only want to reset the cluster state
|
||||
// at the end of every parameterized run.
|
||||
if (TEST_UTIL == null) {
|
||||
return false;
|
||||
}
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
Class confClass = conf.getClass(
|
||||
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
|
||||
int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
|
||||
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
|
||||
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
|
||||
ZKConnectionRegistry.class);
|
||||
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
|
||||
MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
|
||||
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
|
||||
}
|
||||
|
||||
public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception {
|
||||
public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
|
||||
if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
|
||||
return;
|
||||
}
|
||||
|
@ -147,13 +149,8 @@ public class TestScannersFromClientSide {
|
|||
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
|
||||
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
|
||||
ConnectionRegistry.class);
|
||||
if (numHedgedReqs == 1) {
|
||||
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
|
||||
} else {
|
||||
Preconditions.checkArgument(numHedgedReqs > 1);
|
||||
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
|
||||
}
|
||||
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
Preconditions.checkArgument(numHedgedReqs > 0);
|
||||
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
|
||||
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
|
||||
// Multiple masters needed only when hedged reads for master registry are enabled.
|
||||
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
|
||||
|
|
|
@ -26,30 +26,27 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.ArgumentMatchers.anyObject;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.internal.verification.VerificationModeFactory.times;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -58,7 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
|||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
|
||||
|
@ -367,105 +363,6 @@ public abstract class AbstractTestIPC {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the various request fan out values using a simple RPC hedged across a mix of running and
|
||||
* failing servers.
|
||||
*/
|
||||
@Test
|
||||
@Ignore
|
||||
public void testHedgedAsyncEcho() throws Exception {
|
||||
// Hedging is not supported for blocking connection types.
|
||||
Assume.assumeFalse(this instanceof TestBlockingIPC);
|
||||
List<RpcServer> rpcServers = new ArrayList<>();
|
||||
List<InetSocketAddress> addresses = new ArrayList<>();
|
||||
// Create a mix of running and failing servers.
|
||||
final int numRunningServers = 5;
|
||||
final int numFailingServers = 3;
|
||||
final int numServers = numRunningServers + numFailingServers;
|
||||
for (int i = 0; i < numRunningServers; i++) {
|
||||
RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
rpcServer.start();
|
||||
addresses.add(rpcServer.getListenerAddress());
|
||||
rpcServers.add(rpcServer);
|
||||
}
|
||||
for (int i = 0; i < numFailingServers; i++) {
|
||||
RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
rpcServer.start();
|
||||
addresses.add(rpcServer.getListenerAddress());
|
||||
rpcServers.add(rpcServer);
|
||||
}
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
// Try out various fan out values starting from 1 -> numServers.
|
||||
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
|
||||
// Update the client's underlying conf, should be ok for the test.
|
||||
LOG.debug("Testing with request fan out: " + reqFanOut);
|
||||
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
|
||||
Interface stub = newStub(client, addresses);
|
||||
BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
|
||||
stub.echo(new HBaseRpcControllerImpl(),
|
||||
EchoRequestProto.newBuilder().setMessage("hello").build(), done);
|
||||
TestProtos.EchoResponseProto responseProto = done.get();
|
||||
assertNotNull(responseProto);
|
||||
assertEquals("hello", responseProto.getMessage());
|
||||
LOG.debug("Ended test with request fan out: " + reqFanOut);
|
||||
}
|
||||
} finally {
|
||||
for (RpcServer rpcServer: rpcServers) {
|
||||
rpcServer.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHedgedAsyncTimeouts() throws Exception {
|
||||
// Hedging is not supported for blocking connection types.
|
||||
Assume.assumeFalse(this instanceof TestBlockingIPC);
|
||||
List<RpcServer> rpcServers = new ArrayList<>();
|
||||
List<InetSocketAddress> addresses = new ArrayList<>();
|
||||
final int numServers = 3;
|
||||
for (int i = 0; i < numServers; i++) {
|
||||
RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
|
||||
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
|
||||
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
|
||||
new FifoRpcScheduler(CONF, 1));
|
||||
rpcServer.start();
|
||||
addresses.add(rpcServer.getListenerAddress());
|
||||
rpcServers.add(rpcServer);
|
||||
}
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
int timeout = 100;
|
||||
int pauseTime = 1000;
|
||||
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
|
||||
// Try out various fan out values starting from 1 -> numServers.
|
||||
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
|
||||
// Update the client's underlying conf, should be ok for the test.
|
||||
LOG.debug("Testing with request fan out: " + reqFanOut);
|
||||
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
|
||||
Interface stub = newStub(client, addresses);
|
||||
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
|
||||
pcrc.setCallTimeout(timeout);
|
||||
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
|
||||
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
|
||||
assertNull(callback.get());
|
||||
// Make sure the controller has the right exception propagated.
|
||||
assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
|
||||
LOG.debug("Ended test with request fan out: " + reqFanOut);
|
||||
}
|
||||
} finally {
|
||||
for (RpcServer rpcServer: rpcServers) {
|
||||
rpcServer.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAsyncRemoteError() throws IOException {
|
||||
AbstractRpcClient<?> client = createRpcClient(CONF);
|
||||
|
|
|
@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.ipc;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
|
@ -31,9 +29,11 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
|
||||
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
|
||||
|
@ -67,17 +67,6 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
|
|||
User.getCurrent(), 0));
|
||||
}
|
||||
|
||||
public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
|
||||
throws IOException {
|
||||
Set<ServerName> serverNames = new HashSet<>();
|
||||
for (InetSocketAddress addr: addrs) {
|
||||
serverNames.add(ServerName.valueOf(
|
||||
addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
|
||||
}
|
||||
return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
|
||||
serverNames, User.getCurrent(), 0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
|
||||
throws ServiceException {
|
||||
|
|
|
@ -24,15 +24,12 @@ import static org.junit.Assert.fail;
|
|||
import java.io.IOException;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.client.Get;
|
|||
import org.apache.hadoop.hbase.client.MetricsConnection;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -52,8 +48,6 @@ import org.junit.experimental.categories.Category;
|
|||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestRpcClientLeaks {
|
||||
|
@ -96,14 +90,6 @@ public class TestRpcClientLeaks {
|
|||
};
|
||||
}
|
||||
|
||||
// To keep the registry paths happy.
|
||||
@Override
|
||||
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
|
||||
throws UnknownHostException {
|
||||
Preconditions.checkState(sns != null && sns.size() == 1);
|
||||
return super.createRpcChannel((ServerName)sns.toArray()[0], user, rpcTimeout);
|
||||
}
|
||||
|
||||
public static void enableThrowExceptions() {
|
||||
throwException = true;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue