HBASE-26150 Let region server also carry ClientMetaService (#3550)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Duo Zhang 2021-08-04 23:44:10 +08:00 committed by GitHub
parent 73a0411bb9
commit 63d4970de4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1309 additions and 678 deletions

View File

@ -0,0 +1,283 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
/**
 * Base class for rpc based connection registry implementation.
 * <p/>
 * The implementation needs a bootstrap node list in configuration, and then it will use the methods
 * in {@link ClientMetaService} to refresh the connection registry end points.
 * <p/>
 * It also supports hedged reads, the default fan out value is 2.
 * <p/>
 * For the actual configuration names, see javadoc of sub classes.
 */
@InterfaceAudience.Private
abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {

  /** Default value for the fan out of hedged requests. **/
  public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;

  /** Number of end points contacted concurrently in one round of a hedged call, always >= 1. */
  private final int hedgedReadFanOut;

  // Current end points to probe the meta information from, together with their stubs. The whole
  // map is replaced on every refresh, see populateStubs.
  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;

  // RPC client used to talk to the registry end points.
  private final RpcClient rpcClient;

  private final RpcControllerFactory rpcControllerFactory;

  private final int rpcTimeoutMs;

  private final RegistryEndpointsRefresher registryEndpointRefresher;

  /**
   * Creates the rpc client, builds the initial stubs from the bootstrap nodes and starts the end
   * point refresher.
   * @param conf the configuration to read from
   * @param hedgedReqsFanoutConfigName name of the config which holds the hedged request fan out,
   *          values less than 1 are treated as 1
   * @param refreshIntervalSecsConfigName config name passed through to
   *          {@link RegistryEndpointsRefresher}
   * @param minRefreshIntervalSecsConfigName config name passed through to
   *          {@link RegistryEndpointsRefresher}
   * @throws IOException if the bootstrap nodes can not be loaded or the stubs can not be created
   */
  protected AbstractRpcBasedConnectionRegistry(Configuration conf,
    String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
    String minRefreshIntervalSecsConfigName) throws IOException {
    this.hedgedReadFanOut =
      Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
    // this through the master registry...
    // This is a problem as we will use the cluster id to determine the authentication method
    rpcClient = RpcClientFactory.createClient(conf, null);
    try {
      rpcControllerFactory = RpcControllerFactory.instantiate(conf);
      populateStubs(getBootstrapNodes(conf));
      registryEndpointRefresher = new RegistryEndpointsRefresher(conf,
        refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
      registryEndpointRefresher.start();
    } catch (IOException | RuntimeException e) {
      // The caller never gets a reference to this half constructed instance, so nobody else could
      // close the rpc client we have just created. Release it here before propagating the error.
      rpcClient.close();
      throw e;
    }
  }

  /**
   * Returns the initial set of end points, loaded from the given configuration. Called from the
   * constructor.
   */
  protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;

  /**
   * Fetches the current set of end points. The result is used to rebuild the stubs when refreshing.
   */
  protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();

  /**
   * Refresh action handed to the {@link RegistryEndpointsRefresher}: waits for
   * {@link #fetchEndpoints()} and rebuilds the stubs from its result.
   */
  private void refreshStubs() throws IOException {
    populateStubs(FutureUtils.get(fetchEndpoints()));
  }

  /**
   * Creates one stub per given address and publishes the new address to stub map.
   * @param addrs the end points to create stubs for, must not be null
   */
  private void populateStubs(Set<ServerName> addrs) throws IOException {
    Preconditions.checkNotNull(addrs);
    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
      ImmutableMap.builderWithExpectedSize(addrs.size());
    User user = User.getCurrent();
    for (ServerName addr : addrs) {
      builder.put(addr,
        ClientMetaService.newStub(rpcClient.createRpcChannel(addr, user, rpcTimeoutMs)));
    }
    addr2Stub = builder.build();
  }

  /**
   * For describing the actual asynchronous rpc call.
   * <p/>
   * Typically, you can use lambda expression to implement this interface as
   *
   * <pre>
   * (c, s, d) -> s.xxx(c, your request here, d)
   * </pre>
   */
  @FunctionalInterface
  protected interface Callable<T> {
    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
  }

  /**
   * Issues a single rpc call against the given stub.
   * @return a future which is completed with the response, or completed exceptionally with the
   *         failure reported by the controller
   */
  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
    Callable<T> callable) {
    HBaseRpcController controller = rpcControllerFactory.newController();
    CompletableFuture<T> future = new CompletableFuture<>();
    callable.call(controller, stub, resp -> {
      if (controller.failed()) {
        IOException failureReason = controller.getFailed();
        future.completeExceptionally(failureReason);
        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
          // RPC has failed, trigger a refresh of end points. We can have some spurious
          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
          registryEndpointRefresher.refreshNow();
        }
      } else {
        future.complete(resp);
      }
    });
    return future;
  }

  /**
   * Creates the exception recorded for a response which is rejected by the validity predicate.
   */
  private IOException badResponse(String debug) {
    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
  }

  /**
   * send requests concurrently to hedgedReadFanOut end points. If any of the request is succeeded,
   * we will complete the future and quit. If all the requests in one round are failed, we will
   * start another round to send requests concurrently to hedgedReadFanOut end points. If all end
   * points have been tried and all of them are failed, we will fail the future.
   * @param future the future to complete with the final result
   * @param servers the addresses of all end points, only used when building the final exception
   * @param stubs all the stubs to try, in the order they should be tried
   * @param startIndexInclusive index in {@code stubs} of the first stub for this round
   * @param callable the actual rpc call
   * @param isValidResp a response which fails this test is treated as a failure
   * @param debug description of the request, used in the error message for invalid responses
   * @param errors collects the failures of all rounds
   */
  private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
    List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
      addListener(call(stubs.get(i), callable), (r, e) -> {
        // a simple check to skip all the later operations earlier
        if (future.isDone()) {
          return;
        }
        if (e == null && !isValidResp.test(r)) {
          e = badResponse(debug);
        }
        if (e != null) {
          // make sure when remaining reaches 0 we have all exceptions in the errors queue
          errors.add(e);
          if (remaining.decrementAndGet() == 0) {
            if (endIndexExclusive == stubs.size()) {
              // we are done, complete the future with exception
              RetriesExhaustedException ex =
                new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
              future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
            } else {
              groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
                errors);
            }
          }
        } else {
          // do not need to decrement the counter any more as we have already finished the future.
          future.complete(r);
        }
      });
    }
  }

  /**
   * Sends the request described by {@code callable} to the current end points, in a random order
   * and in rounds of hedgedReadFanOut concurrent requests.
   * @param callable the actual rpc call
   * @param isValidResp a response which fails this test is treated as a failure
   * @param debug description of the request, used in error messages
   * @return a future which is completed with the first valid response, or completed exceptionally
   *         with a {@link MasterRegistryFetchException} once all end points have failed
   */
  protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
    Predicate<T> isValidResp, String debug) {
    // read the volatile field only once so that servers and stubs come from the same snapshot
    ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
    Set<ServerName> servers = addr2StubRef.keySet();
    List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
    Collections.shuffle(stubs, ThreadLocalRandom.current());
    CompletableFuture<T> future = new CompletableFuture<>();
    groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
      new ConcurrentLinkedQueue<>());
    return future;
  }

  /**
   * Returns the addresses of the end points we currently hold stubs for.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Set<ServerName> getParsedServers() {
    return addr2Stub.keySet();
  }

  /**
   * Simple helper to transform the result of getMetaRegionLocations() rpc.
   */
  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
    List<HRegionLocation> regionLocations = new ArrayList<>();
    resp.getMetaLocationsList()
      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
    return new RegionLocations(regionLocations);
  }

  /**
   * {@inheritDoc}
   * <p/>
   * A response without any meta location is treated as invalid.
   */
  @Override
  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
    return tracedFuture(
      () -> this
        .<GetMetaRegionLocationsResponse> call(
          (c, s, d) -> s.getMetaRegionLocations(c,
            GetMetaRegionLocationsRequest.getDefaultInstance(), d),
          r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
        .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
      getClass().getSimpleName() + ".getMetaRegionLocations");
  }

  /**
   * {@inheritDoc}
   * <p/>
   * A response without cluster id is treated as invalid.
   */
  @Override
  public CompletableFuture<String> getClusterId() {
    return tracedFuture(
      () -> this
        .<GetClusterIdResponse> call(
          (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
          GetClusterIdResponse::hasClusterId, "getClusterId()")
        .thenApply(GetClusterIdResponse::getClusterId),
      getClass().getSimpleName() + ".getClusterId");
  }

  /**
   * {@inheritDoc}
   * <p/>
   * A response without server name is treated as invalid.
   */
  @Override
  public CompletableFuture<ServerName> getActiveMaster() {
    return tracedFuture(
      () -> this
        .<GetActiveMasterResponse> call(
          (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
          GetActiveMasterResponse::hasServerName, "getActiveMaster()")
        .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
      // use our own span name, so this call is not reported as getClusterId in traces
      getClass().getSimpleName() + ".getActiveMaster");
  }

  /**
   * Stops the end point refresher and closes the rpc client.
   */
  @Override
  public void close() {
    trace(() -> {
      if (registryEndpointRefresher != null) {
        registryEndpointRefresher.stop();
      }
      if (rpcClient != null) {
        rpcClient.close();
      }
    }, getClass().getSimpleName() + ".close");
  }
}

View File

@ -36,8 +36,7 @@ final class ConnectionRegistryFactory {
*/
static ConnectionRegistry getRegistry(Configuration conf) {
// Look up the implementation class configured under CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
// MasterRegistry is used when nothing is configured.
// NOTE(review): the argument list shows up twice below, once wrapped and once on a single line;
// this looks like the removed and the added side of a reformatting diff - confirm against the
// real file, only one of them can be present there.
Class<? extends ConnectionRegistry> clazz = conf.getClass(
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class,
ConnectionRegistry.class);
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class, ConnectionRegistry.class);
// Create the instance reflectively, handing over the configuration.
return ReflectionUtils.newInstance(clazz, conf);
}
}

View File

@ -1,126 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
/**
* Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
* uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
* By default the refresh happens periodically (configured via
* {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
* {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any two attempts
* should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
*/
@InterfaceAudience.Private
public class MasterAddressRefresher implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
/** Configuration key for the interval of the periodic refresh, in seconds. */
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
/** Configuration key for the minimum time between two refreshes, in seconds. */
public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.master_registry.min_secs_between_refreshes";
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
// Single daemon thread executor which runs the refresh loop, see the constructor.
private final ExecutorService pool;
// The registry we fetch the master list from and push the refreshed master stubs to.
private final MasterRegistry registry;
// Maximum time the refresh thread waits before doing a periodic refresh.
private final long periodicRefreshMs;
// A wake up within this time since the last refresh attempt does not trigger a refresh.
private final long timeBetweenRefreshesMs;
// Monitor the refresh thread waits on between refreshes, refreshNow() notifies it.
private final Object refreshMasters = new Object();
/**
* Stops the refresher by calling shutdownNow on the pool which runs the refresh loop.
*/
@Override
public void close() {
pool.shutdownNow();
}
/**
* Thread that refreshes the master end points until it is interrupted via {@link #close()}.
* Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
*/
private class RefreshThread implements Runnable {
@Override
public void run() {
// Timestamp of the last refresh attempt, 0 means we have not tried yet.
long lastRpcTs = 0;
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(periodicRefreshMs);
}
long currentTs = EnvironmentEdgeManager.currentTime();
// Throttle: skip this round if the last attempt was too recent. The very first round is
// never skipped.
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
continue;
}
// Recorded before issuing the RPC, so a failed attempt is throttled as well.
lastRpcTs = currentTs;
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
registry.populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
break;
} catch (ExecutionException | IOException e) {
// Only logged, the loop goes on and tries again after the next wake up.
LOG.debug("Error populating latest list of masters.", e);
}
}
LOG.info("Master end point refresher loop exited.");
}
}
/**
* Creates the refresher and submits the refresh loop to a single daemon thread.
* @param conf configuration to read {@link #PERIODIC_REFRESH_INTERVAL_SECS} and
* {@link #MIN_SECS_BETWEEN_REFRESHES} from
* @param registry the registry whose master stubs should be kept up to date
*/
MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
// The periodic interval must be positive and strictly greater than the minimum spacing.
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
this.registry = registry;
pool.submit(new RefreshThread());
}
/**
* Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
* See class comment for details.
*/
void refreshNow() {
synchronized (refreshMasters) {
refreshMasters.notify();
}
}
}

View File

@ -18,55 +18,28 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.DNS.getHostname;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.DNS.ServerType;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
@ -74,40 +47,31 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaReg
* <p/>
* It supports hedged reads, set the fan out of the requests batch by
* {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
* it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
* it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
* <p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {
public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
/** Configuration key that controls the fan out of requests **/
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout";
/** Default value for the fan out of hedged requests. **/
public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";
public static final String MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.master_registry.min_secs_between_refreshes";
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
private final int hedgedReadFanOut;
// Configured list of masters to probe the meta information from.
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;
protected final MasterAddressRefresher masterAddressRefresher;
/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
* separated host[:port] values. If no port number is specified, default master port is assumed.
* @param conf Configuration to parse from.
*/
private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
public static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
Set<ServerName> masterAddrs = new HashSet<>();
String configuredMasters = getMasterAddr(conf);
for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
@ -120,31 +84,18 @@ public class MasterRegistry implements ConnectionRegistry {
}
MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
// Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
// by fetching the end points from this list.
populateMasterStubs(parseMasterAddrs(conf));
masterAddressRefresher = new MasterAddressRefresher(conf, this);
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
}
void populateMasterStubs(Set<ServerName> masters) throws IOException {
Preconditions.checkNotNull(masters);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masters.size());
User user = User.getCurrent();
for (ServerName masterAddr : masters) {
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterAddr2Stub = builder.build();
@Override
protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
return parseMasterAddrs(conf);
}
@Override
protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
return getMasters();
}
/**
@ -162,195 +113,18 @@ public class MasterRegistry implements ConnectionRegistry {
return String.format("%s:%d", hostname, port);
}
/**
* For describing the actual asynchronous rpc call.
* <p/>
* Typically, you can use lambda expression to implement this interface as
*
* <pre>
* (c, s, d) -> s.xxx(c, your request here, d)
* </pre>
*/
@FunctionalInterface
private interface Callable<T> {
void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
private static Set<ServerName> transformServerNames(GetMastersResponse resp) {
return resp.getMasterServersList().stream()
.map(s -> ProtobufUtil.toServerName(s.getServerName())).collect(Collectors.toSet());
}
private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
Callable<T> callable) {
HBaseRpcController controller = rpcControllerFactory.newController();
CompletableFuture<T> future = new CompletableFuture<>();
callable.call(controller, stub, resp -> {
if (controller.failed()) {
IOException failureReason = controller.getFailed();
future.completeExceptionally(failureReason);
if (ClientExceptionsUtil.isConnectionException(failureReason)) {
// RPC has failed, trigger a refresh of master end points. We can have some spurious
// refreshes, but that is okay since the RPC is not expensive and not in a hot path.
masterAddressRefresher.refreshNow();
}
} else {
future.complete(resp);
}
});
return future;
}
private IOException badResponse(String debug) {
return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
}
/**
* send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
* will complete the future and quit. If all the requests in one round are failed, we will start
* another round to send requests concurrently to hedgedReadsFanout masters. If all masters have
* been tried and all of them are failed, we will fail the future.
*/
private <T extends Message> void groupCall(CompletableFuture<T> future,
Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
addListener(call(masterStubs.get(i), callable), (r, e) -> {
// a simple check to skip all the later operations earlier
if (future.isDone()) {
return;
}
if (e == null && !isValidResp.test(r)) {
e = badResponse(debug);
}
if (e != null) {
// make sure when remaining reaches 0 we have all exceptions in the errors queue
errors.add(e);
if (remaining.decrementAndGet() == 0) {
if (endIndexExclusive == masterStubs.size()) {
// we are done, complete the future with exception
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, ex));
} else {
groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
isValidResp, debug, errors);
}
}
} else {
// do not need to decrement the counter any more as we have already finished the future.
future.complete(r);
}
});
}
}
private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
Set<ServerName> masterServers = masterAddr2StubRef.keySet();
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
new ConcurrentLinkedQueue<>());
return future;
}
/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList()
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
return new RegionLocations(regionLocations);
}
@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return tracedFuture(
() -> this
.<GetMetaRegionLocationsResponse> call(
(c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d),
r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
.thenApply(MasterRegistry::transformMetaRegionLocations),
"MasterRegistry.getMetaRegionLocations");
}
@Override
public CompletableFuture<String> getClusterId() {
return tracedFuture(() -> this
.<GetClusterIdResponse> call(
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
GetClusterIdResponse::hasClusterId, "getClusterId()")
.thenApply(GetClusterIdResponse::getClusterId), "MasterRegistry.getClusterId");
}
private static boolean hasActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
return activeMasters.size() == 1;
}
private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
if (activeMasters.size() != 1) {
throw new IOException(String.format("Incorrect number of active masters encountered." +
" Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
}
return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
}
@Override
public CompletableFuture<ServerName> getActiveMaster() {
return tracedFuture(() -> {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse) resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
}, "MasterRegistry.getActiveMaster");
}
private static List<ServerName> transformServerNames(GetMastersResponse resp) {
return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
s.getServerName())).collect(Collectors.toList());
}
CompletableFuture<List<ServerName>> getMasters() {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/(.*/MasterRegistry.java|src/test/.*)")
CompletableFuture<Set<ServerName>> getMasters() {
return this
.<GetMastersResponse> call((c, s, d) -> s.getMasters(
c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
"getMasters()").thenApply(MasterRegistry::transformServerNames);
}
Set<ServerName> getParsedMasterServers() {
return masterAddr2Stub.keySet();
}
@Override
public void close() {
trace(() -> {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
}, "MasterRegistry.close");
.<GetMastersResponse> call(
(c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
r -> r.getMasterServersCount() != 0, "getMasters()")
.thenApply(MasterRegistry::transformServerNames);
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
 * Thread safe utility that keeps registry end points used by {@link ConnectionRegistry} up to date.
 * By default the refresh happens periodically (configured via {@code intervalSecsConfigName}). The
 * refresh can also be triggered on demand via {@link #refreshNow()}. To prevent a flood of
 * on-demand refreshes we expect that any two attempts should be spaced at least
 * {@code minIntervalSecsConfigName} seconds apart.
 */
@InterfaceAudience.Private
class RegistryEndpointsRefresher {

  private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class);

  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
    "hbase.client.rpc_registry.refresh_interval_secs";
  private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
  public static final String MIN_SECS_BETWEEN_REFRESHES =
    "hbase.client.rpc_registry.min_secs_between_refreshes";
  private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

  private final Thread thread;
  private final Refresher refresher;
  private final long periodicRefreshMs;
  private final long minTimeBetweenRefreshesMs;

  // Both flags are only read and written while holding the monitor of this object.
  private boolean refreshNow = false;
  private boolean stopped = false;

  /** Starts the background refresh thread created by the constructor. */
  public void start() {
    thread.start();
  }

  /** Marks the refresher as stopped and wakes the refresh thread so that it can exit. */
  public synchronized void stop() {
    stopped = true;
    notifyAll();
  }

  // The main loop for the refresh thread.
  private void mainLoop() {
    long lastRefreshTime = EnvironmentEdgeManager.currentTime();
    for (;;) {
      synchronized (this) {
        for (;;) {
          if (stopped) {
            LOG.info("Registry end points refresher loop exited.");
            return;
          }
          // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs elapsed,
          // otherwise wait until periodicRefreshMs elapsed
          long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) -
            (EnvironmentEdgeManager.currentTime() - lastRefreshTime);
          if (waitTime <= 0) {
            break;
          }
          try {
            wait(waitTime);
          } catch (InterruptedException e) {
            LOG.warn("Interrupted during wait", e);
            // Restore the interrupt status and leave the loop. Looping again with the status set
            // would make every following wait() throw immediately, turning this into a busy loop
            // that logs a warning on each iteration.
            Thread.currentThread().interrupt();
            return;
          }
        }
        // we are going to refresh, reset this flag
        refreshNow = false;
      }
      // The refresh itself runs outside the monitor, so stop() and refreshNow() are not blocked
      // while it is in progress.
      LOG.debug("Attempting to refresh registry end points");
      try {
        refresher.refresh();
      } catch (IOException e) {
        LOG.warn("Error refresh registry end points", e);
      }
      // We do not think it is a big deal to fail one time, so no matter what is refresh result, we
      // just update this refresh time and wait for the next round. If later this becomes critical,
      // could change to only update this value when we have done a successful refreshing.
      lastRefreshTime = EnvironmentEdgeManager.currentTime();
      LOG.debug("Finished refreshing registry end points");
    }
  }

  /** Callback invoked by the refresh thread each time the end points should be refreshed. */
  @FunctionalInterface
  public interface Refresher {

    void refresh() throws IOException;
  }

  /**
   * Creates the refresher and its daemon thread; the thread is not started until {@link #start()}
   * is called.
   * @param conf configuration the two intervals are read from
   * @param intervalSecsConfigName name of the config holding the periodic refresh interval in
   *          seconds, defaults to 300 when absent
   * @param minIntervalSecsConfigName name of the config holding the minimum number of seconds
   *          between two refreshes, defaults to 60 when absent
   * @param refresher callback performing the actual refresh
   * @throws IllegalArgumentException if the periodic interval is not positive or the minimum
   *           interval is not smaller than the periodic interval
   */
  RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName,
    String minIntervalSecsConfigName, Refresher refresher) {
    periodicRefreshMs = TimeUnit.SECONDS
      .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
    minTimeBetweenRefreshesMs = TimeUnit.SECONDS
      .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
    Preconditions.checkArgument(periodicRefreshMs > 0);
    Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs);
    thread = new Thread(this::mainLoop);
    thread.setName("Registry-endpoints-refresh-end-points");
    thread.setDaemon(true);
    this.refresher = refresher;
  }

  /**
   * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
   * See class comment for details.
   */
  synchronized void refreshNow() {
    refreshNow = true;
    notifyAll();
  }
}

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
/**
 * Connection registry that works over RPC: registry information is obtained through the
 * {@link ClientMetaService}.
 * <p>
 * A list of bootstrap nodes is needed at start up; afterwards the {@link ClientMetaService} is
 * used to refresh that list periodically.
 * <p>
 * Usually the masters can be configured as the bootstrap nodes, as they also implement the
 * {@link ClientMetaService}; after the bootstrap nodes have been refreshed the registry switches
 * to using region servers.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

  /** Configuration key that controls the fan out of requests **/
  public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout";

  /** Configuration key passed to the super class as the periodic refresh interval, in seconds. */
  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
    "hbase.client.bootstrap.refresh_interval_secs";

  /** Configuration key passed to the super class as the minimum seconds between refreshes. */
  public static final String MIN_SECS_BETWEEN_REFRESHES =
    "hbase.client.bootstrap.min_secs_between_refreshes";

  /** Configuration key holding the separator-delimited list of bootstrap node addresses. */
  public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";

  private static final char ADDRS_CONF_SEPARATOR = ',';

  RpcConnectionRegistry(Configuration conf) throws IOException {
    super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
  }

  /**
   * Resolves the initial bootstrap nodes from the configuration.
   * @param conf configuration to read {@link #BOOTSTRAP_NODES} from
   * @return the configured bootstrap nodes, or the master addresses parsed by
   *         {@code MasterRegistry.parseMasterAddrs} when the config is blank
   */
  @Override
  protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
    String bootstrapServers = conf.get(BOOTSTRAP_NODES);
    if (StringUtils.isBlank(bootstrapServers)) {
      // nothing configured explicitly, so just use master addresses
      return MasterRegistry.parseMasterAddrs(conf);
    }
    Splitter addrSplitter = Splitter.on(ADDRS_CONF_SEPARATOR).trimResults();
    return addrSplitter.splitToStream(bootstrapServers)
      .map(hostAndPort -> ServerName.valueOf(hostAndPort, ServerName.NON_STARTCODE))
      .collect(Collectors.toSet());
  }

  /** Converts the server names carried by the response into a set of {@link ServerName}. */
  private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
    return resp.getServerNameList().stream()
      .map(pbServerName -> ProtobufUtil.toServerName(pbServerName))
      .collect(Collectors.toSet());
  }

  /**
   * Issues a {@code getBootstrapNodes()} call, accepting only responses that carry at least one
   * server name, and converts the result.
   */
  private CompletableFuture<Set<ServerName>> getBootstrapNodes() {
    return this.<GetBootstrapNodesResponse> call(
      (controller, stub, done) -> stub.getBootstrapNodes(controller,
        GetBootstrapNodesRequest.getDefaultInstance(), done),
      resp -> resp.getServerNameCount() != 0, "getBootstrapNodes()")
      .thenApply(resp -> transformServerNames(resp));
  }

  @Override
  protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
    return getBootstrapNodes();
  }
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos;
/**
* Maps RPC protocol interfaces to required configuration
@ -49,7 +50,7 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.HbckService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(),
infos.put(RegistryProtos.ClientMetaService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!

View File

@ -33,7 +33,6 @@ import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
@ -58,22 +57,30 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
@Category({ ClientTests.class, SmallTests.class })
public class TestMasterRegistryHedgedReads {
public class TestRpcBasedRegistryHedgedReads {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.refresh.interval.secs";
private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
"hbase.test.min.refresh.interval.secs";
private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
private static final ExecutorService EXECUTOR =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
private static Set<ServerName> BOOTSTRAP_NODES;
private static AtomicInteger CALLED = new AtomicInteger(0);
private static volatile int BAD_RESP_INDEX;
@ -142,14 +149,35 @@ public class TestMasterRegistryHedgedReads {
}
}
/**
 * Creates a registry for the tests whose bootstrap nodes and fetched end points are both the
 * static {@code BOOTSTRAP_NODES} set.
 * @param hedged value stored under {@code HEDGED_REQS_FANOUT_CONFIG_NAME} in the shared test
 *          configuration before the registry is constructed
 */
private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException {
  Configuration testConf = UTIL.getConfiguration();
  testConf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
  AbstractRpcBasedConnectionRegistry registry = new AbstractRpcBasedConnectionRegistry(testConf,
    HEDGED_REQS_FANOUT_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
    MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {

    @Override
    protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
      return BOOTSTRAP_NODES;
    }

    @Override
    protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
      return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
    }
  };
  return registry;
}
/**
 * One-time setup of the shared test configuration: installs the test RPC client implementation,
 * configures the refresh intervals and builds the ten {@code BOOTSTRAP_NODES} used by the tests.
 */
@BeforeClass
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
// make the RpcClientFactory hand out the test's RpcClientImpl
conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
RpcClient.class);
// ten addresses: localhost:10000, localhost:10100, ..., localhost:10900
String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
.collect(Collectors.joining(","));
conf.set(HConstants.MASTER_ADDRS_KEY, masters);
// disable refresh, we do not need to refresh in this test
conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
// NOTE(review): kept one below the periodic interval, presumably because the refresher requires
// the minimum interval to be smaller than the periodic one -- confirm
conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
// same ten host/port pairs as above, as ServerName instances without a start code
BOOTSTRAP_NODES = IntStream.range(0, 10)
.mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
.collect(Collectors.toSet());
}
@AfterClass
@ -175,9 +203,7 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testAllFailNoHedged() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
try (MasterRegistry registry = new MasterRegistry(conf)) {
try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
@ -185,10 +211,8 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testAllFailHedged3() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
BAD_RESP_INDEX = 5;
try (MasterRegistry registry = new MasterRegistry(conf)) {
try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) {
assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
assertEquals(10, CALLED.get());
}
@ -196,12 +220,10 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testFirstSucceededNoHedge() throws IOException {
Configuration conf = UTIL.getConfiguration();
// will be set to 1
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
GOOD_RESP_INDEXS =
IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
try (MasterRegistry registry = new MasterRegistry(conf)) {
// will be set to 1
try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
assertEquals(1, CALLED.get());
@ -210,10 +232,8 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testSecondRoundSucceededHedge4() throws IOException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
GOOD_RESP_INDEXS = Collections.singleton(6);
try (MasterRegistry registry = new MasterRegistry(conf)) {
try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 8);
@ -222,10 +242,8 @@ public class TestMasterRegistryHedgedReads {
@Test
public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
Configuration conf = UTIL.getConfiguration();
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
GOOD_RESP_INDEXS = Collections.singleton(5);
try (MasterRegistry registry = new MasterRegistry(conf)) {
try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) {
String clusterId = logIfError(registry.getClusterId());
assertEquals(RESP.getClusterId(), clusterId);
UTIL.waitFor(5000, () -> CALLED.get() == 10);

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
// The protos for ConnectionRegistry.
package hbase.pb;
option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "RegistryProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
import "HBase.proto";
/** Request and response to get the clusterID for this cluster */
message GetClusterIdRequest {
}
message GetClusterIdResponse {
/** Not set if cluster ID could not be determined. */
optional string cluster_id = 1;
}
/** Request and response to get the currently active master name for this cluster */
message GetActiveMasterRequest {
}
message GetActiveMasterResponse {
/** Not set if an active master could not be determined. */
optional ServerName server_name = 1;
}
/** Request and response to get the current list of all registered master servers */
message GetMastersRequest {
option deprecated = true;
}
message GetMastersResponseEntry {
option deprecated = true;
required ServerName server_name = 1;
required bool is_active = 2;
}
message GetMastersResponse {
option deprecated = true;
repeated GetMastersResponseEntry master_servers = 1;
}
/** Request and response to get the current list of meta region locations */
message GetMetaRegionLocationsRequest {
}
message GetMetaRegionLocationsResponse {
/** Not set if meta region locations could not be determined. */
repeated RegionLocation meta_locations = 1;
}
/** Request and response to get the nodes which could be used as ClientMetaService */
message GetBootstrapNodesRequest {
}
message GetBootstrapNodesResponse {
repeated ServerName server_name = 1;
}
/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for
* connection establishment.
*/
service ClientMetaService {
/**
* Get Cluster ID for this cluster. The cluster_id field of the response is not set if the
* cluster ID could not be determined.
*/
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
/**
* Get active master server name for this cluster. Retained for out of sync client and master
* rolling upgrades. Newer clients switched to GetMasters RPC request.
*/
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
/**
* Get registered list of master servers in this cluster. This RPC and its request/response
* messages are marked deprecated.
*/
rpc GetMasters(GetMastersRequest) returns(GetMastersResponse) {
option deprecated = true;
};
/**
* Get current meta replicas' region locations.
*/
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
/**
* Get nodes which could be used as ClientMetaService. The response carries them as a repeated
* list of server names.
*/
rpc GetBootstrapNodes(GetBootstrapNodesRequest) returns (GetBootstrapNodesResponse);
}

View File

@ -1295,65 +1295,3 @@ service HbckService {
rpc FixMeta(FixMetaRequest)
returns(FixMetaResponse);
}
/** Request and response to get the clusterID for this cluster */
message GetClusterIdRequest {
}
message GetClusterIdResponse {
/** Not set if cluster ID could not be determined. */
optional string cluster_id = 1;
}
/** Request and response to get the currently active master name for this cluster */
message GetActiveMasterRequest {
}
message GetActiveMasterResponse {
/** Not set if an active master could not be determined. */
optional ServerName server_name = 1;
}
/** Request and response to get the current list of all registered master servers */
message GetMastersRequest {
}
message GetMastersResponseEntry {
required ServerName server_name = 1;
required bool is_active = 2;
}
message GetMastersResponse {
repeated GetMastersResponseEntry master_servers = 1;
}
/** Request and response to get the current list of meta region locations */
message GetMetaRegionLocationsRequest {
}
message GetMetaRegionLocationsResponse {
/** Not set if meta region locations could not be determined. */
repeated RegionLocation meta_locations = 1;
}
/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for
* connection establishment.
*/
service ClientMetaService {
/**
* Get Cluster ID for this cluster.
*/
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
/**
* Get active master server name for this cluster. Retained for out of sync client and master
* rolling upgrades. Newer clients switched to GetMasters RPC request.
*/
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
/**
* Get registered list of master servers in this cluster.
*/
rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);
/**
* Get current meta replicas' region locations.
*/
rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse);
}

View File

@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
package org.apache.hadoop.hbase;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
@ -35,7 +35,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@ -87,10 +89,10 @@ public class MetaRegionLocationCache extends ZKListener {
// are established. Subsequent updates are handled by the registered listener. Also, this runs
// in a separate thread in the background to not block master init.
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
RetryCounterFactory retryFactory = new RetryCounterFactory(
Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
threadFactory.newThread(
()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
RetryCounterFactory retryFactory = new RetryCounterFactory(Integer.MAX_VALUE,
SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT))
.start();
}
/**

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
@ -222,6 +223,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@ -232,6 +234,7 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@ -307,13 +310,6 @@ public class HMaster extends HRegionServer implements MasterServices {
// manager of assignment nodes in zookeeper
private AssignmentManager assignmentManager;
/**
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale
* cache entries.
*/
private final MetaRegionLocationCache metaRegionLocationCache;
private RSGroupInfoManager rsGroupInfoManager;
// manager of replication
@ -480,7 +476,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
cachedClusterId = new CachedClusterId(this, conf);
@ -3810,10 +3805,6 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
/** Returns the cache for the meta region replica's locations held by this instance. */
public MetaRegionLocationCache getMetaRegionLocationCache() {
  MetaRegionLocationCache cache = this.metaRegionLocationCache;
  return cache;
}
@Override
public RSGroupInfoManager getRSGroupInfoManager() {
return rsGroupInfoManager;

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.Server;
@ -75,7 +74,6 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.janitor.MetaFixer;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@ -183,7 +181,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
@ -208,21 +205,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
@ -383,6 +371,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
@ -3005,9 +3003,11 @@ public class MasterRpcServices extends RSRpcServices implements
return true;
}
// Override this method since for backup master we will not set the clusterId field, which means
// we need to find another way to get cluster id for backup masters.
@Override
public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request)
throws ServiceException {
throws ServiceException {
GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder();
String clusterId = master.getClusterId();
if (clusterId != null) {
@ -3016,40 +3016,43 @@ public class MasterRpcServices extends RSRpcServices implements
return resp.build();
}
// Override this method since we use ActiveMasterManager to get active master on HMaster while in
// HRegionServer we use MasterAddressTracker
@Override
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterRequest request) throws ServiceException {
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name)));
return resp.build();
}
// Override this method since we use ActiveMasterManager to get backup masters on HMaster while in
// HRegionServer we use MasterAddressTracker
@Override
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
throws ServiceException {
throws ServiceException {
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
// Active master
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
for (ServerName backupMaster: master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
for (ServerName backupMaster : master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
return resp.build();
}
@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
GetMetaRegionLocationsRequest request) throws ServiceException {
GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder();
Optional<List<HRegionLocation>> metaLocations =
master.getMetaRegionLocationCache().getMetaRegionLocations();
metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach(
location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location))));
return response.build();
// Answers with every server name keyed in the ServerManager's online servers map.
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
  GetBootstrapNodesRequest request) throws ServiceException {
  GetBootstrapNodesResponse.Builder resp = GetBootstrapNodesResponse.newBuilder();
  master.getServerManager().getOnlineServers().keySet()
    .forEach(server -> resp.addServerName(ProtobufUtil.toServerName(server)));
  return resp.build();
}
@Override

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HealthCheckChore;
import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
@ -179,6 +180,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -423,6 +425,16 @@ public class HRegionServer extends Thread implements
// master address tracker
private final MasterAddressTracker masterAddressTracker;
/**
* Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache
* entries. Used for serving ClientMetaService.
*/
private final MetaRegionLocationCache metaRegionLocationCache;
/**
* Cache for all the region servers in the cluster. Used for serving ClientMetaService.
*/
private final RegionServerAddressTracker regionServerAddressTracker;
// Cluster Status Tracker
protected final ClusterStatusTracker clusterStatusTracker;
@ -669,6 +681,8 @@ public class HRegionServer extends Thread implements
clusterStatusTracker = null;
}
this.rpcServices.start(zooKeeper);
this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper);
this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this);
// This violates 'no starting stuff in Constructor' but Master depends on the below chore
// and executor being created and takes a different startup route. Lots of overlap between HRS
// and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
@ -3992,4 +4006,12 @@ public class HRegionServer extends Thread implements
public long getRetryPauseTime() {
return this.retryPauseTime;
}
/**
 * Returns the cache of meta region replica locations created by this server. According to the
 * field's documentation it is used for serving ClientMetaService.
 */
public MetaRegionLocationCache getMetaRegionLocationCache() {
  return metaRegionLocationCache;
}
/**
 * Returns the tracker that caches the region servers in the cluster. According to the field's
 * documentation it is used for serving ClientMetaService. Package private.
 */
RegionServerAddressTracker getRegionServerAddressTracker() {
  return this.regionServerAddressTracker;
}
}

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PrivateCellUtil;
@ -160,6 +161,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
@ -258,6 +260,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@ -270,8 +284,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class RSRpcServices implements HBaseRPCErrorHandler,
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction,
ConfigurationObserver {
AdminService.BlockingInterface, ClientService.BlockingInterface,
ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class);
/** RPC scheduler to use for the region server. */
@ -377,9 +391,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* where you would ever turn off one or the other).
*/
public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG =
"hbase.regionserver.admin.executorService";
"hbase.regionserver.admin.executorService";
public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG =
"hbase.regionserver.client.executorService";
"hbase.regionserver.client.executorService";
public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG =
"hbase.regionserver.client.meta.executorService";
/**
* An Rpc callback for closing a RegionScanner.
@ -1582,23 +1598,24 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* supports
*/
protected List<BlockingServiceAndInterface> getServices() {
boolean admin =
getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
boolean client =
getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true);
boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true);
boolean clientMeta =
getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true);
List<BlockingServiceAndInterface> bssi = new ArrayList<>();
if (client) {
bssi.add(new BlockingServiceAndInterface(
ClientService.newReflectiveBlockingService(this),
ClientService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this),
ClientService.BlockingInterface.class));
}
if (admin) {
bssi.add(new BlockingServiceAndInterface(
AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this),
AdminService.BlockingInterface.class));
}
return new org.apache.hbase.thirdparty.com.google.common.collect.
ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
if (clientMeta) {
bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this),
ClientMetaService.BlockingInterface.class));
}
return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build();
}
public InetSocketAddress getSocketAddress() {
@ -4064,4 +4081,61 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
protected ZKPermissionWatcher getZkPermissionWatcher() {
return zkPermissionWatcher;
}
/**
 * Serves the ClientMetaService cluster id lookup from this region server.
 * <p/>
 * The cluster id field of the response is only set when the region server reports a non-null
 * cluster id. Protobuf builders reject null values, so passing a null id straight to the setter
 * would fail the call with a NullPointerException; the MasterRpcServices override applies the
 * same null check.
 * @param controller the rpc controller, not used here
 * @param request the (empty) request, not used here
 * @return a response carrying the cluster id, or with the field unset if it is not known
 */
@Override
public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request)
  throws ServiceException {
  GetClusterIdResponse.Builder builder = GetClusterIdResponse.newBuilder();
  String clusterId = regionServer.getClusterId();
  if (clusterId != null) {
    builder.setClusterId(clusterId);
  }
  return builder.build();
}
/**
 * Answers with the master address reported by this region server's MasterAddressTracker. The
 * server name field is left unset when the tracker returns null.
 */
@Override
public GetActiveMasterResponse getActiveMaster(RpcController controller,
  GetActiveMasterRequest request) throws ServiceException {
  ServerName master = regionServer.getMasterAddressTracker().getMasterAddress();
  GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
  if (master == null) {
    return resp.build();
  }
  return resp.setServerName(ProtobufUtil.toServerName(master)).build();
}
/**
 * Lists the masters reported by this region server's MasterAddressTracker: first the master
 * address (when not null) flagged as active, then every backup master flagged as not active.
 * @throws ServiceException wrapping any IOException raised while building the list
 */
@Override
public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request)
  throws ServiceException {
  GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
  try {
    ServerName active = regionServer.getMasterAddressTracker().getMasterAddress();
    if (active != null) {
      GetMastersResponseEntry.Builder activeEntry = GetMastersResponseEntry.newBuilder();
      activeEntry.setServerName(ProtobufUtil.toServerName(active));
      activeEntry.setIsActive(true);
      resp.addMasterServers(activeEntry);
    }
    for (ServerName backup : regionServer.getMasterAddressTracker().getBackupMasters()) {
      GetMastersResponseEntry.Builder backupEntry = GetMastersResponseEntry.newBuilder();
      backupEntry.setServerName(ProtobufUtil.toServerName(backup));
      backupEntry.setIsActive(false);
      resp.addMasterServers(backupEntry);
    }
  } catch (IOException e) {
    throw new ServiceException(e);
  }
  return resp.build();
}
/**
 * Answers with the meta region locations currently held by this region server's
 * MetaRegionLocationCache. The response carries no locations when the cache returns an empty
 * Optional.
 */
@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller,
  GetMetaRegionLocationsRequest request) throws ServiceException {
  GetMetaRegionLocationsResponse.Builder resp = GetMetaRegionLocationsResponse.newBuilder();
  Optional<List<HRegionLocation>> cached =
    regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
  if (cached.isPresent()) {
    for (HRegionLocation loc : cached.get()) {
      resp.addMetaLocations(ProtobufUtil.toRegionLocation(loc));
    }
  }
  return resp.build();
}
/**
 * Answers with every region server listed by this region server's RegionServerAddressTracker,
 * converted to the protobuf server name representation.
 */
@Override
public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller,
  GetBootstrapNodesRequest request) throws ServiceException {
  GetBootstrapNodesResponse.Builder resp = GetBootstrapNodesResponse.newBuilder();
  for (ServerName server : regionServer.getRegionServerAddressTracker().getRegionServers()) {
    resp.addServerName(ProtobufUtil.toServerName(server));
  }
  return resp.build();
}
}

View File

@ -17,18 +17,20 @@
*/
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos;
/**
* Implementation of secure Hadoop policy provider for mapping
* protocol interfaces to hbase-policy.xml entries.
@ -41,7 +43,7 @@ public class HBasePolicyProvider extends PolicyProvider {
new Service("security.client.protocol.acl",
MasterProtos.HbckService.BlockingInterface.class),
new Service("security.client.protocol.acl",
MasterProtos.ClientMetaService.BlockingInterface.class),
RegistryProtos.ClientMetaService.BlockingInterface.class),
new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class),
new Service("security.masterregion.protocol.acl",
RegionServerStatusService.BlockingInterface.class)

View File

@ -96,7 +96,7 @@ class FromClientSideBase {
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}

View File

@ -1,113 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
/**
 * Tests for the refresher owned by {@link MasterRegistry}, driven through a registry stub whose
 * getMasters() only records that, and when, it was called.
 */
@Category({ClientTests.class, SmallTests.class})
public class TestMasterAddressRefresher {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMasterAddressRefresher.class);

  /**
   * Registry stub that counts getMasters() calls and remembers their timestamps. It is static as
   * it does not need the enclosing test instance.
   */
  private static class DummyMasterRegistry extends MasterRegistry {

    private final AtomicInteger getMastersCallCounter = new AtomicInteger(0);
    // Guarded by its own monitor: appended in getMasters() and read by the test.
    // NOTE(review): getMasters() is presumably invoked from the refresher's thread -- confirm.
    private final List<Long> callTimeStamps = new ArrayList<>();

    DummyMasterRegistry(Configuration conf) throws IOException {
      super(conf);
    }

    @Override
    CompletableFuture<List<ServerName>> getMasters() {
      getMastersCallCounter.incrementAndGet();
      synchronized (callTimeStamps) {
        callTimeStamps.add(EnvironmentEdgeManager.currentTime());
      }
      return CompletableFuture.completedFuture(new ArrayList<>());
    }

    public int getMastersCount() {
      return getMastersCallCounter.get();
    }

    /** Returns a snapshot of the recorded call times, in call order. */
    public List<Long> getCallTimeStamps() {
      synchronized (callTimeStamps) {
        return new ArrayList<>(callTimeStamps);
      }
    }
  }

  @Test
  public void testPeriodicMasterEndPointRefresh() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Refresh every 1 second.
    conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1);
    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
    try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
      // Wait up to 5 seconds until more than 3 getMasters() calls have been made.
      Waiter.waitFor(
        conf, 5000, (Waiter.Predicate<Exception>) () -> registry.getMastersCount() > 3);
    }
  }

  @Test
  public void testDurationBetweenRefreshes() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Disable periodic refresh
    conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE);
    // A minimum duration of 1s between refreshes
    conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1);
    try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
      // Issue a ton of manual refreshes.
      for (int i = 0; i < 10000; i++) {
        registry.masterAddressRefresher.refreshNow();
        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
      }
      // The loop sleeps for 10000 ms in total, so about 10 refreshes are expected; the bound of
      // 20 leaves room for the loop taking longer than the nominal sleep time.
      List<Long> callTimeStamps = registry.getCallTimeStamps();
      // Actual calls to getMasters() should be much lower than the refresh count.
      Assert.assertTrue(
        String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20);
      Assert.assertTrue(callTimeStamps.size() > 0);
      // Verify that the delta between every pair of subsequent calls, including the last pair,
      // is at least 1sec as configured.
      for (int i = 1; i < callTimeStamps.size(); i++) {
        long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1);
        // Few ms cushion to account for any env jitter.
        Assert.assertTrue(callTimeStamps.toString(), delta > 990);
      }
    }
  }
}

View File

@ -53,7 +53,7 @@ public class TestMasterRegistry {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterRegistry.class);
HBaseClassTestRule.forClass(TestMasterRegistry.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
@BeforeClass
@ -90,7 +90,7 @@ public class TestMasterRegistry {
int numMasters = 10;
conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters));
try (MasterRegistry registry = new MasterRegistry(conf)) {
List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers());
List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedServers());
// Half of them would be without a port, duplicates are removed.
assertEquals(numMasters / 2 + 1, parsedMasters.size());
// Sort in the increasing order of port numbers.
@ -149,17 +149,17 @@ public class TestMasterRegistry {
// Set the hedging fan out so that all masters are queried.
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
// Do not limit the number of refreshes during the test run.
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
conf.setLong(MasterRegistry.MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES, 0);
try (MasterRegistry registry = new MasterRegistry(conf)) {
final Set<ServerName> masters = registry.getParsedMasterServers();
final Set<ServerName> masters = registry.getParsedServers();
assertTrue(masters.contains(badServer));
// Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
assertEquals(registry.getClusterId().get(), clusterId);
// Wait for new set of masters to be populated.
TEST_UTIL.waitFor(5000,
(Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
(Waiter.Predicate<Exception>) () -> !registry.getParsedServers().equals(masters));
// new set of masters should not include the bad server
final Set<ServerName> newMasters = registry.getParsedMasterServers();
final Set<ServerName> newMasters = registry.getParsedServers();
// Bad one should be out.
assertEquals(3, newMasters.size());
assertFalse(newMasters.contains(badServer));
@ -170,8 +170,8 @@ public class TestMasterRegistry {
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000);
// Wait until the killed master de-registered. This should also trigger another refresh.
TEST_UTIL.waitFor(10000, () -> registry.getMasters().get().size() == 2);
TEST_UTIL.waitFor(20000, () -> registry.getParsedMasterServers().size() == 2);
final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
TEST_UTIL.waitFor(20000, () -> registry.getParsedServers().size() == 2);
final Set<ServerName> newMasters2 = registry.getParsedServers();
assertEquals(2, newMasters2.size());
assertFalse(newMasters2.contains(activeMaster.getServerName()));
} finally {

View File

@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaRegionLocationCache;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MetaRegionLocationCache;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -94,20 +94,20 @@ public class TestMetaRegionLocationCache {
break;
}
}
List<HRegionLocation> metaHRLs =
master.getMetaRegionLocationCache().getMetaRegionLocations().get();
assertFalse(metaHRLs.isEmpty());
ZKWatcher zk = master.getZooKeeper();
List<String> metaZnodes = zk.getMetaReplicaNodes();
// Wait till all replicas available.
retries = 0;
while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() !=
metaZnodes.size()) {
while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes
.size()) {
Thread.sleep(1000);
if (++retries == 10) {
break;
}
}
List<HRegionLocation> metaHRLs =
master.getMetaRegionLocationCache().getMetaRegionLocations().get();
assertFalse(metaHRLs.isEmpty());
assertEquals(metaZnodes.size(), metaHRLs.size());
List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk);
Collections.sort(metaHRLs);
@ -115,13 +115,14 @@ public class TestMetaRegionLocationCache {
assertEquals(actualHRLs, metaHRLs);
}
@Test public void testInitialMetaLocations() throws Exception {
@Test
public void testInitialMetaLocations() throws Exception {
verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster());
}
@Test public void testStandByMetaLocations() throws Exception {
@Test
public void testStandByMetaLocations() throws Exception {
HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster();
standBy.isInitialized();
verifyCachedMetaLocations(standBy);
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
/**
 * Tests for {@link RegistryEndpointsRefresher}. The refresher is created with a callback that
 * only counts its invocations and records when they happened, so the tests can check both the
 * periodic refresh and the spacing of refreshes requested through {@code refreshNow()}.
 */
@Category({ ClientTests.class, SmallTests.class })
public class TestRegistryEndpointsRefresher {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class);

  private static final String INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.registry.refresh.interval.secs";
  private static final String MIN_INTERVAL_SECS_CONFIG_NAME =
    "hbase.test.registry.refresh.min.interval.secs";

  private Configuration conf;
  private RegistryEndpointsRefresher refresher;
  // Number of times the refresh callback has been invoked.
  private AtomicInteger refreshCallCounter;
  // Time of every refresh callback invocation, in call order.
  private CopyOnWriteArrayList<Long> callTimestamps;

  @Before
  public void setUp() {
    conf = HBaseConfiguration.create();
    refreshCallCounter = new AtomicInteger(0);
    callTimestamps = new CopyOnWriteArrayList<>();
  }

  @After
  public void tearDown() {
    if (refresher != null) {
      refresher.stop();
    }
  }

  /** The refresh callback handed to the refresher: it only records that it has been called. */
  private void refresh() {
    refreshCallCounter.incrementAndGet();
    callTimestamps.add(EnvironmentEdgeManager.currentTime());
  }

  /**
   * Stores the two intervals in the test configuration and starts a refresher reading them.
   * @param intervalSecs value for the periodic refresh interval config, in seconds
   * @param minIntervalSecs value for the minimum interval between refreshes config, in seconds
   */
  private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) {
    conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs);
    conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs);
    refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME,
      MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh);
    refresher.start();
  }

  @Test
  public void testPeriodicMasterEndPointRefresh() throws IOException {
    // Refresh every 1 second.
    createAndStartRefresher(1, 0);
    // Wait up to 5 seconds until the refresh callback has been invoked more than 3 times.
    Waiter.waitFor(conf, 5000, () -> refreshCallCounter.get() > 3);
  }

  @Test
  public void testDurationBetweenRefreshes() throws IOException {
    // Disable periodic refresh
    // A minimum duration of 1s between refreshes
    createAndStartRefresher(Integer.MAX_VALUE, 1);
    // Issue a ton of manual refreshes.
    for (int i = 0; i < 10000; i++) {
      refresher.refreshNow();
      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
    }
    // The loop sleeps for 10000 ms in total, so about 10 refreshes are expected; the bound of 20
    // leaves room for the loop taking longer than the nominal sleep time.
    // Actual refresh calls should be much lower than the number of refreshNow() requests.
    assertTrue(String.valueOf(refreshCallCounter.get()), refreshCallCounter.get() <= 20);
    assertTrue(callTimestamps.size() > 0);
    // Verify that the delta between every pair of subsequent refreshes, including the last pair,
    // is at least 1sec as configured.
    for (int i = 1; i < callTimestamps.size(); i++) {
      long delta = callTimestamps.get(i) - callTimestamps.get(i - 1);
      // Few ms cushion to account for any env jitter.
      assertTrue(callTimestamps.toString(), delta > 990);
    }
  }
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * Tests {@link RpcConnectionRegistry} against a mini cluster started with 3 region servers and
 * 3 meta replicas, comparing what the registry returns with what the active master reports.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestRpcConnectionRegistry {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRpcConnectionRegistry.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private RpcConnectionRegistry registry;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // allow refresh immediately so we will switch to use region servers soon.
    UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1);
    UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0);
    UTIL.startMiniCluster(3);
    HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws IOException {
    registry = new RpcConnectionRegistry(UTIL.getConfiguration());
  }

  @After
  public void tearDown() throws IOException {
    Closeables.close(registry, true);
  }

  @Test
  public void testRegistryRPCs() throws Exception {
    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
    // wait until we switch to use region servers
    UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3);
    assertThat(registry.getParsedServers(),
      hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));

    // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
    // because not all replicas had made it up before test started.
    RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry);

    // assertEquals takes the expected value first: what the active master reports is the
    // expectation, what the registry returns is the value under test.
    assertEquals(activeMaster.getClusterId(), registry.getClusterId().get());
    assertEquals(activeMaster.getServerName(), registry.getActiveMaster().get());

    List<HRegionLocation> registryMetaLocations =
      Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations());
    List<HRegionLocation> expectedMetaLocations =
      activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get();
    // Sort both sides so the comparison does not depend on the order they were returned in.
    Collections.sort(registryMetaLocations);
    Collections.sort(expectedMetaLocations);
    assertEquals(expectedMetaLocations, registryMetaLocations);
  }
}

View File

@ -133,7 +133,7 @@ public class TestScannersFromClientSide {
Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionRegistry.class);
int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT);
AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT);
return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig;
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -45,14 +46,15 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
@Category({MediumTests.class, MasterTests.class})
public class TestClientMetaServiceRPCs {

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
 * Tracks the region servers of a cluster by listing, and watching, the children of the region
 * server znode. The most recently loaded list is exposed through {@link #getRegionServers()}.
 */
@InterfaceAudience.Private
public class RegionServerAddressTracker extends ZKListener {

  private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class);

  /** Latest snapshot of the region servers; replaced as a whole on every reload. */
  private volatile List<ServerName> regionServers = Collections.emptyList();

  /** Notified when listing the region server znode fails. */
  private final Abortable abortable;

  /**
   * Registers this tracker as a listener on the given watcher and performs the initial load.
   * @param watcher the ZooKeeper watcher used to list and watch the region server znode
   * @param abortable invoked if the region server list cannot be read
   */
  public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) {
    super(watcher);
    this.abortable = abortable;
    watcher.registerListener(this);
    loadRegionServerList();
  }

  /**
   * Re-reads the children of the region server znode (re-arming the watch) and publishes the
   * result. On a {@link KeeperException} the previous snapshot is kept and the abortable is
   * notified.
   */
  private void loadRegionServerList() {
    List<String> children;
    try {
      children =
        ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
    } catch (KeeperException e) {
      LOG.error("failed to list region servers", e);
      abortable.abort("failed to list region servers", e);
      return;
    }
    regionServers = toServerNames(children);
  }

  /**
   * Converts znode child names into an unmodifiable list of server names.
   * @param children child names as returned from ZooKeeper; may be null or empty
   * @return an unmodifiable list, empty when there are no children
   */
  private static List<ServerName> toServerNames(List<String> children) {
    if (CollectionUtils.isEmpty(children)) {
      return Collections.emptyList();
    }
    List<ServerName> parsed =
      children.stream().map(ServerName::parseServerName).collect(Collectors.toList());
    return Collections.unmodifiableList(parsed);
  }

  @Override
  public void nodeChildrenChanged(String path) {
    // Only changes under the region server znode are of interest.
    if (!path.equals(watcher.getZNodePaths().rsZNode)) {
      return;
    }
    loadRegionServerList();
  }

  /**
   * @return the most recently loaded list of region servers
   */
  public List<ServerName> getRegionServers() {
    return regionServers;
  }
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ZKTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * Checks that {@link RegionServerAddressTracker} follows additions and removals of children
 * under the region server znode of a mini ZooKeeper cluster.
 */
@Category({ ZKTests.class, MediumTests.class })
public class TestRegionServerAddressTracker {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class);

  private static final HBaseZKTestingUtil TEST_UTIL = new HBaseZKTestingUtil();

  private ZKWatcher zk;

  private RegionServerAddressTracker tracker;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniZKCluster();
  }

  @Before
  public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException {
    // Use the test method name as the parent znode.
    String methodName = name.getMethodName();
    TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + methodName);
    zk = new ZKWatcher(TEST_UTIL.getConfiguration(), methodName, null);
    // The region server znode must exist before the tracker lists its children.
    ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode);
    tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable());
  }

  @After
  public void tearDown() throws IOException {
    Closeables.close(zk, true);
  }

  /** Builds the path of the child znode that represents the given server. */
  private String znodeFor(ServerName server) {
    return ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, server.toString());
  }

  @Test
  public void test() throws KeeperException {
    // Add a first server and wait for the tracker to see it.
    ServerName first =
      ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime());
    ZKUtil.createWithParents(zk, znodeFor(first));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
    assertEquals(first, tracker.getRegionServers().get(0));

    // Add a second server; both must be reported.
    ServerName second =
      ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime());
    ZKUtil.createWithParents(zk, znodeFor(second));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2);
    assertThat(tracker.getRegionServers(), hasItems(first, second));

    // Remove the first server; only the second remains.
    ZKUtil.deleteNode(zk, znodeFor(first));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1);
    assertEquals(second, tracker.getRegionServers().get(0));

    // Remove the second server; the tracker ends up empty.
    ZKUtil.deleteNode(zk, znodeFor(second));
    TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty());
  }

  /** An {@link Abortable} that only logs the abort request and never reports itself aborted. */
  private static final class WarnOnlyAbortable implements Abortable {

    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e);
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}