HBASE-24265 Remove hedged rpc call support, implement the logic in MaterRegistry … (#1593)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Duo Zhang 2020-05-06 15:55:26 +08:00
parent e6b8ca9dcf
commit 7f4683bafa
16 changed files with 445 additions and 634 deletions

View File

@ -18,9 +18,9 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
import static org.apache.hadoop.hbase.util.DNS.getHostname;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -29,7 +29,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@ -44,11 +46,15 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.DNS.ServerType;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
@ -61,52 +67,78 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaReg
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
*
* It supports hedged reads, which can be enabled by setting
* {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
* out the requests batch is controlled by
* {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
*
* <p/>
* It supports hedged reads, set the fan out of the requests batch by
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
* it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
/** Configuration key that controls the fan out of requests **/
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout";
/** Default value for the fan out of hedged requests. **/
public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
private final int hedgedReadFanOut;
// Configured list of masters to probe the meta information from.
private final Set<ServerName> masterServers;
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;
MasterRegistry(Configuration conf) throws UnknownHostException {
boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
Configuration finalConf;
if (!hedgedReadsEnabled) {
// If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
// the configuration so that other places reusing this reference is not affected.
finalConf = new Configuration(conf);
finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
} else {
finalConf = conf;
/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
* separated host[:port] values. If no port number if specified, default master port is assumed.
* @param conf Configuration to parse from.
*/
private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
Set<ServerName> masterAddrs = new HashSet<>();
String configuredMasters = getMasterAddr(conf);
for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
HostAndPort masterHostPort =
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
}
if (conf.get(MASTER_ADDRS_KEY) != null) {
finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY));
Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
return masterAddrs;
}
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
masterServers = new HashSet<>();
parseMasterAddrs(finalConf);
rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
User user = User.getCurrent();
for (ServerName masterAddr : masterAddrs) {
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterAddr2Stub = builder.build();
}
/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
* Will be called in {@code HBaseTestingUtility}.
*/
@VisibleForTesting
public static String getMasterAddr(Configuration conf) throws UnknownHostException {
String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
@ -118,63 +150,87 @@ public class MasterRegistry implements ConnectionRegistry {
}
/**
* @return Stub needed to make RPC using a hedged channel to the master end points.
* For describing the actual asynchronous rpc call.
* <p/>
* Typically, you can use lambda expression to implement this interface as
*
* <pre>
* (c, s, d) -> s.xxx(c, your request here, d)
* </pre>
*/
private ClientMetaService.Interface getMasterStub() throws IOException {
return ClientMetaService.newStub(
rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
@FunctionalInterface
private interface Callable<T> {
void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
}
private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
Callable<T> callable) {
HBaseRpcController controller = rpcControllerFactory.newController();
CompletableFuture<T> future = new CompletableFuture<>();
callable.call(controller, stub, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
});
return future;
}
private IOException badResponse(String debug) {
return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
}
/**
* Parses the list of master addresses from the provided configuration. Supported format is
* comma separated host[:port] values. If no port number if specified, default master port is
* assumed.
* @param conf Configuration to parse from.
* send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
* will complete the future and quit. If all the requests in one round are failed, we will start
* another round to send requests concurrently tohedgedReadsFanout masters. If all masters have
* been tried and all of them are failed, we will fail the future.
*/
private void parseMasterAddrs(Configuration conf) throws UnknownHostException {
String configuredMasters = getMasterAddr(conf);
for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
HostAndPort masterHostPort =
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
private <T extends Message> void groupCall(CompletableFuture<T> future,
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
addListener(call(masterStubs.get(i), callable), (r, e) -> {
// a simple check to skip all the later operations earlier
if (future.isDone()) {
return;
}
Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
if (e == null && !isValidResp.test(r)) {
e = badResponse(debug);
}
@VisibleForTesting
public Set<ServerName> getParsedMasterServers() {
return Collections.unmodifiableSet(masterServers);
}
/**
* Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
* the rpc finishes and the response is propagated to the passed future.
* @param future Result future to which the rpc response is propagated.
* @param isValidResp Checks if the rpc response has a valid result.
* @param transformResult Transforms the result to a different form as expected by callers.
* @param hrc RpcController instance for this rpc.
* @param debug Debug message passed along to the caller in case of exceptions.
* @param <T> RPC result type.
* @param <R> Transformed type of the result.
* @return A call back that can be embedded in the non-blocking rpc call.
*/
private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
final String debug) {
return rpcResult -> {
if (rpcResult == null) {
if (e != null) {
// make sure when remaining reaches 0 we have all exceptions in the errors queue
errors.add(e);
if (remaining.decrementAndGet() == 0) {
if (endIndexExclusive == masterStubs.size()) {
// we are done, complete the future with exception
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
return;
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
} else {
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
errors);
}
if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed.
future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug)));
return;
}
future.complete(transformResult.apply(rpcResult));
};
} else {
// do not need to decrement the counter any more as we have already finished the future.
future.complete(r);
}
});
}
}
private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
return future;
}
/**
@ -182,40 +238,25 @@ public class MasterRegistry implements ConnectionRegistry {
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList().forEach(
location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
resp.getMetaLocationsList()
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
return new RegionLocations(regionLocations);
}
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
"getMetaRegionLocations()");
try {
getMasterStub().getMetaRegionLocations(
hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
}
@Override
public CompletableFuture<String> getClusterId() {
CompletableFuture<String> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
"getClusterId()");
try {
getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
return this
.<GetClusterIdResponse> call(
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
GetClusterIdResponse::hasClusterId, "getClusterId()")
.thenApply(GetClusterIdResponse::getClusterId);
}
private ServerName transformServerName(GetActiveMasterResponse resp) {
@ -224,17 +265,16 @@ public class MasterRegistry implements ConnectionRegistry {
@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
"getActiveMaster()");
try {
getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
return this
.<GetActiveMasterResponse> call(
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
.thenApply(this::transformServerName);
}
return result;
@VisibleForTesting
Set<ServerName> getParsedMasterServers() {
return masterAddr2Stub.keySet();
}
@Override

View File

@ -30,6 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public class MasterRegistryFetchException extends HBaseIOException {
private static final long serialVersionUID = 6992134872168185171L;
public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
failure);

View File

@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -47,6 +47,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
@ -60,6 +61,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
@ -512,13 +514,6 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
// Check HedgedRpcChannel implementation for detailed comments.
throw new UnsupportedOperationException("Hedging not supported for this implementation.");
}
private static class AbstractRpcChannel {
protected final InetSocketAddress addr;

View File

@ -1,274 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
/**
* A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
* First received response is returned to the caller. This abstracts out the logic needed to batch
* requests to multiple end points underneath and presents itself as a single logical RpcChannel to
* the client.
*
* Hedging Details:
* ---------------
* - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
* end points to make the call to. We do multiple iterations until we get a proper response to the
* rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
* configurable and is also known as 'fanOutSize'.
*
* - We randomize the addresses up front so that the batch order per client is non deterministic.
* This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
* Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
* load on the client and server side when configuring the fan out.
*
* - In a happy case, once we receive a response from one end point, we cancel all the
* other inflight rpcs in the same batch and return the response to the caller. If we do not get a
* valid response from any address end point, we propagate the error back to the caller.
*
* - Rpc timeouts are applied to every hedged rpc.
*
* - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
* be hedged (for example: cluster state changing rpcs).
*
* (TODO) Retries and Adaptive hedging policy:
* ------------------------------------------
*
* - No retries are handled at the channel level. Retries can be built in upper layers. However the
* question is, do we even need retries? Hedging in fact is a substitute for retries.
*
* - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
* policy more adaptive. In most happy cases, the rpcs from the first few end points should return
* right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
* is not needed. So, the idea is to make this request pattern pluggable so that the requests are
* hedged only when needed.
*/
class HedgedRpcChannel implements RpcChannel {
private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
/**
* Currently hedging is only supported for non-blocking connection implementation types because
* the channel implementation inherently relies on the connection implementation being async.
* Refer to the comments in doCallMethod().
*/
private final NettyRpcClient rpcClient;
// List of service addresses to hedge the requests to.
private final List<InetSocketAddress> addrs;
private final User ticket;
private final int rpcTimeout;
// Controls the size of request fan out (number of rpcs per a single batch).
private final int fanOutSize;
/**
* A simple rpc call back implementation to notify the batch context if any rpc is successful.
*/
private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
private final BatchRpcCtx batchRpcCtx;
private final HBaseRpcController rpcController;
BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
this.batchRpcCtx = batchRpcCtx;
this.rpcController = rpcController;
}
@Override
public void run(Message result) {
batchRpcCtx.setResultIfNotSet(result, rpcController);
}
}
/**
* A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
* synchronize on multiple RPCs to different end points fetching the result. All the methods are
* thread-safe.
*/
private static class BatchRpcCtx {
// Result set by the thread finishing first. Set only once.
private final AtomicReference<Message> result = new AtomicReference<>();
// Caller waits on this latch being set.
// We set this to 1, so that the first successful RPC result is returned to the client.
private CountDownLatch resultsReady = new CountDownLatch(1);
// Failed rpc book-keeping.
private AtomicInteger failedRpcCount = new AtomicInteger();
// All the call handles for this batch.
private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
// Target addresses.
private final List<InetSocketAddress> addresses;
// Called when the result is ready.
private final RpcCallback<Message> callBack;
// Last failed rpc's exception. Used to propagate the reason to the controller.
private IOException lastFailedRpcReason;
BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
this.addresses = addresses;
this.callBack = Preconditions.checkNotNull(callBack);
}
/**
* Sets the result only if it is not already set by another thread. Thread that successfully
* sets the result also count downs the latch.
* @param result Result to be set.
*/
public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
if (rpcController.failed()) {
incrementFailedRpcs(rpcController.getFailed());
return;
}
if (this.result.compareAndSet(null, result)) {
resultsReady.countDown();
// Cancel all pending in flight calls.
for (Call call: callsInFlight) {
// It is ok to do it for all calls as it is a no-op if the call is already done.
final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
"for the same rpc already succeeded. This is not needed anymore.", call);
call.setException(new CallCancelledException(exceptionMsg));
}
}
}
/**
* Waits until the results are populated and calls the callback if the call is successful.
* @return true for successful rpc and false otherwise.
*/
public boolean waitForResults() {
try {
// We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
// something on the remote is broken. Worst case we should wait for rpc time out to kick in.
resultsReady.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
}
Message message = result.get();
if (message != null) {
callBack.run(message);
return true;
}
return false;
}
public void addCallInFlight(Call c) {
callsInFlight.add(c);
}
public void incrementFailedRpcs(IOException reason) {
if (failedRpcCount.incrementAndGet() == addresses.size()) {
lastFailedRpcReason = reason;
// All the rpcs in this batch have failed. Invoke the waiting threads.
resultsReady.countDown();
}
}
public IOException getLastFailedRpcReason() {
return lastFailedRpcReason;
}
@Override
public String toString() {
return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
}
}
public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
User ticket, int rpcTimeout, int fanOutSize) {
this.rpcClient = rpcClient;
this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
Preconditions.checkArgument(this.addrs.size() >= 1);
// For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
// order, creating hot spots on the service end points.
Collections.shuffle(this.addrs);
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
// fanOutSize controls the number of hedged RPCs per batch.
this.fanOutSize = fanOutSize;
}
private HBaseRpcController applyRpcTimeout(RpcController controller) {
HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
int rpcTimeoutToSet =
hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
HBaseRpcController response = new HBaseRpcControllerImpl();
response.setCallTimeout(rpcTimeoutToSet);
return response;
}
private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
Message request, Message responsePrototype, RpcCallback<Message> done) {
int i = 0;
BatchRpcCtx lastBatchCtx = null;
while (i < addrs.size()) {
// Each iteration picks fanOutSize addresses to run as batch.
int batchEnd = Math.min(addrs.size(), i + fanOutSize);
List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
lastBatchCtx = batchRpcCtx;
LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
for (InetSocketAddress address : addrSubList) {
HBaseRpcController rpcController = applyRpcTimeout(controller);
// ** WARN ** This is a blocking call if the underlying connection for the rpc client is
// a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
// the write calls. Handling blocking connection means that this should be run in a separate
// thread and hence more code complexity. Is it ok to handle only non-blocking connections?
batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
responsePrototype, ticket, address,
new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
}
if (batchRpcCtx.waitForResults()) {
return;
}
// Entire batch has failed, lets try the next batch.
LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
i = batchEnd;
}
Preconditions.checkNotNull(lastBatchCtx);
// All the batches failed, mark it a failed rpc.
// Propagate the failure reason. We propagate the last batch's last failing rpc reason.
// Can we do something better?
controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
done.run(null);
}
@Override
public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
Message request, Message responsePrototype, RpcCallback<Message> done) {
// There is no reason to use any other implementation of RpcController.
Preconditions.checkState(controller instanceof HBaseRpcController);
// To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
// called once the hedging finishes.
CompletableFuture.runAsync(
() -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
}
}

View File

@ -18,20 +18,14 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
@ -82,19 +76,6 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
return new NettyRpcConnection(this, remoteId);
}
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Set<InetSocketAddress> addresses = new HashSet<>();
for (ServerName sn: sns) {
addresses.add(createAddr(sn));
}
return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
hedgedRpcFanOut);
}
@Override
protected void closeInternal() {
if (shutdownGroupWhenClose) {

View File

@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.ipc;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
@ -82,16 +82,6 @@ public interface RpcClient extends Closeable {
RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
throws IOException;
/**
* Creates a channel that can hedge request to multiple underlying channels.
* @param sns Set of servers for underlying channels.
* @param user user for the connection.
* @param rpcTimeout rpc timeout to use.
* @return A hedging rpc channel for this rpc client instance.
*/
RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
throws IOException;
/**
* Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
@Category({ ClientTests.class, SmallTests.class })
public class TestMasterRegistryHedgedReads {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
private static final ExecutorService EXECUTOR =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
private static AtomicInteger CALLED = new AtomicInteger(0);
private static volatile int BAD_RESP_INDEX;
private static volatile Set<Integer> GOOD_RESP_INDEXS;
private static GetClusterIdResponse RESP =
GetClusterIdResponse.newBuilder().setClusterId("id").build();
public static final class RpcClientImpl implements RpcClient {
public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
}
@Override
public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout)
throws IOException {
throw new UnsupportedOperationException();
}
@Override
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
throws IOException {
return new RpcChannelImpl();
}
@Override
public void cancelConnections(ServerName sn) {
}
@Override
public void close() {
}
@Override
public boolean hasCellBlockSupport() {
return false;
}
}
public static final class RpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
EXECUTOR.execute(() -> {
int index = CALLED.getAndIncrement();
if (index == BAD_RESP_INDEX) {
done.run(GetClusterIdResponse.getDefaultInstance());
} else if (GOOD_RESP_INDEXS.contains(index)) {
done.run(RESP);
} else {
((HBaseRpcController) controller).setFailed("inject error");
done.run(null);
}
});
}
}
@BeforeClass
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class);
String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
.collect(Collectors.joining(","));
conf.set(HConstants.MASTER_ADDRS_KEY, masters);
}
@AfterClass
public static void tearDownAfterClass() {
EXECUTOR.shutdownNow();
}
@Before
public void setUp() {
CALLED.set(0);
BAD_RESP_INDEX = -1;
GOOD_RESP_INDEXS = Collections.emptySet();
}
private <T> T logIfError(CompletableFuture<T> future) throws IOException {
try {
return FutureUtils.get(future);
} catch (Throwable t) {
LOG.warn("", t);
throw t;
}
}
@Test
public void testAllFailNoHedged() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
try (MasterRegistry registry = new MasterRegistry(conf)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
}
@Test
public void testAllFailHedged3() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
BAD_RESP_INDEX = 5;
try (MasterRegistry registry = new MasterRegistry(conf)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
}
@Test
public void testFirstSucceededNoHedge() throws IOException {
Configuration conf = UTIL.getConfiguration();
// will be set to 1
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
GOOD_RESP_INDEXS =
IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
try (MasterRegistry registry = new MasterRegistry(conf)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
assertEquals(1, CALLED.get());
}
}
@Test
public void testSecondRoundSucceededHedge4() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
GOOD_RESP_INDEXS = Collections.singleton(6);
try (MasterRegistry registry = new MasterRegistry(conf)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 8);
}
}
@Test
public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
GOOD_RESP_INDEXS = Collections.singleton(5);
try (MasterRegistry registry = new MasterRegistry(conf)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 10);
Thread.sleep(1000);
// make sure we do not send more
assertEquals(10, CALLED.get());
}
}
}

View File

@ -190,13 +190,6 @@ public final class HConstants {
public static final String ZK_CONNECTION_REGISTRY_CLASS =
"org.apache.hadoop.hbase.client.ZKConnectionRegistry";
/** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads";
/** Default value for enabling hedging reads on master registry **/
public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
@ -939,12 +932,6 @@ public final class HConstants {
*/
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/** Configuration key that controls the fan out of requests in hedged channel implementation. **/
public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
/** Default value for the fan out of hedged requests. **/
public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
/**
* timeout for each read RPC
*/

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@ -49,10 +53,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Base for TestFromClientSide* classes.
@ -85,19 +87,19 @@ class FromClientSideBase {
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
* test and de-flakes it.
*/
protected static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
protected static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
if (TEST_UTIL == null) {
return false;
}
Configuration conf = TEST_UTIL.getConfiguration();
Class confClass = conf.getClass(
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}
protected static final void initialize(Class registryImpl, int numHedgedReqs, Class<?>... cps)
protected static final void initialize(Class<?> registryImpl, int numHedgedReqs, Class<?>... cps)
throws Exception {
// initialize() is called for every unit test, however we only want to reset the cluster state
// at the end of every parameterized run.
@ -122,13 +124,8 @@ class FromClientSideBase {
conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
ConnectionRegistry.class);
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
Preconditions.checkArgument(numHedgedReqs > 1);
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
Preconditions.checkArgument(numHedgedReqs > 0);
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
// Multiple masters needed only when hedged reads for master registry are enabled.
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES);

View File

@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@ -46,7 +46,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -157,13 +157,6 @@ public class TestClientTimeouts {
throws UnknownHostException {
return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout);
}
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
Preconditions.checkArgument(sns != null && sns.size() == 1);
return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout);
}
}
/**

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

View File

@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import java.net.UnknownHostException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -77,14 +78,15 @@ public class TestMasterRegistry {
/**
* Makes sure the master registry parses the master end points in the configuration correctly.
*/
@Test public void testMasterAddressParsing() throws UnknownHostException {
@Test
public void testMasterAddressParsing() throws IOException {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
try (MasterRegistry registry = new MasterRegistry(conf)) {
List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
// Half of them would be without a port, duplicates are removed.
assertEquals(numMasters/2 + 1, parsedMasters.size());
assertEquals(numMasters / 2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
for (int i = 0; i < parsedMasters.size(); i++) {
@ -100,18 +102,14 @@ public class TestMasterRegistry {
}
}
@Test public void testRegistryRPCs() throws Exception {
@Test
public void testRegistryRPCs() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
final int size = activeMaster.getMetaRegionLocationCache().
getMetaRegionLocations().get().size();
for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) {
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
final int size =
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size();
for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) {
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
try (MasterRegistry registry = new MasterRegistry(conf)) {
// Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
// because not all replicas had made it up before test started.
@ -120,8 +118,8 @@ public class TestMasterRegistry {
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaRegionLocationCache()
.getMetaRegionLocations().get();
List<HRegionLocation> actualMetaLocations =
activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -72,6 +73,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
@ -115,11 +117,11 @@ public class TestScannersFromClientSide {
}
@Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][]{
{MasterRegistry.class, 1},
{MasterRegistry.class, 2},
{ZKConnectionRegistry.class, 1}
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{ MasterRegistry.class, 1},
{ MasterRegistry.class, 2},
{ ZKConnectionRegistry.class, 1}
});
}
@ -134,21 +136,21 @@ public class TestScannersFromClientSide {
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
* test and de-flakes it.
*/
private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) {
// initialize() is called for every unit test, however we only want to reset the cluster state
// at the end of every parameterized run.
if (TEST_UTIL == null) {
return false;
}
Configuration conf = TEST_UTIL.getConfiguration();
Class confClass = conf.getClass(
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}
public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception {
public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
return;
}
@ -161,13 +163,8 @@ public class TestScannersFromClientSide {
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
ConnectionRegistry.class);
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
Preconditions.checkArgument(numHedgedReqs > 1);
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
Preconditions.checkArgument(numHedgedReqs > 0);
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
// Multiple masters needed only when hedged reads for master registry are enabled.
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);

View File

@ -26,30 +26,27 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyObject;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -367,105 +363,6 @@ public abstract class AbstractTestIPC {
}
}
/**
* Tests the various request fan out values using a simple RPC hedged across a mix of running and
* failing servers.
*/
@Test
@Ignore
public void testHedgedAsyncEcho() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
// Create a mix of running and failing servers.
final int numRunningServers = 5;
final int numFailingServers = 3;
final int numServers = numRunningServers + numFailingServers;
for (int i = 0; i < numRunningServers; i++) {
RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
for (int i = 0; i < numFailingServers; i++) {
RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
Configuration conf = HBaseConfiguration.create();
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
// Try out various fan out values starting from 1 -> numServers.
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
// Update the client's underlying conf, should be ok for the test.
LOG.debug("Testing with request fan out: " + reqFanOut);
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
Interface stub = newStub(client, addresses);
BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
stub.echo(new HBaseRpcControllerImpl(),
EchoRequestProto.newBuilder().setMessage("hello").build(), done);
TestProtos.EchoResponseProto responseProto = done.get();
assertNotNull(responseProto);
assertEquals("hello", responseProto.getMessage());
LOG.debug("Ended test with request fan out: " + reqFanOut);
}
} finally {
for (RpcServer rpcServer: rpcServers) {
rpcServer.stop();
}
}
}
@Test
public void testHedgedAsyncTimeouts() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
final int numServers = 3;
for (int i = 0; i < numServers; i++) {
RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
Configuration conf = HBaseConfiguration.create();
int timeout = 100;
int pauseTime = 1000;
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
// Try out various fan out values starting from 1 -> numServers.
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
// Update the client's underlying conf, should be ok for the test.
LOG.debug("Testing with request fan out: " + reqFanOut);
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
Interface stub = newStub(client, addresses);
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
pcrc.setCallTimeout(timeout);
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
assertNull(callback.get());
// Make sure the controller has the right exception propagated.
assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
LOG.debug("Ended test with request fan out: " + reqFanOut);
}
} finally {
for (RpcServer rpcServer: rpcServers) {
rpcServer.stop();
}
}
}
@Test
public void testAsyncRemoteError() throws IOException {
AbstractRpcClient<?> client = createRpcClient(CONF);

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
@ -31,9 +29,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@ -67,17 +67,6 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
User.getCurrent(), 0));
}
public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
throws IOException {
Set<ServerName> serverNames = new HashSet<>();
for (InetSocketAddress addr: addrs) {
serverNames.add(ServerName.valueOf(
addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
}
return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
serverNames, User.getCurrent(), 0));
}
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {

View File

@ -24,15 +24,12 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
@ -52,8 +48,6 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
@Category(MediumTests.class)
public class TestRpcClientLeaks {
@ -96,14 +90,6 @@ public class TestRpcClientLeaks {
};
}
// To keep the registry paths happy.
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
Preconditions.checkState(sns != null && sns.size() == 1);
return super.createRpcChannel((ServerName)sns.toArray()[0], user, rpcTimeout);
}
public static void enableThrowExceptions() {
throwException = true;
}