HBASE-23305: Master based registry implementation (#954)

Implements a master based registry for clients.

 - Supports hedged RPCs (fan out configured via configs).
 - Parameterized existing client tests to run with multiple registry combinations.
 - Added unit-test coverage for the new registry implementation.

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
(cherry picked from commit 62da419b23)
This commit is contained in:
Bharath Vissapragada 2020-01-14 08:24:07 -08:00
parent 1b52501b1b
commit be30d43a6c
22 changed files with 1279 additions and 315 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -27,9 +28,6 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
final class ConnectionRegistryFactory { final class ConnectionRegistryFactory {
static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.connection.registry.impl";
private ConnectionRegistryFactory() { private ConnectionRegistryFactory() {
} }

View File

@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
* {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
*
* It supports hedged reads, which can be enabled by setting
* {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to True. Fan
* out the requests batch is controlled by
* {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
*
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
// Configured list of masters to probe the meta information from.
private final Set<ServerName> masterServers;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;
MasterRegistry(Configuration conf) {
boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
Configuration finalConf;
if (!hedgedReadsEnabled) {
// If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of
// the configuration so that other places reusing this reference is not affected.
finalConf = new Configuration(conf);
finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
} else {
finalConf = conf;
}
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
masterServers = new HashSet<>();
parseMasterAddrs(finalConf);
rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
}
/**
* @return Stub needed to make RPC using a hedged channel to the master end points.
*/
private ClientMetaService.Interface getMasterStub() throws IOException {
return ClientMetaService.newStub(
rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
}
/**
* Parses the list of master addresses from the provided configuration. Supported format is
* comma separated host[:port] values. If no port number if specified, default master port is
* assumed.
* @param conf Configuration to parse from.
*/
private void parseMasterAddrs(Configuration conf) {
String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
HostAndPort masterHostPort =
HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
}
Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
}
@VisibleForTesting
public Set<ServerName> getParsedMasterServers() {
return Collections.unmodifiableSet(masterServers);
}
/**
* Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
* the rpc finishes and the response is propagated to the passed future.
* @param future Result future to which the rpc response is propagated.
* @param isValidResp Checks if the rpc response has a valid result.
* @param transformResult Transforms the result to a different form as expected by callers.
* @param hrc RpcController instance for this rpc.
* @param debug Debug message passed along to the caller in case of exceptions.
* @param <T> RPC result type.
* @param <R> Transformed type of the result.
* @return A call back that can be embedded in the non-blocking rpc call.
*/
private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
final String debug) {
return rpcResult -> {
if (rpcResult == null) {
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
}
if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed.
future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug)));
}
future.complete(transformResult.apply(rpcResult));
};
}
/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList().forEach(
location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
return new RegionLocations(regionLocations);
}
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
"getMetaRegionLocations()");
try {
getMasterStub().getMetaRegionLocations(
hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
@Override
public CompletableFuture<String> getClusterId() {
CompletableFuture<String> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
"getClusterId()");
try {
getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
private ServerName transformServerName(GetActiveMasterResponse resp) {
return ProtobufUtil.toServerName(resp.getServerName());
}
@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
"getActiveMaster()");
try {
getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
} catch (IOException e) {
result.completeExceptionally(e);
}
return result;
}
@Override
public void close() {
if (rpcClient != null) {
rpcClient.close();
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Exception thrown when an master registry RPC fails in client. The exception includes the list of
* masters to which RPC was attempted and the last exception encountered. Prior exceptions are
* included in the logs.
*/
@InterfaceAudience.Private
public class MasterRegistryFetchException extends HBaseIOException {
public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) {
super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)),
failure);
}
}

View File

@ -20,39 +20,20 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.codec.KeyValueCodec;
@ -63,7 +44,22 @@ import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
/** /**
@ -203,7 +199,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
// have some pending calls on connection so we should not shutdown the connection outside. // have some pending calls on connection so we should not shutdown the connection outside.
// The connection itself will disconnect if there is no pending call for maxIdleTime. // The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address); if (LOG.isTraceEnabled()) {
LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
}
connections.removeValue(conn.remoteId(), conn); connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection(); conn.cleanupConnection();
} }
@ -384,7 +382,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} }
} }
private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final InetSocketAddress addr, final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
final RpcCallback<Message> callback) { final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
@ -421,9 +419,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
} catch (Exception e) { } catch (Exception e) {
call.setException(toIOE(e)); call.setException(toIOE(e));
} }
return call;
} }
private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
if (addr.isUnresolved()) { if (addr.isUnresolved()) {
throw new UnknownHostException("can not resolve " + sn.getServerName()); throw new UnknownHostException("can not resolve " + sn.getServerName());
@ -513,6 +512,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
} }
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
// Check HedgedRpcChannel implementation for detailed comments.
throw new UnsupportedOperationException("Hedging not supported for this implementation.");
}
private static class AbstractRpcChannel { private static class AbstractRpcChannel {
protected final InetSocketAddress addr; protected final InetSocketAddress addr;

View File

@ -17,18 +17,15 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
* Does RPC against a cluster. Manages connections per regionserver in the cluster. * Does RPC against a cluster. Manages connections per regionserver in the cluster.

View File

@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
/**
* A non-blocking implementation of RpcChannel that hedges requests to multiple service end points.
* First received response is returned to the caller. This abstracts out the logic needed to batch
* requests to multiple end points underneath and presents itself as a single logical RpcChannel to
* the client.
*
* Hedging Details:
* ---------------
* - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address
* end points to make the call to. We do multiple iterations until we get a proper response to the
* rpc call or all the service addresses are exhausted, which ever happens first. Size of each is
* configurable and is also known as 'fanOutSize'.
*
* - We randomize the addresses up front so that the batch order per client is non deterministic.
* This avoids hot spots on the service side. The size of each batch is controlled via 'fanOutSize'.
* Higher fanOutSize implies we make more rpc calls in a single batch. One needs to mindful of the
* load on the client and server side when configuring the fan out.
*
* - In a happy case, once we receive a response from one end point, we cancel all the
* other inflight rpcs in the same batch and return the response to the caller. If we do not get a
* valid response from any address end point, we propagate the error back to the caller.
*
* - Rpc timeouts are applied to every hedged rpc.
*
* - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can
* be hedged (for example: cluster state changing rpcs).
*
* (TODO) Retries and Adaptive hedging policy:
* ------------------------------------------
*
* - No retries are handled at the channel level. Retries can be built in upper layers. However the
* question is, do we even need retries? Hedging in fact is a substitute for retries.
*
* - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging
* policy more adaptive. In most happy cases, the rpcs from the first few end points should return
* right away (especially short lived rpcs, that do not take up much time). In such cases, hedging
* is not needed. So, the idea is to make this request pattern pluggable so that the requests are
* hedged only when needed.
*/
class HedgedRpcChannel implements RpcChannel {
private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);
/**
* Currently hedging is only supported for non-blocking connection implementation types because
* the channel implementation inherently relies on the connection implementation being async.
* Refer to the comments in doCallMethod().
*/
private final NettyRpcClient rpcClient;
// List of service addresses to hedge the requests to.
private final List<InetSocketAddress> addrs;
private final User ticket;
private final int rpcTimeout;
// Controls the size of request fan out (number of rpcs per a single batch).
private final int fanOutSize;
/**
* A simple rpc call back implementation to notify the batch context if any rpc is successful.
*/
private static class BatchRpcCtxCallBack implements RpcCallback<Message> {
private final BatchRpcCtx batchRpcCtx;
private final HBaseRpcController rpcController;
BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) {
this.batchRpcCtx = batchRpcCtx;
this.rpcController = rpcController;
}
@Override
public void run(Message result) {
batchRpcCtx.setResultIfNotSet(result, rpcController);
}
}
/**
* A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
* synchronize on multiple RPCs to different end points fetching the result. All the methods are
* thread-safe.
*/
private static class BatchRpcCtx {
// Result set by the thread finishing first. Set only once.
private final AtomicReference<Message> result = new AtomicReference<>();
// Caller waits on this latch being set.
// We set this to 1, so that the first successful RPC result is returned to the client.
private CountDownLatch resultsReady = new CountDownLatch(1);
// Failed rpc book-keeping.
private AtomicInteger failedRpcCount = new AtomicInteger();
// All the call handles for this batch.
private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>());
// Target addresses.
private final List<InetSocketAddress> addresses;
// Called when the result is ready.
private final RpcCallback<Message> callBack;
// Last failed rpc's exception. Used to propagate the reason to the controller.
private IOException lastFailedRpcReason;
BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) {
this.addresses = addresses;
this.callBack = Preconditions.checkNotNull(callBack);
}
/**
* Sets the result only if it is not already set by another thread. Thread that successfully
* sets the result also count downs the latch.
* @param result Result to be set.
*/
public void setResultIfNotSet(Message result, HBaseRpcController rpcController) {
if (rpcController.failed()) {
incrementFailedRpcs(rpcController.getFailed());
return;
}
if (this.result.compareAndSet(null, result)) {
resultsReady.countDown();
// Cancel all pending in flight calls.
for (Call call: callsInFlight) {
// It is ok to do it for all calls as it is a no-op if the call is already done.
final String exceptionMsg = String.format("%s canceled because another hedged attempt " +
"for the same rpc already succeeded. This is not needed anymore.", call);
call.setException(new CallCancelledException(exceptionMsg));
}
}
}
/**
* Waits until the results are populated and calls the callback if the call is successful.
* @return true for successful rpc and false otherwise.
*/
public boolean waitForResults() {
try {
// We do not set a timeout on await() because we rely on the underlying RPCs to timeout if
// something on the remote is broken. Worst case we should wait for rpc time out to kick in.
resultsReady.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
}
Message message = result.get();
if (message != null) {
callBack.run(message);
return true;
}
return false;
}
public void addCallInFlight(Call c) {
callsInFlight.add(c);
}
public void incrementFailedRpcs(IOException reason) {
if (failedRpcCount.incrementAndGet() == addresses.size()) {
lastFailedRpcReason = reason;
// All the rpcs in this batch have failed. Invoke the waiting threads.
resultsReady.countDown();
}
}
public IOException getLastFailedRpcReason() {
return lastFailedRpcReason;
}
@Override
public String toString() {
return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses));
}
}
public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
User ticket, int rpcTimeout, int fanOutSize) {
this.rpcClient = rpcClient;
this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
Preconditions.checkArgument(this.addrs.size() >= 1);
// For non-deterministic client query pattern. Not all clients want to hedge RPCs in the same
// order, creating hot spots on the service end points.
Collections.shuffle(this.addrs);
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
// fanOutSize controls the number of hedged RPCs per batch.
this.fanOutSize = fanOutSize;
}
private HBaseRpcController applyRpcTimeout(RpcController controller) {
HBaseRpcController hBaseRpcController = (HBaseRpcController) controller;
int rpcTimeoutToSet =
hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout;
HBaseRpcController response = new HBaseRpcControllerImpl();
response.setCallTimeout(rpcTimeoutToSet);
return response;
}
private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller,
Message request, Message responsePrototype, RpcCallback<Message> done) {
int i = 0;
BatchRpcCtx lastBatchCtx = null;
while (i < addrs.size()) {
// Each iteration picks fanOutSize addresses to run as batch.
int batchEnd = Math.min(addrs.size(), i + fanOutSize);
List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd);
BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done);
lastBatchCtx = batchRpcCtx;
LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx);
for (InetSocketAddress address : addrSubList) {
HBaseRpcController rpcController = applyRpcTimeout(controller);
// ** WARN ** This is a blocking call if the underlying connection for the rpc client is
// a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
// the write calls. Handling blocking connection means that this should be run in a separate
// thread and hence more code complexity. Is it ok to handle only non-blocking connections?
batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
responsePrototype, ticket, address,
new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
}
if (batchRpcCtx.waitForResults()) {
return;
}
// Entire batch has failed, lets try the next batch.
LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
i = batchEnd;
}
Preconditions.checkNotNull(lastBatchCtx);
// All the batches failed, mark it a failed rpc.
// Propagate the failure reason. We propagate the last batch's last failing rpc reason.
// Can we do something better?
controller.setFailed(lastBatchCtx.getLastFailedRpcReason());
done.run(null);
}
@Override
public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
Message request, Message responsePrototype, RpcCallback<Message> done) {
// There is no reason to use any other implementation of RpcController.
Preconditions.checkState(controller instanceof HBaseRpcController);
// To make the channel non-blocking, we run the actual doCalMethod() async. The call back is
// called once the hedging finishes.
CompletableFuture.runAsync(
() -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done));
}
}

View File

@ -17,22 +17,27 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.SocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* Netty client for the requests and responses. * Netty client for the requests and responses.
* @since 2.0.0 * @since 2.0.0
@ -74,6 +79,19 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
return new NettyRpcConnection(this, remoteId); return new NettyRpcConnection(this, remoteId);
} }
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Set<InetSocketAddress> addresses = new HashSet<>();
for (ServerName sn: sns) {
addresses.add(createAddr(sn));
}
return new HedgedRpcChannel(this, addresses, user, rpcTimeout,
hedgedRpcFanOut);
}
@Override @Override
protected void closeInternal() { protected void closeInternal() {
if (shutdownGroupWhenClose) { if (shutdownGroupWhenClose) {

View File

@ -17,15 +17,14 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
/** /**
* Interface for RpcClient implementations so ConnectionManager can handle it. * Interface for RpcClient implementations so ConnectionManager can handle it.
@ -83,6 +82,16 @@ public interface RpcClient extends Closeable {
RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
throws IOException; throws IOException;
/**
* Creates a channel that can hedge request to multiple underlying channels.
* @param sns Set of servers for underlying channels.
* @param user user for the connection.
* @param rpcTimeout rpc timeout to use.
* @return A hedging rpc channel for this rpc client instance.
*/
RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout)
throws IOException;
/** /**
* Interrupt the connections to the given server. This should be called if the server * Interrupt the connections to the given server. This should be called if the server
* is known as actually dead. This will not prevent current operation to be retried, and, * is known as actually dead. This will not prevent current operation to be retried, and,

View File

@ -481,7 +481,7 @@ public class TestAsyncProcess {
} }
private static Configuration setupConf(Configuration conf) { private static Configuration setupConf(Configuration conf) {
conf.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
TestRegistry.class, ConnectionRegistry.class); TestRegistry.class, ConnectionRegistry.class);
return conf; return conf;
} }

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -58,7 +59,7 @@ public class TestBufferedMutator {
public void testAlternateBufferedMutatorImpl() throws IOException { public void testAlternateBufferedMutatorImpl() throws IOException {
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(name.getMethodName())); BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(name.getMethodName()));
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
conf.set(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
DoNothingConnectionRegistry.class.getName()); DoNothingConnectionRegistry.class.getName());
try (Connection connection = ConnectionFactory.createConnection(conf)) { try (Connection connection = ConnectionFactory.createConnection(conf)) {
BufferedMutator bm = connection.getBufferedMutator(params); BufferedMutator bm = connection.getBufferedMutator(params);

View File

@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.FutureUtils;
@ -70,7 +71,7 @@ public class TestConnectionRegistryLeak {
@BeforeClass @BeforeClass
public static void setUp() { public static void setUp() {
CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, CONF.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ConnectionRegistryForTest.class, ConnectionRegistry.class); ConnectionRegistryForTest.class, ConnectionRegistry.class);
} }

View File

@ -184,10 +184,17 @@ public final class HConstants {
public static final String MASTER_INFO_PORT = "hbase.master.info.port"; public static final String MASTER_INFO_PORT = "hbase.master.info.port";
/** Configuration key for the list of master host:ports **/ /** Configuration key for the list of master host:ports **/
public static final String MASTER_ADDRS_KEY = "hbase.master.addrs"; public static final String MASTER_ADDRS_KEY = "hbase.masters";
public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT; public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;
/** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads";
/** Default value for enabling hedging reads on master registry **/
public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;
/** Parameter name for the master type being backup (waits for primary to go inactive). */ /** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup"; public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";
@ -930,6 +937,12 @@ public final class HConstants {
*/ */
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/** Configuration key that controls the fan out of requests in hedged channel implementation. **/
public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout";
/** Default value for the fan out of hedged requests. **/
public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2;
/** /**
* timeout for each read RPC * timeout for each read RPC
*/ */
@ -961,6 +974,11 @@ public final class HConstants {
*/ */
public static final long NO_SEQNUM = -1; public static final long NO_SEQNUM = -1;
/**
* Registry implementation to be used on the client side.
*/
public static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.registry.impl";
/* /*
* cluster replication constants. * cluster replication constants.

View File

@ -19,9 +19,12 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -29,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@InterfaceAudience.Private @InterfaceAudience.Private
public class PrettyPrinter { public final class PrettyPrinter {
private static final Logger LOG = LoggerFactory.getLogger(PrettyPrinter.class); private static final Logger LOG = LoggerFactory.getLogger(PrettyPrinter.class);
@ -117,7 +120,7 @@ public class PrettyPrinter {
sb.append(" DAY").append(days == 1 ? "" : "S"); sb.append(" DAY").append(days == 1 ? "" : "S");
} }
if (hours > 0 ) { if (hours > 0) {
sb.append(days > 0 ? " " : ""); sb.append(days > 0 ? " " : "");
sb.append(hours); sb.append(hours);
sb.append(" HOUR").append(hours == 1 ? "" : "S"); sb.append(" HOUR").append(hours == 1 ? "" : "S");
@ -188,4 +191,18 @@ public class PrettyPrinter {
return ttl; return ttl;
} }
/**
* Pretty prints a collection of any type to a string. Relies on toString() implementation of the
* object type.
* @param collection collection to pretty print.
* @return Pretty printed string for the collection.
*/
public static String toString(Collection<?> collection) {
List<String> stringList = new ArrayList<>();
for (Object o: collection) {
stringList.add(Objects.toString(o));
}
return "[" + String.join(",", stringList) + "]";
}
} }

View File

@ -29,7 +29,21 @@ public class TableNameTestRule extends TestWatcher {
@Override @Override
protected void starting(Description description) { protected void starting(Description description) {
tableName = TableName.valueOf(description.getMethodName()); tableName = TableName.valueOf(cleanUpTestName(description.getMethodName()));
}
/**
* Helper to handle parameterized method names. Unlike regular test methods, parameterized method
* names look like 'foo[x]'. This is problematic for tests that use this name for HBase tables.
* This helper strips out the parameter suffixes.
* @return current test method name with out parameterized suffixes.
*/
private static String cleanUpTestName(String methodName) {
int index = methodName.indexOf('[');
if (index == -1) {
return methodName;
}
return methodName.substring(0, index);
} }
public TableName getTableName() { public TableName getTableName() {

View File

@ -26,12 +26,14 @@ import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/** /**
* Utility used running a cluster all in the one JVM. * Utility used running a cluster all in the one JVM.
@ -136,6 +138,11 @@ public class JVMClusterUtil {
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
// Needed if a master based registry is configured for internal cluster connections. Here, we
// just add the current master host port since we do not know other master addresses up front
// in mini cluster tests.
c.set(HConstants.MASTER_ADDRS_KEY,
Preconditions.checkNotNull(server.getServerName().getAddress()).toString());
return new JVMClusterUtil.MasterThread(server, index); return new JVMClusterUtil.MasterThread(server, index);
} }

View File

@ -1144,6 +1144,9 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
this.hbaseCluster = this.hbaseCluster =
new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(), new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(),
option.getRsPorts(), option.getMasterClass(), option.getRsClass()); option.getRsPorts(), option.getMasterClass(), option.getRsClass());
// Populate the master address configuration from mini cluster configuration.
conf.set(HConstants.MASTER_ADDRS_KEY,
c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT));
// Don't leave here till we've done a successful scan of the hbase:meta // Don't leave here till we've done a successful scan of the hbase:meta
Table t = getConnection().getTable(TableName.META_TABLE_NAME); Table t = getConnection().getTable(TableName.META_TABLE_NAME);
ResultScanner s = t.getScanner(new Scan()); ResultScanner s = t.getScanner(new Scan());

View File

@ -17,14 +17,16 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized;
/** /**
* Test all client operations with a coprocessor that just implements the default flush/compact/scan * Test all client operations with a coprocessor that just implements the default flush/compact/scan
@ -32,13 +34,24 @@ import org.junit.experimental.categories.Category;
*/ */
@Category({ LargeTests.class, ClientTests.class }) @Category({ LargeTests.class, ClientTests.class })
public class TestFromClientSideWithCoprocessor extends TestFromClientSide { public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFromClientSideWithCoprocessor.class); HBaseClassTestRule.forClass(TestFromClientSideWithCoprocessor.class);
@BeforeClass // Override the parameters from the parent class. We just want to run it for the default
public static void setUpBeforeClass() throws Exception { // param combination.
initialize(MultiRowMutationEndpoint.class, NoOpScanPolicyObserver.class); @Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][] {
{ ZKConnectionRegistry.class, 1}
});
}
public TestFromClientSideWithCoprocessor(Class registry, int numHedgedReqs) throws Exception {
if (TEST_UTIL == null) {
// It is ok to initialize once because the test is parameterized for a single dimension.
initialize(registry, numHedgedReqs, NoOpScanPolicyObserver.class,
MultiRowMutationEndpoint.class);
}
} }
} }

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, ClientTests.class })
public class TestMasterRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterRegistry.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
builder.numMasters(3).numRegionServers(3);
TEST_UTIL.startMiniCluster(builder.build());
}
@AfterClass
public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Generates a string of dummy master addresses in host:port format. Every other hostname won't
* have a port number.
*/
private static String generateDummyMastersList(int size) {
List<String> masters = new ArrayList<>();
for (int i = 0; i < size; i++) {
masters.add(" localhost" + (i % 2 == 0 ? ":" + (1000 + i) : ""));
}
return String.join(",", masters);
}
/**
* Makes sure the master registry parses the master end points in the configuration correctly.
*/
@Test public void testMasterAddressParsing() {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
try (MasterRegistry registry = new MasterRegistry(conf)) {
List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
// Half of them would be without a port, duplicates are removed.
assertEquals(numMasters/2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort));
for (int i = 0; i < parsedMasters.size(); i++) {
ServerName sn = parsedMasters.get(i);
assertEquals("localhost", sn.getHostname());
if (i == parsedMasters.size() - 1) {
// Last entry should be the one with default port.
assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort());
} else {
assertEquals(1000 + (2 * i), sn.getPort());
}
}
}
}
@Test public void testRegistryRPCs() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
for (int numHedgedReqs = 1; numHedgedReqs <=3; numHedgedReqs++) {
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
try (MasterRegistry registry = new MasterRegistry(conf)) {
assertEquals(registry.getClusterId().get(), activeMaster.getClusterId());
assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName());
List<HRegionLocation> metaLocations =
Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
List<HRegionLocation> actualMetaLocations = activeMaster.getMetaRegionLocationCache()
.getMetaRegionLocations().get();
Collections.sort(metaLocations);
Collections.sort(actualMetaLocations);
assertEquals(actualMetaLocations, metaLocations);
}
}
}
}

View File

@ -27,9 +27,10 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -46,20 +47,21 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter; import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -67,14 +69,18 @@ import org.junit.ClassRule;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/** /**
* A client-side test, mostly testing scanners with various parameters. * A client-side test, mostly testing scanners with various parameters. Parameterized on different
* registry implementations.
*/ */
@Category({MediumTests.class, ClientTests.class}) @Category({MediumTests.class, ClientTests.class})
@RunWith(Parameterized.class)
public class TestScannersFromClientSide { public class TestScannersFromClientSide {
@ClassRule @ClassRule
@ -83,15 +89,12 @@ public class TestScannersFromClientSide {
private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static HBaseTestingUtility TEST_UTIL;
private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily"); private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue"); private static byte [] VALUE = Bytes.toBytes("testValue");
@Rule
public TestName name = new TestName();
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */
@ -102,12 +105,16 @@ public class TestScannersFromClientSide {
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);
} }
@Rule public TableNameTestRule name = new TableNameTestRule();
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
*/ */
@AfterClass @AfterClass
public static void tearDownAfterClass() throws Exception { public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster(); if (TEST_UTIL != null) {
TEST_UTIL.shutdownMiniCluster();
}
} }
/** /**
@ -118,12 +125,64 @@ public class TestScannersFromClientSide {
// Nothing to do. // Nothing to do.
} }
@Parameterized.Parameters
public static Collection parameters() {
return Arrays.asList(new Object[][]{
{MasterRegistry.class, 1},
{MasterRegistry.class, 2},
{ZKConnectionRegistry.class, 1}
});
}
/** /**
* @throws java.lang.Exception * JUnit does not provide an easy way to run a hook after each parameterized run. Without that
* there is no easy way to restart the test cluster after each parameterized run. Annotation
* BeforeParam does not work either because it runs before parameterization and hence does not
* have access to the test parameters (which is weird).
*
* This *hack* checks if the current instance of test cluster configuration has the passed
* parameterized configs. In such a case, we can just reuse the cluster for test and do not need
* to initialize from scratch. While this is a hack, it saves a ton of time for the full
* test and de-flakes it.
*/ */
@After private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) {
public void tearDown() throws Exception { // initialize() is called for every unit test, however we only want to reset the cluster state
// Nothing to do. // at the end of every parameterized run.
if (TEST_UTIL == null) {
return false;
}
Configuration conf = TEST_UTIL.getConfiguration();
Class confClass = conf.getClass(
HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}
public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception {
if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
return;
}
if (TEST_UTIL != null) {
// We reached the end of a parameterized run, clean up the cluster.
TEST_UTIL.shutdownMiniCluster();
}
TEST_UTIL = new HBaseTestingUtility();
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
ConnectionRegistry.class);
if (numHedgedReqs == 1) {
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
} else {
Preconditions.checkArgument(numHedgedReqs > 1);
conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
}
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
// Multiple masters needed only when hedged reads for master registry are enabled.
builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
TEST_UTIL.startMiniCluster(builder.build());
} }
/** /**
@ -133,7 +192,7 @@ public class TestScannersFromClientSide {
*/ */
@Test @Test
public void testScanBatch() throws Exception { public void testScanBatch() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
Table ht = TEST_UTIL.createTable(tableName, FAMILY); Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@ -203,7 +262,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testMaxResultSizeIsSetToDefault() throws Exception { public void testMaxResultSizeIsSetToDefault() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
Table ht = TEST_UTIL.createTable(tableName, FAMILY); Table ht = TEST_UTIL.createTable(tableName, FAMILY);
// The max result size we expect the scan to use by default. // The max result size we expect the scan to use by default.
@ -274,7 +333,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testSmallScan() throws Exception { public void testSmallScan() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
int numRows = 10; int numRows = 10;
byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows); byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
@ -312,7 +371,8 @@ public class TestScannersFromClientSide {
* @param columns * @param columns
* @throws Exception * @throws Exception
*/ */
private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception { private void testSmallScan(
Table table, boolean reversed, int rows, int columns) throws Exception {
Scan baseScan = new Scan(); Scan baseScan = new Scan();
baseScan.setReversed(reversed); baseScan.setReversed(reversed);
baseScan.setSmall(true); baseScan.setSmall(true);
@ -356,7 +416,7 @@ public class TestScannersFromClientSide {
*/ */
@Test @Test
public void testGetMaxResults() throws Exception { public void testGetMaxResults() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@ -430,8 +490,8 @@ public class TestScannersFromClientSide {
kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE)); kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
} }
for (int i=0; i < 2; i++) { for (int i=0; i < 2; i++) {
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
} }
for (int i=10; i < 20; i++) { for (int i=10; i < 20; i++) {
kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE)); kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
} }
@ -476,7 +536,7 @@ public class TestScannersFromClientSide {
*/ */
@Test @Test
public void testScanMaxResults() throws Exception { public void testScanMaxResults() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
byte [][] ROWS = HTestConst.makeNAscii(ROW, 2); byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@ -526,7 +586,7 @@ public class TestScannersFromClientSide {
*/ */
@Test @Test
public void testGetRowOffset() throws Exception { public void testGetRowOffset() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3); byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@ -545,7 +605,9 @@ public class TestScannersFromClientSide {
KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE); KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
put.add(kv); put.add(kv);
// skipping first two kvs // skipping first two kvs
if (i < 2) continue; if (i < 2) {
continue;
}
kvListExp.add(kv); kvListExp.add(kv);
} }
ht.put(put); ht.put(put);
@ -616,7 +678,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testScanRawDeleteFamilyVersion() throws Exception { public void testScanRawDeleteFamilyVersion() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = name.getTableName();
TEST_UTIL.createTable(tableName, FAMILY); TEST_UTIL.createTable(tableName, FAMILY);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.set(RPC_CODEC_CONF_KEY, ""); conf.set(RPC_CODEC_CONF_KEY, "");
@ -646,7 +708,7 @@ public class TestScannersFromClientSide {
*/ */
@Test @Test
public void testScanOnReopenedRegion() throws Exception { public void testScanOnReopenedRegion() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName()); final TableName tableName = name.getTableName();
byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2); byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
Table ht = TEST_UTIL.createTable(tableName, FAMILY); Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@ -717,7 +779,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testAsyncScannerWithSmallData() throws Exception { public void testAsyncScannerWithSmallData() throws Exception {
testAsyncScanner(TableName.valueOf(name.getMethodName()), testAsyncScanner(name.getTableName(),
2, 2,
3, 3,
10, 10,
@ -727,7 +789,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testAsyncScannerWithManyRows() throws Exception { public void testAsyncScannerWithManyRows() throws Exception {
testAsyncScanner(TableName.valueOf(name.getMethodName()), testAsyncScanner(name.getTableName(),
30000, 30000,
1, 1,
1, 1,
@ -737,7 +799,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testAsyncScannerWithoutCaching() throws Exception { public void testAsyncScannerWithoutCaching() throws Exception {
testAsyncScanner(TableName.valueOf(name.getMethodName()), testAsyncScanner(name.getTableName(),
5, 5,
1, 1,
1, 1,
@ -840,8 +902,9 @@ public class TestScannersFromClientSide {
LOG.info(msg); LOG.info(msg);
LOG.info("Expected count: " + expKvList.size()); LOG.info("Expected count: " + expKvList.size());
LOG.info("Actual count: " + result.size()); LOG.info("Actual count: " + result.size());
if (expKvList.isEmpty()) if (expKvList.isEmpty()) {
return; return;
}
int i = 0; int i = 0;
for (Cell kv : result.rawCells()) { for (Cell kv : result.rawCells()) {
@ -862,7 +925,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testReadExpiredDataForRawScan() throws IOException { public void testReadExpiredDataForRawScan() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = name.getTableName();
long ts = System.currentTimeMillis() - 10000; long ts = System.currentTimeMillis() - 10000;
byte[] value = Bytes.toBytes("expired"); byte[] value = Bytes.toBytes("expired");
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
@ -882,7 +945,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testScanWithColumnsAndFilterAndVersion() throws IOException { public void testScanWithColumnsAndFilterAndVersion() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) { try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
Put put = new Put(ROW); Put put = new Put(ROW);
@ -904,7 +967,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testScanWithSameStartRowStopRow() throws IOException { public void testScanWithSameStartRowStopRow() throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = name.getTableName();
try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
@ -941,7 +1004,7 @@ public class TestScannersFromClientSide {
@Test @Test
public void testReverseScanWithFlush() throws Exception { public void testReverseScanWithFlush() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName()); TableName tableName = name.getTableName();
final int BATCH_SIZE = 10; final int BATCH_SIZE = 10;
final int ROWS_TO_INSERT = 100; final int ROWS_TO_INSERT = 100;
final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);

View File

@ -30,22 +30,31 @@ import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times; import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -54,14 +63,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/** /**
* Some basic ipc tests. * Some basic ipc tests.
@ -232,7 +233,6 @@ public abstract class AbstractTestIPC {
/** /**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object * remoteAddress set to its Call Object
* @throws ServiceException
*/ */
@Test @Test
public void testRpcServerForNotNullRemoteAddressInCallObject() public void testRpcServerForNotNullRemoteAddressInCallObject()
@ -363,6 +363,104 @@ public abstract class AbstractTestIPC {
} }
} }
/**
* Tests the various request fan out values using a simple RPC hedged across a mix of running and
* failing servers.
*/
@Test
public void testHedgedAsyncEcho() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
// Create a mix of running and failing servers.
final int numRunningServers = 5;
final int numFailingServers = 3;
final int numServers = numRunningServers + numFailingServers;
for (int i = 0; i < numRunningServers; i++) {
RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
for (int i = 0; i < numFailingServers; i++) {
RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
Configuration conf = HBaseConfiguration.create();
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
// Try out various fan out values starting from 1 -> numServers.
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
// Update the client's underlying conf, should be ok for the test.
LOG.debug("Testing with request fan out: " + reqFanOut);
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
Interface stub = newStub(client, addresses);
BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
stub.echo(new HBaseRpcControllerImpl(),
EchoRequestProto.newBuilder().setMessage("hello").build(), done);
TestProtos.EchoResponseProto responseProto = done.get();
assertNotNull(responseProto);
assertEquals("hello", responseProto.getMessage());
LOG.debug("Ended test with request fan out: " + reqFanOut);
}
} finally {
for (RpcServer rpcServer: rpcServers) {
rpcServer.stop();
}
}
}
@Test
public void testHedgedAsyncTimeouts() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
final int numServers = 3;
for (int i = 0; i < numServers; i++) {
RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
new FifoRpcScheduler(CONF, 1));
rpcServer.start();
addresses.add(rpcServer.getListenerAddress());
rpcServers.add(rpcServer);
}
Configuration conf = HBaseConfiguration.create();
int timeout = 100;
int pauseTime = 1000;
try (AbstractRpcClient<?> client = createRpcClient(conf)) {
// Try out various fan out values starting from 1 -> numServers.
for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
// Update the client's underlying conf, should be ok for the test.
LOG.debug("Testing with request fan out: " + reqFanOut);
conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
Interface stub = newStub(client, addresses);
HBaseRpcController pcrc = new HBaseRpcControllerImpl();
pcrc.setCallTimeout(timeout);
BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
assertNull(callback.get());
// Make sure the controller has the right exception propagated.
assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
LOG.debug("Ended test with request fan out: " + reqFanOut);
}
} finally {
for (RpcServer rpcServer: rpcServers) {
rpcServer.stop();
}
}
}
@Test @Test
public void testAsyncRemoteError() throws IOException { public void testAsyncRemoteError() throws IOException {
AbstractRpcClient<?> client = createRpcClient(CONF); AbstractRpcClient<?> client = createRpcClient(CONF);

View File

@ -17,21 +17,23 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@ -41,8 +43,6 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseReq
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Threads;
@InterfaceAudience.Private @InterfaceAudience.Private
public class TestProtobufRpcServiceImpl implements BlockingInterface { public class TestProtobufRpcServiceImpl implements BlockingInterface {
@ -67,6 +67,17 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
User.getCurrent(), 0)); User.getCurrent(), 0));
} }
public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
throws IOException {
Set<ServerName> serverNames = new HashSet<>();
for (InetSocketAddress addr: addrs) {
serverNames.add(ServerName.valueOf(
addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
}
return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
serverNames, User.getCurrent(), 0));
}
@Override @Override
public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
throws ServiceException { throws ServiceException {