HBASE-23305: Master based registry implementation (#954)

Implements a master based registry for clients.

 - Supports hedged RPCs (fan out configured via configs).
 - Parameterized existing client tests to run with multiple registry combinations.
 - Added unit-test coverage for the new registry implementation.

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Bharath Vissapragada 2020-01-14 08:24:07 -08:00
parent 7162c02c0d
commit 1aa6a4efb9
21 changed files with 1268 additions and 315 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@ -27,9 +28,6 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
final class ConnectionRegistryFactory {
static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.connection.registry.impl";
private ConnectionRegistryFactory() {
}

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
*
* It supports hedged reads, which can be enabled by setting
* {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
* out the requests batch is controlled by
* {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
*
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
// Configured list of masters to probe the meta information from.
private final Set<ServerName> masterServers;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;
MasterRegistry(Configuration conf) {
boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
Configuration finalConf;
if (!hedgedReadsEnabled) {
// If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
// the configuration so that other places reusing this reference is not affected.
finalConf = new Configuration(conf);
finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
} else {
finalConf = conf;
}
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
masterServers = new HashSet<>();
parseMasterAddrs(finalConf);
rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
}
/**
* @return Stub needed to make RPC using a hedged channel to the master end points.
*/
private ClientMetaService.Interface getMasterStub() throws IOException {
return ClientMetaService.newStub(
rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
}
/**
* Parses the list of master addresses from the provided configuration. Supported format is
* comma separated host[:port] values. If no port number if specified, default master port is
* assumed.
* @param conf Configuration to parse from.
*/
private void parseMasterAddrs(Configuration conf) {
String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
HostAndPort masterHostPort =
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
}
Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
}
@VisibleForTesting
public Set<ServerName> getParsedMasterServers() {
return Collections.unmodifiableSet(masterServers);
}
/**
* Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
* the rpc finishes and the response is propagated to the passed future.
* @param future Result future to which the rpc response is propagated.
* @param isValidResp Checks if the rpc response has a valid result.
* @param transformResult Transforms the result to a different form as expected by callers.
* @param hrc RpcController instance for this rpc.
* @param debug Debug message passed along to the caller in case of exceptions.
* @param <T> RPC result type.
* @param <R> Transformed type of the result.
* @return A call back that can be embedded in the non-blocking rpc call.
*/
private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
final String debug) {
return rpcResult -> {
if (rpcResult == null) {
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
}
if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed.
future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug)));
}
future.complete(transformResult.apply(rpcResult));
};
}
/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList().forEach(
location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
return new RegionLocations(regionLocations);
}
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
"getMetaRegionLocations()");
try {
getMasterStub().getMetaRegionLocations(
hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
@Override
public CompletableFuture<String> getClusterId() {
CompletableFuture<String> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
"getClusterId()");
try {
getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
private ServerName transformServerName(GetActiveMasterResponse resp) {
return ProtobufUtil.toServerName(resp.getServerName());
}
@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
"getActiveMaster()");
try {
getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
@Override
public void close() {
if (rpcClient != null) {
rpcClient.close();
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Exception thrown when an master registry RPC fails in client. The exception includes the list of
* masters to which RPC was attempted and the last exception encountered. Prior exceptions are
* included in the logs.
*/
@InterfaceAudience.Private
public class MasterRegistryFetchException extends HBaseIOException {
public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
failure);
}
}

View File

@ -20,39 +20,20 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
@ -63,7 +44,22 @@ import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/**
@ -203,7 +199,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
// have some pending calls on connection so we should not shutdown the connection outside.
// The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address);
if (LOG.isTraceEnabled()) {
LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
}
connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection();
}
@ -384,7 +382,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
}
private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
@ -421,9 +419,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} catch (Exception e) {
call.setException(toIOE(e));
}
return call;
}
private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
if (addr.isUnresolved()) {
throw new UnknownHostException("can not resolve " + sn.getServerName());
@ -513,6 +512,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
// Check HedgedRpcChannel implementation for detailed comments.
throw new UnsupportedOperationException("Hedging not supported for this implementation.");
}
private static class AbstractRpcChannel {
protected final InetSocketAddress addr;

View File

@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.SocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* Does RPC against a cluster. Manages connections per regionserver in the cluster.

View File

@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
/**
* A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
* First received response is returned to the caller. This abstracts out the logic needed to batch
* requests to multiple end points underneath and presents itself as a single logical RpcChannel to
* the client.
*
* Hedging Details:
* ---------------
* - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
* end points to make the call to. We do multiple iterations until we get a proper response to the
* rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
* configurable and is also known as 'fanOutSize'.
*
* - We randomize the addresses up front so that the batch order per client is non deterministic.
* This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
* Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
* load on the client and server side when configuring the fan out.
*
* - In a happy case, once we receive a response from one end point, we cancel all the
* other inflight rpcs in the same batch and return the response to the caller. If we do not get a
* valid response from any address end point, we propagate the error back to the caller.
*
* - Rpc timeouts are applied to every hedged rpc.
*
* - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
* be hedged (for example: cluster state changing rpcs).
*
* (TODO) Retries and Adaptive hedging policy:
* ------------------------------------------
*
* - No retries are handled at the channel level. Retries can be built in upper layers. However the
* question is, do we even need retries? Hedging in fact is a substitute for retries.
*
* - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
* policy more adaptive. In most happy cases, the rpcs from the first few end points should return
* right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
* is not needed. So, the idea is to make this request pattern pluggable so that the requests are
* hedged only when needed.
*/
class HedgedRpcChannel implements RpcChannel {
private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
/**
* Currently hedging is only supported for non-blocking connection implementation types because
* the channel implementation inherently relies on the connection implementation being async.
* Refer to the comments in doCallMethod().
*/
private final NettyRpcClient rpcClient;
// List of service addresses to hedge the requests to.
private final List<InetSocketAddress> addrs;
private final User ticket;
private final int rpcTimeout;
// Controls the size of request fan out (number of rpcs per a single batch).
private final int fanOutSize;
/**
* A simple rpc call back implementation to notify the batch context if any rpc is successful.
*/
private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
private final BatchRpcCtx batchRpcCtx;
private final HBaseRpcController rpcController;
BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
this.batchRpcCtx = batchRpcCtx;
this.rpcController = rpcController;
}
@Override
public void run(Message result) {
batchRpcCtx.setResultIfNotSet(result, rpcController);
}
}
/**
* A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
* synchronize on multiple RPCs to different end points fetching the result. All the methods are
* thread-safe.
*/
private static class BatchRpcCtx {
// Result set by the thread finishing first. Set only once.
private final AtomicReference<Message> result = new AtomicReference<>();
// Caller waits on this latch being set.
// We set this to 1, so that the first successful RPC result is returned to the client.
private CountDownLatch resultsReady = new CountDownLatch(1);
// Failed rpc book-keeping.
private AtomicInteger failedRpcCount = new AtomicInteger();
// All the call handles for this batch.
private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
// Target addresses.
private final List<InetSocketAddress> addresses;
// Called when the result is ready.
private final RpcCallback<Message> callBack;
// Last failed rpc's exception. Used to propagate the reason to the controller.
private IOException lastFailedRpcReason;
BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
this.addresses = addresses;
this.callBack = Preconditions.checkNotNull(callBack);
}
/**
* Sets the result only if it is not already set by another thread. Thread that successfully
* sets the result also count downs the latch.
* @param result Result to be set.
*/
public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
if (rpcController.failed()) {
incrementFailedRpcs(rpcController.getFailed());
return;
}
if (this.result.compareAndSet(null, result)) {
resultsReady.countDown();
// Cancel all pending in flight calls.
for (Call call: callsInFlight) {
// It is ok to do it for all calls as it is a no-op if the call is already done.
final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
"for the same rpc already succeeded. This is not needed anymore.", call);
call.setException(new CallCancelledException(exceptionMsg));
}
}
}
/**
* Waits until the results are populated and calls the callback if the call is successful.
* @return true for successful rpc and false otherwise.
*/
public boolean waitForResults() {
try {
// We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
// something on the remote is broken. Worst case we should wait for rpc time out to kick in.
resultsReady.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
}
Message message = result.get();
if (message != null) {
callBack.run(message);
return true;
}
return false;
}
public void addCallInFlight(Call c) {
callsInFlight.add(c);
}
public void incrementFailedRpcs(IOException reason) {
if (failedRpcCount.incrementAndGet() == addresses.size()) {
lastFailedRpcReason = reason;
// All the rpcs in this batch have failed. Invoke the waiting threads.
resultsReady.countDown();
}
}
public IOException getLastFailedRpcReason() {
return lastFailedRpcReason;
}
@Override
public String toString() {
return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
}
}
public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
User ticket, int rpcTimeout, int fanOutSize) {
this.rpcClient = rpcClient;
this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
Preconditions.checkArgument(this.addrs.size() >= 1);
// For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
// order, creating hot spots on the service end points.
Collections.shuffle(this.addrs);
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
// fanOutSize controls the number of hedged RPCs per batch.
this.fanOutSize = fanOutSize;
}
private HBaseRpcController applyRpcTimeout(RpcController controller) {
HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
int rpcTimeoutToSet =
hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
HBaseRpcController response = new HBaseRpcControllerImpl();
response.setCallTimeout(rpcTimeoutToSet);
return response;
}
private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
Message request, Message responsePrototype, RpcCallback<Message> done) {
int i = 0;
BatchRpcCtx lastBatchCtx = null;
while (i < addrs.size()) {
// Each iteration picks fanOutSize addresses to run as batch.
int batchEnd = Math.min(addrs.size(), i + fanOutSize);
List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
lastBatchCtx = batchRpcCtx;
LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
for (InetSocketAddress address : addrSubList) {
HBaseRpcController rpcController = applyRpcTimeout(controller);
// ** WARN ** This is a blocking call if the underlying connection for the rpc client is
// a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
// the write calls. Handling blocking connection means that this should be run in a separate
// thread and hence more code complexity. Is it ok to handle only non-blocking connections?
batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
responsePrototype, ticket, address,
new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
}
if (batchRpcCtx.waitForResults()) {
return;
}
// Entire batch has failed, lets try the next batch.
LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
i = batchEnd;
}
Preconditions.checkNotNull(lastBatchCtx);
// All the batches failed, mark it a failed rpc.
// Propagate the failure reason. We propagate the last batch's last failing rpc reason.
// Can we do something better?
controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
done.run(null);
}
@Override
public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
Message request, Message responsePrototype, RpcCallback<Message> done) {
// There is no reason to use any other implementation of RpcController.
Preconditions.checkState(controller instanceof HBaseRpcController);
// To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
// called once the hedging finishes.
CompletableFuture.runAsync(
() -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
}
}

View File

@ -17,22 +17,27 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.SocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.Pair;
/**
* Netty client for the requests and responses.
* @since 2.0.0
@ -74,6 +79,19 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
return new NettyRpcConnection(this, remoteId);
}
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Set<InetSocketAddress> addresses = new HashSet<>();
for (ServerName sn: sns) {
addresses.add(createAddr(sn));
}
return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
hedgedRpcFanOut);
}
@Override
protected void closeInternal() {
if (shutdownGroupWhenClose) {

View File

@ -17,15 +17,14 @@
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
/**
* Interface for RpcClient implementations so ConnectionManager can handle it.
@ -83,6 +82,16 @@ public interface RpcClient extends Closeable {
RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
throws IOException;
/**
* Creates a channel that can hedge request to multiple underlying channels.
* @param sns Set of servers for underlying channels.
* @param user user for the connection.
* @param rpcTimeout rpc timeout to use.
* @return A hedging rpc channel for this rpc client instance.
*/
RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
throws IOException;
/**
* Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and,

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
@ -70,7 +71,7 @@ public class TestConnectionRegistryLeak {
@BeforeClass
public static void setUp() {
CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
CONF.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ConnectionRegistryForTest.class, ConnectionRegistry.class);
}

View File

@ -179,10 +179,17 @@ public final class HConstants {
public static final String MASTER_INFO_PORT = "hbase.master.info.port";
/** Configuration key for the list of master host:ports **/
public static final String MASTER_ADDRS_KEY = "hbase.master.addrs";
public static final String MASTER_ADDRS_KEY = "hbase.masters";
public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;
/** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads";
/** Default value for enabling hedging reads on master registry **/
public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
@ -908,6 +915,12 @@ public final class HConstants {
*/
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/** Configuration key that controls the fan out of requests in hedged channel implementation. **/
public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
/** Default value for the fan out of hedged requests. **/
public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
/**
* timeout for each read RPC
*/
@ -939,6 +952,11 @@ public final class HConstants {
*/
public static final long NO_SEQNUM = -1;
/**
* Registry implementation to be used on the client side.
*/
public static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.registry.impl";
/*
* cluster replication constants.

View File

@ -19,9 +19,12 @@
package org.apache.hadoop.hbase.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.yetus.audience.InterfaceAudience;
@ -29,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class PrettyPrinter {
public final class PrettyPrinter {
private static final Logger LOG = LoggerFactory.getLogger(PrettyPrinter.class);
@ -117,7 +120,7 @@ public class PrettyPrinter {
sb.append(" DAY").append(days == 1 ? "" : "S");
}
if (hours > 0 ) {
if (hours > 0) {
sb.append(days > 0 ? " " : "");
sb.append(hours);
sb.append(" HOUR").append(hours == 1 ? "" : "S");
@ -188,4 +191,18 @@ public class PrettyPrinter {
return ttl;
}
/**
* Pretty prints a collection of any type to a string. Relies on toString() implementation of the
* object type.
* @param collection collection to pretty print.
* @return Pretty printed string for the collection.
*/
public static String toString(Collection<?> collection) {
List<String> stringList = new ArrayList<>();
for (Object o: collection) {
stringList.add(Objects.toString(o));
}
return "[" + String.join(",", stringList) + "]";
}
}

View File

@ -29,7 +29,21 @@ public class TableNameTestRule extends TestWatcher {
@Override
protected void starting(Description description) {
tableName = TableName.valueOf(description.getMethodName());
tableName = TableName.valueOf(cleanUpTestName(description.getMethodName()));
}
/**
* Helper to handle parameterized method names. Unlike regular test methods, parameterized method
* names look like 'foo[x]'. This is problematic for tests that use this name for HBase tables.
* This helper strips out the parameter suffixes.
* @return current test method name with out parameterized suffixes.
*/
private static String cleanUpTestName(String methodName) {
int index = methodName.indexOf('[');
if (index == -1) {
return methodName;
}
return methodName.substring(0, index);
}
public TableName getTableName() {

View File

@ -26,12 +26,14 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* Utility used running a cluster all in the one JVM.
@ -136,6 +138,11 @@ public class JVMClusterUtil {
} catch (Exception e) {
throw new IOException(e);
}
// Needed if a master based registry is configured for internal cluster connections. Here, we
// just add the current master host port since we do not know other master addresses up front
// in mini cluster tests.
c.set(HConstants.MASTER_ADDRS_KEY,
Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
return new JVMClusterUtil.MasterThread(server, index);
}

View File

@ -1082,6 +1082,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY,
c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
// Don't leave here till we've done a successful scan of the hbase:meta
Table t = getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan());

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
@ -28,7 +29,7 @@ import org.apache.hadoop.hbase.ServerName;
public class DummyConnectionRegistry implements ConnectionRegistry {
public static final String REGISTRY_IMPL_CONF_KEY =
ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {

View File

@ -17,14 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized;
/**
* Test all client operations with a coprocessor that just implements the default flush/compact/scan
@ -32,13 +34,24 @@ import org.junit.experimental.categories.Category;
*/
@Category({ LargeTests.class, ClientTests.class })
public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFromClientSideWithCoprocessor.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
initialize(MultiRowMutationEndpoint.class, NoOpScanPolicyObserver.class);
// Override the parameters from the parent class. We just want to run it for the default
// param combination.
@Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][] {
{ ZKConnectionRegistry.class, 1}
});
}
public TestFromClientSideWithCoprocessor(Class registry, int numHedgedReqs) throws Exception {
if (TEST_UTIL == null) {
// It is ok to initialize once because the test is parameterized for a single dimension.
initialize(registry, numHedgedReqs, NoOpScanPolicyObserver.class,
MultiRowMutationEndpoint.class);
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestMasterRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterRegistry.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
builder.numMasters(3).numRegionServers(3);
TEST_UTIL.startMiniCluster(builder.build());
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Generates a string of dummy master addresses in host:port format. Every other hostname won't
* have a port number.
*/
private static String generateDummyMastersList(int size) {
List<String> masters = new ArrayList<>();
for (int i = 0; i < size; i++) {
masters.add(" localhost" + (i % 2 == 0 ? ":" + (1000 + i) : ""));
}
return String.join(",", masters);
}
/**
* Makes sure the master registry parses the master end points in the configuration correctly.
*/
@Test public void testMasterAddressParsing() {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
try (MasterRegistry registry = new MasterRegistry(conf)) {
List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
// Half of them would be without a port, duplicates are removed.
assertEquals(numMasters/2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
for (int i = 0; i < parsedMasters.size(); i++) {
ServerName sn = parsedMasters.get(i);
assertEquals("localhost", sn.getHostname());
if (i == parsedMasters.size() - 1) {
// Last entry should be the one with default port.
assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort());
} else {
assertEquals(1000 + (2 * i), sn.getPort());
}
}
}
}
@Test public void testRegistryRPCs() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
for (int numHedgedReqs = 1; numHedgedReqs <=3; numHedgedReqs++) {
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
try (MasterRegistry registry = new MasterRegistry(conf)) {
assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaRegionLocationCache()
.getMetaRegionLocations().get();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);
}
}
}
}

View File

@ -29,9 +29,10 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -44,35 +45,38 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* A client-side test, mostly testing scanners with various parameters.
* A client-side test, mostly testing scanners with various parameters. Parameterized on different
* registry implementations.
*/
@Category({MediumTests.class, ClientTests.class})
@RunWith(Parameterized.class)
public class TestScannersFromClientSide {
@ClassRule
@ -81,38 +85,80 @@ public class TestScannersFromClientSide {
private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static HBaseTestingUtility TEST_UTIL;
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
TEST_UTIL.startMiniCluster(3);
}
public TableNameTestRule name = new TableNameTestRule();
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
if (TEST_UTIL != null) {
TEST_UTIL.shutdownMiniCluster();
}
}
@Before
public void setUp() throws Exception {
// Nothing to do.
@Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][] {
{ MasterRegistry.class, 1},
{ MasterRegistry.class, 2},
{ ZKConnectionRegistry.class, 1}
});
}
/**
* @throws java.lang.Exception
* JUnit does not provide an easy way to run a hook after each parameterized run. Without that
* there is no easy way to restart the test cluster after each parameterized run. Annotation
* BeforeParam does not work either because it runs before parameterization and hence does not
* have access to the test parameters (which is weird).
*
* This *hack* checks if the current instance of test cluster configuration has the passed
* parameterized configs. In such a case, we can just reuse the cluster for test and do not need
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
* test and de-flakes it.
*/
@After
public void tearDown() throws Exception {
// Nothing to do.
private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
// initialize() is called for every unit test, however we only want to reset the cluster state
// at the end of every parameterized run.
if (TEST_UTIL == null) {
return false;
}
Configuration conf = TEST_UTIL.getConfiguration();
Class confClass = conf.getClass(
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}
public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception {
if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
return;
}
if (TEST_UTIL != null) {
// We reached the end of a parameterized run, clean up the cluster.
TEST_UTIL.shutdownMiniCluster();
}
TEST_UTIL = new HBaseTestingUtility();
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
ConnectionRegistry.class);
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
Preconditions.checkArgument(numHedgedReqs > 1);
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
// Multiple masters needed only when hedged reads for master registry are enabled.
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
TEST_UTIL.startMiniCluster(builder.build());
}
/**
@ -120,7 +166,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testScanBatch() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@ -190,7 +236,7 @@ public class TestScannersFromClientSide {
@Test
public void testMaxResultSizeIsSetToDefault() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
// The max result size we expect the scan to use by default.
@ -259,7 +305,7 @@ public class TestScannersFromClientSide {
@Test
public void testSmallScan() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
int numRows = 10;
byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
@ -292,7 +338,8 @@ public class TestScannersFromClientSide {
/**
* Run through a variety of test configurations with a small scan
*/
private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
private void testSmallScan(
Table table, boolean reversed, int rows, int columns) throws Exception {
Scan baseScan = new Scan();
baseScan.setReversed(reversed);
baseScan.setSmall(true);
@ -334,7 +381,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testGetMaxResults() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@ -408,8 +455,8 @@ public class TestScannersFromClientSide {
kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
}
for (int i=0; i < 2; i++) {
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
}
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
}
for (int i=10; i < 20; i++) {
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
}
@ -452,7 +499,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testScanMaxResults() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@ -500,7 +547,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testGetRowOffset() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@ -519,7 +566,9 @@ public class TestScannersFromClientSide {
KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
put.add(kv);
// skipping first two kvs
if (i < 2) continue;
if (i < 2) {
continue;
}
kvListExp.add(kv);
}
ht.put(put);
@ -590,7 +639,7 @@ public class TestScannersFromClientSide {
@Test
public void testScanRawDeleteFamilyVersion() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
TableName tableName = name.getTableName();
TEST_UTIL.createTable(tableName, FAMILY);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(RPC_CODEC_CONF_KEY, "");
@ -618,7 +667,7 @@ public class TestScannersFromClientSide {
*/
@Test
public void testScanOnReopenedRegion() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@ -693,8 +742,9 @@ public class TestScannersFromClientSide {
LOG.info(msg);
LOG.info("Expected count: " + expKvList.size());
LOG.info("Actual count: " + result.size());
if (expKvList.isEmpty())
if (expKvList.isEmpty()) {
return;
}
int i = 0;
for (Cell kv : result.rawCells()) {
@ -715,7 +765,7 @@ public class TestScannersFromClientSide {
@Test
public void testReadExpiredDataForRawScan() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
TableName tableName = name.getTableName();
long ts = System.currentTimeMillis() - 10000;
byte[] value = Bytes.toBytes("expired");
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
@ -735,7 +785,7 @@ public class TestScannersFromClientSide {
@Test
public void testScanWithColumnsAndFilterAndVersion() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
for (int i = 0; i < 4; i++) {
Put put = new Put(ROW);
@ -757,7 +807,7 @@ public class TestScannersFromClientSide {
@Test
public void testScanWithSameStartRowStopRow() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
@ -794,7 +844,7 @@ public class TestScannersFromClientSide {
@Test
public void testReverseScanWithFlush() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
TableName tableName = name.getTableName();
final int BATCH_SIZE = 10;
final int ROWS_TO_INSERT = 100;
final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);

View File

@ -30,22 +30,31 @@ import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -54,14 +63,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Some basic ipc tests.
@ -232,7 +233,6 @@ public abstract class AbstractTestIPC {
/**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
* @throws ServiceException
*/
@Test
public void testRpcServerForNotNullRemoteAddressInCallObject()
@ -363,6 +363,104 @@ public abstract class AbstractTestIPC {
}
}
/**
* Tests the various request fan out values using a simple RPC hedged across a mix of running and
* failing servers.
*/
@Test
public void testHedgedAsyncEcho() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
// Create a mix of running and failing servers.
final int numRunningServers = 5;
final int numFailingServers = 3;
final int numServers = numRunningServers + numFailingServers;
for (int i = 0; i < numRunningServers; i++) {
RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
for (int i = 0; i < numFailingServers; i++) {
RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
Configuration conf = HBaseConfiguration.create();
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
// Try out various fan out values starting from 1 -> numServers.
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
// Update the client's underlying conf, should be ok for the test.
LOG.debug("Testing with request fan out: " + reqFanOut);
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
Interface stub = newStub(client, addresses);
BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
stub.echo(new HBaseRpcControllerImpl(),
EchoRequestProto.newBuilder().setMessage("hello").build(), done);
TestProtos.EchoResponseProto responseProto = done.get();
assertNotNull(responseProto);
assertEquals("hello", responseProto.getMessage());
LOG.debug("Ended test with request fan out: " + reqFanOut);
}
} finally {
for (RpcServer rpcServer: rpcServers) {
rpcServer.stop();
}
}
}
@Test
public void testHedgedAsyncTimeouts() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
final int numServers = 3;
for (int i = 0; i < numServers; i++) {
RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
Configuration conf = HBaseConfiguration.create();
int timeout = 100;
int pauseTime = 1000;
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
// Try out various fan out values starting from 1 -> numServers.
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
// Update the client's underlying conf, should be ok for the test.
LOG.debug("Testing with request fan out: " + reqFanOut);
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
Interface stub = newStub(client, addresses);
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
pcrc.setCallTimeout(timeout);
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
assertNull(callback.get());
// Make sure the controller has the right exception propagated.
assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
LOG.debug("Ended test with request fan out: " + reqFanOut);
}
} finally {
for (RpcServer rpcServer: rpcServers) {
rpcServer.stop();
}
}
}
@Test
public void testAsyncRemoteError() throws IOException {
AbstractRpcClient<?> client = createRpcClient(CONF);

View File

@ -17,21 +17,23 @@
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@ -41,8 +43,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
@InterfaceAudience.Private
public class TestProtobufRpcServiceImpl implements BlockingInterface {
@ -67,6 +67,17 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
User.getCurrent(), 0));
}
public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
throws IOException {
Set<ServerName> serverNames = new HashSet<>();
for (InetSocketAddress addr: addrs) {
serverNames.add(ServerName.valueOf(
addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
}
return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
serverNames, User.getCurrent(), 0));
}
@Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException {