diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java index 9b01429f83e..4d0a591a5b4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY; -import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT; -import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY; import static org.apache.hadoop.hbase.util.DNS.getHostname; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; @@ -29,7 +29,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -44,11 +46,15 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.DNS.ServerType; import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; @@ -61,53 +67,79 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaReg /** * Master based registry implementation. Makes RPCs to the configured master addresses from config * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}. - * - * It supports hedged reads, which can be enabled by setting - * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan - * out the requests batch is controlled by - * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}. - * + *

+ * It supports hedged reads, set the fan out of the requests batch by + * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable + * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}). + *

* TODO: Handle changes to the configuration dynamically without having to restart the client. */ @InterfaceAudience.Private public class MasterRegistry implements ConnectionRegistry { + + /** Configuration key that controls the fan out of requests **/ + public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY = + "hbase.client.master_registry.hedged.fanout"; + + /** Default value for the fan out of hedged requests. **/ + public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2; + private static final String MASTER_ADDRS_CONF_SEPARATOR = ","; + private final int hedgedReadFanOut; + // Configured list of masters to probe the meta information from. - private final Set masterServers; + private final ImmutableMap masterAddr2Stub; // RPC client used to talk to the masters. private final RpcClient rpcClient; private final RpcControllerFactory rpcControllerFactory; - private final int rpcTimeoutMs; - MasterRegistry(Configuration conf) throws UnknownHostException { - boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, - MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT); - Configuration finalConf; - if (!hedgedReadsEnabled) { - // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of - // the configuration so that other places reusing this reference is not affected. - finalConf = new Configuration(conf); - finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1); - } else { - finalConf = conf; + /** + * Parses the list of master addresses from the provided configuration. Supported format is comma + * separated host[:port] values. If no port number if specified, default master port is assumed. + * @param conf Configuration to parse from. + */ + private static Set parseMasterAddrs(Configuration conf) throws UnknownHostException { + Set masterAddrs = new HashSet<>(); + String configuredMasters = getMasterAddr(conf); + for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) { + HostAndPort masterHostPort = + HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT); + masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE)); } - if (conf.get(MASTER_ADDRS_KEY) != null) { - finalConf.set(MASTER_ADDRS_KEY, conf.get(MASTER_ADDRS_KEY)); + Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed"); + return masterAddrs; + } + + MasterRegistry(Configuration conf) throws IOException { + this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, + MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); + int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch + // this through the master registry... + // This is a problem as we will use the cluster id to determine the authentication method + rpcClient = RpcClientFactory.createClient(conf, null); + rpcControllerFactory = RpcControllerFactory.instantiate(conf); + Set masterAddrs = parseMasterAddrs(conf); + ImmutableMap.Builder builder = + ImmutableMap.builderWithExpectedSize(masterAddrs.size()); + User user = User.getCurrent(); + for (ServerName masterAddr : masterAddrs) { + builder.put(masterAddr, + ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); } - rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - masterServers = new HashSet<>(); - parseMasterAddrs(finalConf); - rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT); - rpcControllerFactory = RpcControllerFactory.instantiate(finalConf); + masterAddr2Stub = builder.build(); } /** * Builds the default master address end point if it is not specified in the configuration. + *

+ * Will be called in {@code HBaseTestingUtility}. */ - public static String getMasterAddr(Configuration conf) throws UnknownHostException { + @VisibleForTesting + public static String getMasterAddr(Configuration conf) throws UnknownHostException { String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY); if (!Strings.isNullOrEmpty(masterAddrFromConf)) { return masterAddrFromConf; @@ -118,63 +150,87 @@ public class MasterRegistry implements ConnectionRegistry { } /** - * @return Stub needed to make RPC using a hedged channel to the master end points. + * For describing the actual asynchronous rpc call. + *

+ * Typically, you can use lambda expression to implement this interface as + * + *

+   * (c, s, d) -> s.xxx(c, your request here, d)
+   * 
*/ - private ClientMetaService.Interface getMasterStub() throws IOException { - return ClientMetaService.newStub( - rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs)); + @FunctionalInterface + private interface Callable { + void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback done); + } + + private CompletableFuture call(ClientMetaService.Interface stub, + Callable callable) { + HBaseRpcController controller = rpcControllerFactory.newController(); + CompletableFuture future = new CompletableFuture<>(); + callable.call(controller, stub, resp -> { + if (controller.failed()) { + future.completeExceptionally(controller.getFailed()); + } else { + future.complete(resp); + } + }); + return future; + } + + private IOException badResponse(String debug) { + return new IOException(String.format("Invalid result for request %s. Will be retried", debug)); } /** - * Parses the list of master addresses from the provided configuration. Supported format is - * comma separated host[:port] values. If no port number if specified, default master port is - * assumed. - * @param conf Configuration to parse from. + * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we + * will complete the future and quit. If all the requests in one round are failed, we will start + * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have + * been tried and all of them are failed, we will fail the future. */ - private void parseMasterAddrs(Configuration conf) throws UnknownHostException { - String configuredMasters = getMasterAddr(conf); - for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) { - HostAndPort masterHostPort = - HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT); - masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE)); + private void groupCall(CompletableFuture future, + List masterStubs, int startIndexInclusive, Callable callable, + Predicate isValidResp, String debug, ConcurrentLinkedQueue errors) { + int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size()); + AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive); + for (int i = startIndexInclusive; i < endIndexExclusive; i++) { + addListener(call(masterStubs.get(i), callable), (r, e) -> { + // a simple check to skip all the later operations earlier + if (future.isDone()) { + return; + } + if (e == null && !isValidResp.test(r)) { + e = badResponse(debug); + } + if (e != null) { + // make sure when remaining reaches 0 we have all exceptions in the errors queue + errors.add(e); + if (remaining.decrementAndGet() == 0) { + if (endIndexExclusive == masterStubs.size()) { + // we are done, complete the future with exception + RetriesExhaustedException ex = new RetriesExhaustedException("masters", + masterStubs.size(), new ArrayList<>(errors)); + future.completeExceptionally( + new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex)); + } else { + groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug, + errors); + } + } + } else { + // do not need to decrement the counter any more as we have already finished the future. + future.complete(r); + } + }); } - Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed"); } - @VisibleForTesting - public Set getParsedMasterServers() { - return Collections.unmodifiableSet(masterServers); - } - - /** - * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once - * the rpc finishes and the response is propagated to the passed future. - * @param future Result future to which the rpc response is propagated. - * @param isValidResp Checks if the rpc response has a valid result. - * @param transformResult Transforms the result to a different form as expected by callers. - * @param hrc RpcController instance for this rpc. - * @param debug Debug message passed along to the caller in case of exceptions. - * @param RPC result type. - * @param Transformed type of the result. - * @return A call back that can be embedded in the non-blocking rpc call. - */ - private RpcCallback getRpcCallBack(CompletableFuture future, - Predicate isValidResp, Function transformResult, HBaseRpcController hrc, - final String debug) { - return rpcResult -> { - if (rpcResult == null) { - future.completeExceptionally( - new MasterRegistryFetchException(masterServers, hrc.getFailed())); - return; - } - if (!isValidResp.test(rpcResult)) { - // Rpc returned ok, but result was malformed. - future.completeExceptionally(new IOException( - String.format("Invalid result for request %s. Will be retried", debug))); - return; - } - future.complete(transformResult.apply(rpcResult)); - }; + private CompletableFuture call(Callable callable, + Predicate isValidResp, String debug) { + List masterStubs = new ArrayList<>(masterAddr2Stub.values()); + Collections.shuffle(masterStubs, ThreadLocalRandom.current()); + CompletableFuture future = new CompletableFuture<>(); + groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>()); + return future; } /** @@ -182,40 +238,25 @@ public class MasterRegistry implements ConnectionRegistry { */ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { List regionLocations = new ArrayList<>(); - resp.getMetaLocationsList().forEach( - location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); + resp.getMetaLocationsList() + .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); return new RegionLocations(regionLocations); } @Override public CompletableFuture getMetaRegionLocations() { - CompletableFuture result = new CompletableFuture<>(); - HBaseRpcController hrc = rpcControllerFactory.newController(); - RpcCallback callback = getRpcCallBack(result, - (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc, - "getMetaRegionLocations()"); - try { - getMasterStub().getMetaRegionLocations( - hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback); - } catch (IOException e) { - result.completeExceptionally(e); - } - return result; + return this. call((c, s, d) -> s.getMetaRegionLocations(c, + GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0, + "getMetaLocationsCount").thenApply(this::transformMetaRegionLocations); } @Override public CompletableFuture getClusterId() { - CompletableFuture result = new CompletableFuture<>(); - HBaseRpcController hrc = rpcControllerFactory.newController(); - RpcCallback callback = getRpcCallBack(result, - GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc, - "getClusterId()"); - try { - getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback); - } catch (IOException e) { - result.completeExceptionally(e); - } - return result; + return this + . call( + (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d), + GetClusterIdResponse::hasClusterId, "getClusterId()") + .thenApply(GetClusterIdResponse::getClusterId); } private ServerName transformServerName(GetActiveMasterResponse resp) { @@ -224,17 +265,16 @@ public class MasterRegistry implements ConnectionRegistry { @Override public CompletableFuture getActiveMaster() { - CompletableFuture result = new CompletableFuture<>(); - HBaseRpcController hrc = rpcControllerFactory.newController(); - RpcCallback callback = getRpcCallBack(result, - GetActiveMasterResponse::hasServerName, this::transformServerName, hrc, - "getActiveMaster()"); - try { - getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback); - } catch (IOException e) { - result.completeExceptionally(e); - } - return result; + return this + . call( + (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d), + GetActiveMasterResponse::hasServerName, "getActiveMaster()") + .thenApply(this::transformServerName); + } + + @VisibleForTesting + Set getParsedMasterServers() { + return masterAddr2Stub.keySet(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java index 18871befef8..ca80ed565a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java @@ -30,6 +30,9 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public class MasterRegistryFetchException extends HBaseIOException { + + private static final long serialVersionUID = 6992134872168185171L; + public MasterRegistryFetchException(Set masters, Throwable failure) { super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)), failure); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index b0baff23d51..bf2f3615735 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; + import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.Collection; -import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -47,6 +47,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; @@ -60,6 +61,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; /** @@ -512,13 +514,6 @@ public abstract class AbstractRpcClient implements RpcC return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); } - @Override - public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout) - throws UnknownHostException { - // Check HedgedRpcChannel implementation for detailed comments. - throw new UnsupportedOperationException("Hedging not supported for this implementation."); - } - private static class AbstractRpcChannel { protected final InetSocketAddress addr; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java deleted file mode 100644 index 7b681e079bd..00000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.PrettyPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; - -/** - * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points. - * First received response is returned to the caller. This abstracts out the logic needed to batch - * requests to multiple end points underneath and presents itself as a single logical RpcChannel to - * the client. - * - * Hedging Details: - * --------------- - * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address - * end points to make the call to. We do multiple iterations until we get a proper response to the - * rpc call or all the service addresses are exhausted, which ever happens first. Size of each is - * configurable and is also known as 'fanOutSize'. - * - * - We randomize the addresses up front so that the batch order per client is non deterministic. - * This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'. - * Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the - * load on the client and server side when configuring the fan out. - * - * - In a happy case, once we receive a response from one end point, we cancel all the - * other inflight rpcs in the same batch and return the response to the caller. If we do not get a - * valid response from any address end point, we propagate the error back to the caller. - * - * - Rpc timeouts are applied to every hedged rpc. - * - * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can - * be hedged (for example: cluster state changing rpcs). - * - * (TODO) Retries and Adaptive hedging policy: - * ------------------------------------------ - * - * - No retries are handled at the channel level. Retries can be built in upper layers. However the - * question is, do we even need retries? Hedging in fact is a substitute for retries. - * - * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging - * policy more adaptive. In most happy cases, the rpcs from the first few end points should return - * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging - * is not needed. So, the idea is to make this request pattern pluggable so that the requests are - * hedged only when needed. - */ -class HedgedRpcChannel implements RpcChannel { - private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class); - - /** - * Currently hedging is only supported for non-blocking connection implementation types because - * the channel implementation inherently relies on the connection implementation being async. - * Refer to the comments in doCallMethod(). - */ - private final NettyRpcClient rpcClient; - // List of service addresses to hedge the requests to. - private final List addrs; - private final User ticket; - private final int rpcTimeout; - // Controls the size of request fan out (number of rpcs per a single batch). - private final int fanOutSize; - - /** - * A simple rpc call back implementation to notify the batch context if any rpc is successful. - */ - private static class BatchRpcCtxCallBack implements RpcCallback { - private final BatchRpcCtx batchRpcCtx; - private final HBaseRpcController rpcController; - BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) { - this.batchRpcCtx = batchRpcCtx; - this.rpcController = rpcController; - } - @Override - public void run(Message result) { - batchRpcCtx.setResultIfNotSet(result, rpcController); - } - } - - /** - * A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to - * synchronize on multiple RPCs to different end points fetching the result. All the methods are - * thread-safe. - */ - private static class BatchRpcCtx { - // Result set by the thread finishing first. Set only once. - private final AtomicReference result = new AtomicReference<>(); - // Caller waits on this latch being set. - // We set this to 1, so that the first successful RPC result is returned to the client. - private CountDownLatch resultsReady = new CountDownLatch(1); - // Failed rpc book-keeping. - private AtomicInteger failedRpcCount = new AtomicInteger(); - // All the call handles for this batch. - private final List callsInFlight = Collections.synchronizedList(new ArrayList<>()); - - // Target addresses. - private final List addresses; - // Called when the result is ready. - private final RpcCallback callBack; - // Last failed rpc's exception. Used to propagate the reason to the controller. - private IOException lastFailedRpcReason; - - - BatchRpcCtx(List addresses, RpcCallback callBack) { - this.addresses = addresses; - this.callBack = Preconditions.checkNotNull(callBack); - } - - /** - * Sets the result only if it is not already set by another thread. Thread that successfully - * sets the result also count downs the latch. - * @param result Result to be set. - */ - public void setResultIfNotSet(Message result, HBaseRpcController rpcController) { - if (rpcController.failed()) { - incrementFailedRpcs(rpcController.getFailed()); - return; - } - if (this.result.compareAndSet(null, result)) { - resultsReady.countDown(); - // Cancel all pending in flight calls. - for (Call call: callsInFlight) { - // It is ok to do it for all calls as it is a no-op if the call is already done. - final String exceptionMsg = String.format("%s canceled because another hedged attempt " + - "for the same rpc already succeeded. This is not needed anymore.", call); - call.setException(new CallCancelledException(exceptionMsg)); - } - } - } - - /** - * Waits until the results are populated and calls the callback if the call is successful. - * @return true for successful rpc and false otherwise. - */ - public boolean waitForResults() { - try { - // We do not set a timeout on await() because we rely on the underlying RPCs to timeout if - // something on the remote is broken. Worst case we should wait for rpc time out to kick in. - resultsReady.await(); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e); - } - Message message = result.get(); - if (message != null) { - callBack.run(message); - return true; - } - return false; - } - - public void addCallInFlight(Call c) { - callsInFlight.add(c); - } - - public void incrementFailedRpcs(IOException reason) { - if (failedRpcCount.incrementAndGet() == addresses.size()) { - lastFailedRpcReason = reason; - // All the rpcs in this batch have failed. Invoke the waiting threads. - resultsReady.countDown(); - } - } - - public IOException getLastFailedRpcReason() { - return lastFailedRpcReason; - } - - @Override - public String toString() { - return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses)); - } - } - - public HedgedRpcChannel(NettyRpcClient rpcClient, Set addrs, - User ticket, int rpcTimeout, int fanOutSize) { - this.rpcClient = rpcClient; - this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs)); - Preconditions.checkArgument(this.addrs.size() >= 1); - // For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same - // order, creating hot spots on the service end points. - Collections.shuffle(this.addrs); - this.ticket = ticket; - this.rpcTimeout = rpcTimeout; - // fanOutSize controls the number of hedged RPCs per batch. - this.fanOutSize = fanOutSize; - } - - private HBaseRpcController applyRpcTimeout(RpcController controller) { - HBaseRpcController hBaseRpcController = (HBaseRpcController) controller; - int rpcTimeoutToSet = - hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout; - HBaseRpcController response = new HBaseRpcControllerImpl(); - response.setCallTimeout(rpcTimeoutToSet); - return response; - } - - private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller, - Message request, Message responsePrototype, RpcCallback done) { - int i = 0; - BatchRpcCtx lastBatchCtx = null; - while (i < addrs.size()) { - // Each iteration picks fanOutSize addresses to run as batch. - int batchEnd = Math.min(addrs.size(), i + fanOutSize); - List addrSubList = addrs.subList(i, batchEnd); - BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done); - lastBatchCtx = batchRpcCtx; - LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx); - for (InetSocketAddress address : addrSubList) { - HBaseRpcController rpcController = applyRpcTimeout(controller); - // ** WARN ** This is a blocking call if the underlying connection for the rpc client is - // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all - // the write calls. Handling blocking connection means that this should be run in a separate - // thread and hence more code complexity. Is it ok to handle only non-blocking connections? - batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request, - responsePrototype, ticket, address, - new BatchRpcCtxCallBack(batchRpcCtx, rpcController))); - } - if (batchRpcCtx.waitForResults()) { - return; - } - // Entire batch has failed, lets try the next batch. - LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx); - i = batchEnd; - } - Preconditions.checkNotNull(lastBatchCtx); - // All the batches failed, mark it a failed rpc. - // Propagate the failure reason. We propagate the last batch's last failing rpc reason. - // Can we do something better? - controller.setFailed(lastBatchCtx.getLastFailedRpcReason()); - done.run(null); - } - - @Override - public void callMethod(Descriptors.MethodDescriptor method, RpcController controller, - Message request, Message responsePrototype, RpcCallback done) { - // There is no reason to use any other implementation of RpcController. - Preconditions.checkState(controller instanceof HBaseRpcController); - // To make the channel non-blocking, we run the actual doCalMethod() async. The call back is - // called once the hedging finishes. - CompletableFuture.runAsync( - () -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done)); - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index f7a65e4d6ff..4c85e3d51ab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -18,20 +18,14 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.HashSet; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; + import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; @@ -82,19 +76,6 @@ public class NettyRpcClient extends AbstractRpcClient { return new NettyRpcConnection(this, remoteId); } - @Override - public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout) - throws UnknownHostException { - final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, - HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT); - Set addresses = new HashSet<>(); - for (ServerName sn: sns) { - addresses.add(createAddr(sn)); - } - return new HedgedRpcChannel(this, addresses, user, rpcTimeout, - hedgedRpcFanOut); - } - @Override protected void closeInternal() { if (shutdownGroupWhenClose) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 558fceeb78b..877d9b0d5b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.ipc; import java.io.Closeable; import java.io.IOException; -import java.util.Set; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; @@ -82,16 +82,6 @@ public interface RpcClient extends Closeable { RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) throws IOException; - /** - * Creates a channel that can hedge request to multiple underlying channels. - * @param sns Set of servers for underlying channels. - * @param user user for the connection. - * @param rpcTimeout rpc timeout to use. - * @return A hedging rpc channel for this rpc client instance. - */ - RpcChannel createHedgedRpcChannel(final Set sns, final User user, int rpcTimeout) - throws IOException; - /** * Interrupt the connections to the given server. This should be called if the server * is known as actually dead. This will not prevent current operation to be retried, and, diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java new file mode 100644 index 00000000000..8bbdce64887 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestMasterRegistryHedgedReads { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class); + + private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + private static final ExecutorService EXECUTOR = + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()); + + private static AtomicInteger CALLED = new AtomicInteger(0); + + private static volatile int BAD_RESP_INDEX; + + private static volatile Set GOOD_RESP_INDEXS; + + private static GetClusterIdResponse RESP = + GetClusterIdResponse.newBuilder().setClusterId("id").build(); + + public static final class RpcClientImpl implements RpcClient { + + public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress, + MetricsConnection metrics) { + } + + @Override + public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) + throws IOException { + return new RpcChannelImpl(); + } + + @Override + public void cancelConnections(ServerName sn) { + } + + @Override + public void close() { + } + + @Override + public boolean hasCellBlockSupport() { + return false; + } + } + + public static final class RpcChannelImpl implements RpcChannel { + + @Override + public void callMethod(MethodDescriptor method, RpcController controller, Message request, + Message responsePrototype, RpcCallback done) { + // simulate the asynchronous behavior otherwise all logic will perform in the same thread... + EXECUTOR.execute(() -> { + int index = CALLED.getAndIncrement(); + if (index == BAD_RESP_INDEX) { + done.run(GetClusterIdResponse.getDefaultInstance()); + } else if (GOOD_RESP_INDEXS.contains(index)) { + done.run(RESP); + } else { + ((HBaseRpcController) controller).setFailed("inject error"); + done.run(null); + } + }); + } + } + + @BeforeClass + public static void setUpBeforeClass() { + Configuration conf = UTIL.getConfiguration(); + conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class, + RpcClient.class); + String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i)) + .collect(Collectors.joining(",")); + conf.set(HConstants.MASTER_ADDRS_KEY, masters); + } + + @AfterClass + public static void tearDownAfterClass() { + EXECUTOR.shutdownNow(); + } + + @Before + public void setUp() { + CALLED.set(0); + BAD_RESP_INDEX = -1; + GOOD_RESP_INDEXS = Collections.emptySet(); + } + + private T logIfError(CompletableFuture future) throws IOException { + try { + return FutureUtils.get(future); + } catch (Throwable t) { + LOG.warn("", t); + throw t; + } + } + + @Test + public void testAllFailNoHedged() throws IOException { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1); + try (MasterRegistry registry = new MasterRegistry(conf)) { + assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); + assertEquals(10, CALLED.get()); + } + } + + @Test + public void testAllFailHedged3() throws IOException { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3); + BAD_RESP_INDEX = 5; + try (MasterRegistry registry = new MasterRegistry(conf)) { + assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); + assertEquals(10, CALLED.get()); + } + } + + @Test + public void testFirstSucceededNoHedge() throws IOException { + Configuration conf = UTIL.getConfiguration(); + // will be set to 1 + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0); + GOOD_RESP_INDEXS = + IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet()); + try (MasterRegistry registry = new MasterRegistry(conf)) { + String clusterId = logIfError(registry.getClusterId()); + assertEquals(RESP.getClusterId(), clusterId); + assertEquals(1, CALLED.get()); + } + } + + @Test + public void testSecondRoundSucceededHedge4() throws IOException { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); + GOOD_RESP_INDEXS = Collections.singleton(6); + try (MasterRegistry registry = new MasterRegistry(conf)) { + String clusterId = logIfError(registry.getClusterId()); + assertEquals(RESP.getClusterId(), clusterId); + UTIL.waitFor(5000, () -> CALLED.get() == 8); + } + } + + @Test + public void testSucceededWithLargestHedged() throws IOException, InterruptedException { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE); + GOOD_RESP_INDEXS = Collections.singleton(5); + try (MasterRegistry registry = new MasterRegistry(conf)) { + String clusterId = logIfError(registry.getClusterId()); + assertEquals(RESP.getClusterId(), clusterId); + UTIL.waitFor(5000, () -> CALLED.get() == 10); + Thread.sleep(1000); + // make sure we do not send more + assertEquals(10, CALLED.get()); + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 6f1bb6ad68e..b8098f417eb 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -190,13 +190,6 @@ public final class HConstants { public static final String ZK_CONNECTION_REGISTRY_CLASS = "org.apache.hadoop.hbase.client.ZKConnectionRegistry"; - /** Configuration to enable hedged reads on master registry **/ - public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY = - "hbase.client.master_registry.enable_hedged_reads"; - - /** Default value for enabling hedging reads on master registry **/ - public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false; - /** Parameter name for the master type being backup (waits for primary to go inactive). */ public static final String MASTER_TYPE_BACKUP = "hbase.master.backup"; @@ -939,12 +932,6 @@ public final class HConstants { */ public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; - /** Configuration key that controls the fan out of requests in hedged channel implementation. **/ - public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout"; - - /** Default value for the fan out of hedged requests. **/ - public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2; - /** * timeout for each read RPC */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java index 6787a11321a..3c539a020af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Arrays; import java.util.Iterator; @@ -49,10 +53,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; /** * Base for TestFromClientSide* classes. @@ -85,20 +87,20 @@ class FromClientSideBase { * to initialize from scratch. While this is a hack, it saves a ton of time for the full * test and de-flakes it. */ - protected static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) { + protected static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) { if (TEST_UTIL == null) { return false; } Configuration conf = TEST_UTIL.getConfiguration(); - Class confClass = conf.getClass( - HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); - int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, - HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT); + Class confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + ZKConnectionRegistry.class); + int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, + MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT); return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; } - protected static final void initialize(Class registryImpl, int numHedgedReqs, Class... cps) - throws Exception { + protected static final void initialize(Class registryImpl, int numHedgedReqs, Class... cps) + throws Exception { // initialize() is called for every unit test, however we only want to reset the cluster state // at the end of every parameterized run. if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { @@ -122,13 +124,8 @@ class FromClientSideBase { conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, ConnectionRegistry.class); - if (numHedgedReqs == 1) { - conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false); - } else { - Preconditions.checkArgument(numHedgedReqs > 1); - conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true); - } - conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); + Preconditions.checkArgument(numHedgedReqs > 0); + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); // Multiple masters needed only when hedged reads for master registry are enabled. builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java index e0e7b7111e1..aa8e10d5111 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Random; -import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -46,7 +46,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -157,13 +157,6 @@ public class TestClientTimeouts { throws UnknownHostException { return new RandomTimeoutRpcChannel(this, sn, ticket, rpcTimeout); } - - @Override - public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout) - throws UnknownHostException { - Preconditions.checkArgument(sns != null && sns.size() == 1); - return new RandomTimeoutRpcChannel(this, (ServerName)sns.toArray()[0], user, rpcTimeout); - } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 4a83527b115..ac02e9b3f6a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java index 65b2d0bc956..0e0060cbfe5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java @@ -19,7 +19,8 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM; import static org.junit.Assert.assertEquals; -import java.net.UnknownHostException; + +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -77,14 +78,15 @@ public class TestMasterRegistry { /** * Makes sure the master registry parses the master end points in the configuration correctly. */ - @Test public void testMasterAddressParsing() throws UnknownHostException { + @Test + public void testMasterAddressParsing() throws IOException { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); int numMasters = 10; conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters)); try (MasterRegistry registry = new MasterRegistry(conf)) { List parsedMasters = new ArrayList<>(registry.getParsedMasterServers()); // Half of them would be without a port, duplicates are removed. - assertEquals(numMasters/2 + 1, parsedMasters.size()); + assertEquals(numMasters / 2 + 1, parsedMasters.size()); // Sort in the increasing order of port numbers. Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort)); for (int i = 0; i < parsedMasters.size(); i++) { @@ -100,18 +102,14 @@ public class TestMasterRegistry { } } - @Test public void testRegistryRPCs() throws Exception { + @Test + public void testRegistryRPCs() throws Exception { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); - final int size = activeMaster.getMetaRegionLocationCache(). - getMetaRegionLocations().get().size(); - for (int numHedgedReqs = 1; numHedgedReqs <= 3; numHedgedReqs++) { - if (numHedgedReqs == 1) { - conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false); - } else { - conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true); - } - conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); + final int size = + activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get().size(); + for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) { + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); try (MasterRegistry registry = new MasterRegistry(conf)) { // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion // because not all replicas had made it up before test started. @@ -119,9 +117,9 @@ public class TestMasterRegistry { assertEquals(registry.getClusterId().get(), activeMaster.getClusterId()); assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName()); List metaLocations = - Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations()); - List actualMetaLocations = activeMaster.getMetaRegionLocationCache() - .getMetaRegionLocations().get(); + Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations()); + List actualMetaLocations = + activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get(); Collections.sort(metaLocations); Collections.sort(actualMetaLocations); assertEquals(actualMetaLocations, metaLocations); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index 328bbbf052e..e10f34257e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -72,6 +73,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** @@ -115,11 +117,11 @@ public class TestScannersFromClientSide { } @Parameterized.Parameters - public static Collection parameters() { - return Arrays.asList(new Object[][]{ - {MasterRegistry.class, 1}, - {MasterRegistry.class, 2}, - {ZKConnectionRegistry.class, 1} + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { MasterRegistry.class, 1}, + { MasterRegistry.class, 2}, + { ZKConnectionRegistry.class, 1} }); } @@ -134,21 +136,21 @@ public class TestScannersFromClientSide { * to initialize from scratch. While this is a hack, it saves a ton of time for the full * test and de-flakes it. */ - private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) { + private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) { // initialize() is called for every unit test, however we only want to reset the cluster state // at the end of every parameterized run. if (TEST_UTIL == null) { return false; } Configuration conf = TEST_UTIL.getConfiguration(); - Class confClass = conf.getClass( - HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); - int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, - HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT); + Class confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + ZKConnectionRegistry.class); + int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, + MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT); return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; } - public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception { + public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception { if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { return; } @@ -161,13 +163,8 @@ public class TestScannersFromClientSide { conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, ConnectionRegistry.class); - if (numHedgedReqs == 1) { - conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false); - } else { - Preconditions.checkArgument(numHedgedReqs > 1); - conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true); - } - conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); + Preconditions.checkArgument(numHedgedReqs > 0); + conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); // Multiple masters needed only when hedged reads for master registry are enabled. builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index c3bf1c25753..87561bac745 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -26,30 +26,27 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; +import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.internal.verification.VerificationModeFactory.times; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; -import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -367,105 +363,6 @@ public abstract class AbstractTestIPC { } } - /** - * Tests the various request fan out values using a simple RPC hedged across a mix of running and - * failing servers. - */ - @Test - @Ignore - public void testHedgedAsyncEcho() throws Exception { - // Hedging is not supported for blocking connection types. - Assume.assumeFalse(this instanceof TestBlockingIPC); - List rpcServers = new ArrayList<>(); - List addresses = new ArrayList<>(); - // Create a mix of running and failing servers. - final int numRunningServers = 5; - final int numFailingServers = 3; - final int numServers = numRunningServers + numFailingServers; - for (int i = 0; i < numRunningServers; i++) { - RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i, - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( - SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, - new FifoRpcScheduler(CONF, 1)); - rpcServer.start(); - addresses.add(rpcServer.getListenerAddress()); - rpcServers.add(rpcServer); - } - for (int i = 0; i < numFailingServers; i++) { - RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i, - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( - SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, - new FifoRpcScheduler(CONF, 1)); - rpcServer.start(); - addresses.add(rpcServer.getListenerAddress()); - rpcServers.add(rpcServer); - } - Configuration conf = HBaseConfiguration.create(); - try (AbstractRpcClient client = createRpcClient(conf)) { - // Try out various fan out values starting from 1 -> numServers. - for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) { - // Update the client's underlying conf, should be ok for the test. - LOG.debug("Testing with request fan out: " + reqFanOut); - conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut); - Interface stub = newStub(client, addresses); - BlockingRpcCallback done = new BlockingRpcCallback<>(); - stub.echo(new HBaseRpcControllerImpl(), - EchoRequestProto.newBuilder().setMessage("hello").build(), done); - TestProtos.EchoResponseProto responseProto = done.get(); - assertNotNull(responseProto); - assertEquals("hello", responseProto.getMessage()); - LOG.debug("Ended test with request fan out: " + reqFanOut); - } - } finally { - for (RpcServer rpcServer: rpcServers) { - rpcServer.stop(); - } - } - } - - @Test - public void testHedgedAsyncTimeouts() throws Exception { - // Hedging is not supported for blocking connection types. - Assume.assumeFalse(this instanceof TestBlockingIPC); - List rpcServers = new ArrayList<>(); - List addresses = new ArrayList<>(); - final int numServers = 3; - for (int i = 0; i < numServers; i++) { - RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i, - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface( - SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, - new FifoRpcScheduler(CONF, 1)); - rpcServer.start(); - addresses.add(rpcServer.getListenerAddress()); - rpcServers.add(rpcServer); - } - Configuration conf = HBaseConfiguration.create(); - int timeout = 100; - int pauseTime = 1000; - try (AbstractRpcClient client = createRpcClient(conf)) { - // Try out various fan out values starting from 1 -> numServers. - for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) { - // Update the client's underlying conf, should be ok for the test. - LOG.debug("Testing with request fan out: " + reqFanOut); - conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut); - Interface stub = newStub(client, addresses); - HBaseRpcController pcrc = new HBaseRpcControllerImpl(); - pcrc.setCallTimeout(timeout); - BlockingRpcCallback callback = new BlockingRpcCallback<>(); - stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback); - assertNull(callback.get()); - // Make sure the controller has the right exception propagated. - assertTrue(pcrc.getFailed() instanceof CallTimeoutException); - LOG.debug("Ended test with request fan out: " + reqFanOut); - } - } finally { - for (RpcServer rpcServer: rpcServers) { - rpcServer.stop(); - } - } - } - - @Test public void testAsyncRemoteError() throws IOException { AbstractRpcClient client = createRpcClient(CONF); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java index 6adfa4602ee..ab282e38dc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; @@ -31,9 +29,11 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; + import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; @@ -67,17 +67,6 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface { User.getCurrent(), 0)); } - public static Interface newStub(RpcClient client, List addrs) - throws IOException { - Set serverNames = new HashSet<>(); - for (InetSocketAddress addr: addrs) { - serverNames.add(ServerName.valueOf( - addr.getHostName(), addr.getPort(), System.currentTimeMillis())); - } - return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel( - serverNames, User.getCurrent(), 0)); - } - @Override public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) throws ServiceException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java index 27c1235ff38..2881eb736e5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java @@ -24,15 +24,12 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.net.Socket; import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -40,7 +37,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -52,8 +48,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; @Category(MediumTests.class) public class TestRpcClientLeaks { @@ -96,14 +90,6 @@ public class TestRpcClientLeaks { }; } - // To keep the registry paths happy. - @Override - public RpcChannel createHedgedRpcChannel(Set sns, User user, int rpcTimeout) - throws UnknownHostException { - Preconditions.checkState(sns != null && sns.size() == 1); - return super.createRpcChannel((ServerName)sns.toArray()[0], user, rpcTimeout); - } - public static void enableThrowExceptions() { throwException = true; }