HBASE-24765: Dynamic master discovery (#2130)

This patch adds the ability to discover newly added masters
dynamically on the master registry side. The trigger for the
re-fetch is either periodic (5 mins) or any registry RPC failure.
Master server information is cached in masters to avoid repeated
ZK lookups.

Updates the client side connection metrics to maintain a counter
per RPC type so that clients have visibility into counts grouped
by RPC method name.

I didn't add the method to ZK registry interface since there
is a design discussion going on in splittable meta doc. We can
add it later if needed.

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Bharath Vissapragada 2020-08-25 15:09:03 -07:00 committed by GitHub
parent ebe321a99b
commit 01cf60067c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 734 additions and 203 deletions

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
/**
* Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
* uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
* By default the refresh happens periodically (configured via
* {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
* {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two
* should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
*/
@InterfaceAudience.Private
public class MasterAddressRefresher implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.master_registry.min_secs_between_refreshes";
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
private final ExecutorService pool;
private final MasterRegistry registry;
private final long periodicRefreshMs;
private final long timeBetweenRefreshesMs;
private final Object refreshMasters = new Object();
@Override
public void close() {
pool.shutdownNow();
}
/**
* Thread that refreshes the master end points until it is interrupted via {@link #close()}.
* Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
*/
private class RefreshThread implements Runnable {
@Override
public void run() {
long lastRpcTs = 0;
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(periodicRefreshMs);
}
long currentTs = EnvironmentEdgeManager.currentTime();
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
continue;
}
lastRpcTs = currentTs;
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
registry.populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
break;
} catch (ExecutionException | IOException e) {
LOG.debug("Error populating latest list of masters.", e);
}
}
LOG.info("Master end point refresher loop exited.");
}
}
MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
this.registry = registry;
pool.submit(new RefreshThread());
}
/**
* Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
* See class comment for details.
*/
void refreshNow() {
synchronized (refreshMasters) {
refreshMasters.notify();
}
}
}

View File

@ -33,11 +33,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
@ -57,10 +59,11 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
@ -89,11 +92,14 @@ public class MasterRegistry implements ConnectionRegistry {
private final int hedgedReadFanOut;
// Configured list of masters to probe the meta information from.
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;
protected final MasterAddressRefresher masterAddressRefresher;
/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
@ -115,18 +121,25 @@ public class MasterRegistry implements ConnectionRegistry {
MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
// Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
// by fetching the end points from this list.
populateMasterStubs(parseMasterAddrs(conf));
masterAddressRefresher = new MasterAddressRefresher(conf, this);
}
void populateMasterStubs(Set<ServerName> masters) throws IOException {
Preconditions.checkNotNull(masters);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
ImmutableMap.builderWithExpectedSize(masters.size());
User user = User.getCurrent();
for (ServerName masterAddr : masterAddrs) {
for (ServerName masterAddr : masters) {
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
@ -169,7 +182,13 @@ public class MasterRegistry implements ConnectionRegistry {
CompletableFuture<T> future = new CompletableFuture<>();
callable.call(controller, stub, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
IOException failureReason = controller.getFailed();
future.completeExceptionally(failureReason);
if (ClientExceptionsUtil.isConnectionException(failureReason)) {
// RPC has failed, trigger a refresh of master end points. We can have some spurious
// refreshes, but that is okay since the RPC is not expensive and not in a hot path.
masterAddressRefresher.refreshNow();
}
} else {
future.complete(resp);
}
@ -188,8 +207,9 @@ public class MasterRegistry implements ConnectionRegistry {
* been tried and all of them are failed, we will fail the future.
*/
private <T extends Message> void groupCall(CompletableFuture<T> future,
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
@ -210,10 +230,10 @@ public class MasterRegistry implements ConnectionRegistry {
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
new MasterRegistryFetchException(masterServers, ex));
} else {
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
errors);
groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
isValidResp, debug, errors);
}
}
} else {
@ -226,17 +246,20 @@ public class MasterRegistry implements ConnectionRegistry {
private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
Set<ServerName> masterServers = masterAddr2StubRef.keySet();
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
new ConcurrentLinkedQueue<>());
return future;
}
/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList()
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
@ -247,7 +270,7 @@ public class MasterRegistry implements ConnectionRegistry {
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
}
@Override
@ -259,17 +282,54 @@ public class MasterRegistry implements ConnectionRegistry {
.thenApply(GetClusterIdResponse::getClusterId);
}
private ServerName transformServerName(GetActiveMasterResponse resp) {
return ProtobufUtil.toServerName(resp.getServerName());
private static boolean hasActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
return activeMasters.size() == 1;
}
private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
if (activeMasters.size() != 1) {
throw new IOException(String.format("Incorrect number of active masters encountered." +
" Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
}
return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
}
@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse)resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
}
private static List<ServerName> transformServerNames(GetMastersResponse resp) {
return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
s.getServerName())).collect(Collectors.toList());
}
CompletableFuture<List<ServerName>> getMasters() {
System.out.println("getMasters()");
return this
.<GetActiveMasterResponse> call(
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
.thenApply(this::transformServerName);
.<GetMastersResponse> call((c, s, d) -> s.getMasters(
c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
"getMasters()").thenApply(MasterRegistry::transformServerNames);
}
@VisibleForTesting
@ -279,6 +339,9 @@ public class MasterRegistry implements ConnectionRegistry {
@Override
public void close() {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}

View File

@ -58,6 +58,7 @@ public class MetricsConnection implements StatisticTrackable {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
private static final String CNT_BASE = "rpcCount_";
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
@ -303,6 +304,8 @@ public class MetricsConnection implements StatisticTrackable {
LOAD_FACTOR, CONCURRENCY_LEVEL);
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
@VisibleForTesting protected final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
@ -434,8 +437,7 @@ public class MetricsConnection implements StatisticTrackable {
}
/** Update call stats for non-critical-path methods */
private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
final String methodName = method.getService().getName() + "_" + method.getName();
private void updateRpcGeneric(String methodName, CallStats stats) {
getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
@ -450,6 +452,9 @@ public class MetricsConnection implements StatisticTrackable {
if (callsPerServer > 0) {
concurrentCallsPerServerHist.update(callsPerServer);
}
// Update the counter that tracks RPCs by type.
final String methodName = method.getService().getName() + "_" + method.getName();
getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
// this implementation is tied directly to protobuf implementation details. would be better
// if we could dispatch based on something static, ie, request Message type.
if (method.getService() == ClientService.getDescriptor()) {
@ -511,7 +516,7 @@ public class MetricsConnection implements StatisticTrackable {
}
}
// Fallback to dynamic registry lookup for DDL methods.
updateRpcGeneric(method, stats);
updateRpcGeneric(methodName, stats);
}
public void incrCacheDroppingExceptions(Object exception) {

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.security.User;
@ -116,11 +115,20 @@ public class TestMasterRegistryHedgedReads {
}
}
/**
* A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
* errors. All other RPCs are ignored.
*/
public static final class RpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
if (!method.getName().equals("GetClusterId")) {
// On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
// fresh. We do not want to intercept those RPCs here and double count.
return;
}
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
EXECUTOR.execute(() -> {
int index = CALLED.getAndIncrement();
@ -129,7 +137,7 @@ public class TestMasterRegistryHedgedReads {
} else if (GOOD_RESP_INDEXS.contains(index)) {
done.run(RESP);
} else {
((HBaseRpcController) controller).setFailed("inject error");
controller.setFailed("inject error");
done.run(null);
}
});

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
@ -117,6 +118,11 @@ public class TestMetricsConnection {
.build(),
MetricsConnection.newCallStats());
}
for (String method: new String[]{"Get", "Scan", "Mutate"}) {
final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method;
final long metricVal = METRICS.rpcCounters.get(metricKey).getCount();
assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
}
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker,
METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker

View File

@ -1264,6 +1264,17 @@ message GetActiveMasterResponse {
optional ServerName server_name = 1;
}
/** Request and response to get the current list of all registers master servers */
message GetMastersRequest {
}
message GetMastersResponseEntry {
required ServerName server_name = 1;
required bool is_active = 2;
}
message GetMastersResponse {
repeated GetMastersResponseEntry master_servers = 1;
}
/** Request and response to get the current list of meta region locations */
message GetMetaRegionLocationsRequest {
}
@ -1273,7 +1284,8 @@ message GetMetaRegionLocationsResponse {
}
/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
* Implements all the RPCs needed by clients to look up cluster meta information needed for
* connection establishment.
*/
service ClientMetaService {
/**
@ -1282,10 +1294,16 @@ service ClientMetaService {
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
/**
* Get active master server name for this cluster.
* Get active master server name for this cluster. Retained for out of sync client and master
* rolling upgrades. Newer clients switched to GetMasters RPC request.
*/
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
/**
* Get registered list of master servers in this cluster.
*/
rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);
/**
* Get current meta replicas' region locations.
*/

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Server;
@ -34,12 +36,14 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* Handles everything on master-side related to master election.
* Handles everything on master-side related to master election. Keeps track of
* currently active master and registered backup masters.
*
* <p>Listens and responds to ZooKeeper notifications on the master znode,
* <p>Listens and responds to ZooKeeper notifications on the master znodes,
* both <code>nodeCreated</code> and <code>nodeDeleted</code>.
*
* <p>Contains blocking methods which will hold up backup masters, waiting
@ -65,17 +69,22 @@ public class ActiveMasterManager extends ZKListener {
// notifications) and lazily fetched on-demand.
// ServerName is immutable, so we don't need heavy synchronization around it.
volatile ServerName activeMasterServerName;
// Registered backup masters. List is kept up to date based on ZK change notifications to
// backup znode.
private volatile ImmutableList<ServerName> backupMasters;
/**
* @param watcher ZK watcher
* @param sn ServerName
* @param master In an instance of a Master.
*/
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master)
throws InterruptedIOException {
super(watcher);
watcher.registerListener(this);
this.sn = sn;
this.master = master;
updateBackupMasters();
}
// will be set after jetty server is started
@ -89,8 +98,18 @@ public class ActiveMasterManager extends ZKListener {
}
@Override
public void nodeDeleted(String path) {
public void nodeChildrenChanged(String path) {
if (path.equals(watcher.getZNodePaths().backupMasterAddressesZNode)) {
try {
updateBackupMasters();
} catch (InterruptedIOException ioe) {
LOG.error("Error updating backup masters", ioe);
}
}
}
@Override
public void nodeDeleted(String path) {
// We need to keep track of the cluster's shutdown status while
// we wait on the current master. We consider that, if the cluster
// was already in a "shutdown" state when we started, that this master
@ -101,7 +120,6 @@ public class ActiveMasterManager extends ZKListener {
if(path.equals(watcher.getZNodePaths().clusterStateZNode) && !master.isStopped()) {
clusterShutDown.set(true);
}
handle(path);
}
@ -111,6 +129,11 @@ public class ActiveMasterManager extends ZKListener {
}
}
private void updateBackupMasters() throws InterruptedIOException {
backupMasters =
ImmutableList.copyOf(MasterAddressTracker.getBackupMastersAndRenewWatch(watcher));
}
/**
* Fetches the active master's ServerName from zookeeper.
*/
@ -318,4 +341,11 @@ public class ActiveMasterManager extends ZKListener {
e.getMessage()));
}
}
/**
* @return list of registered backup masters.
*/
public List<ServerName> getBackupMasters() {
return backupMasters;
}
}

View File

@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.http.InfoServer;
@ -223,7 +222,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@ -235,8 +233,6 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@ -610,8 +606,8 @@ public class HMaster extends HRegionServer implements MasterServices {
* Protected to have custom implementations in tests override the default ActiveMaster
* implementation.
*/
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
return new ActiveMasterManager(zk, sn, server);
}
@ -2731,51 +2727,8 @@ public class HMaster extends HRegionServer implements MasterServices {
return status;
}
private List<ServerName> getBackupMasters() throws InterruptedIOException {
// Build Set of backup masters from ZK nodes
List<String> backupMasterStrings;
try {
backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
this.zooKeeper.getZNodePaths().backupMasterAddressesZNode);
} catch (KeeperException e) {
LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
backupMasterStrings = null;
}
List<ServerName> backupMasters = Collections.emptyList();
if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
backupMasters = new ArrayList<>(backupMasterStrings.size());
for (String s: backupMasterStrings) {
try {
byte [] bytes;
try {
bytes = ZKUtil.getData(this.zooKeeper, ZNodePaths.joinZNode(
this.zooKeeper.getZNodePaths().backupMasterAddressesZNode, s));
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (bytes != null) {
ServerName sn;
try {
sn = ProtobufUtil.parseServerNameFrom(bytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse, skipping registering backup server", e);
continue;
}
backupMasters.add(sn);
}
} catch (KeeperException e) {
LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
"backup servers"), e);
}
}
Collections.sort(backupMasters, new Comparator<ServerName>() {
@Override
public int compare(ServerName s1, ServerName s2) {
return s1.getServerName().compareTo(s2.getServerName());
}});
}
return backupMasters;
List<ServerName> getBackupMasters() {
return activeMasterManager.getBackupMasters();
}
/**

View File

@ -208,6 +208,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetComplet
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@ -2958,6 +2961,22 @@ public class MasterRpcServices extends RSRpcServices implements
return resp.build();
}
@Override
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
throws ServiceException {
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
// Active master
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
for (ServerName backupMaster: master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
return resp.build();
}
@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
GetMetaRegionLocationsRequest request) throws ServiceException {

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
@Category({ClientTests.class, SmallTests.class})
public class TestMasterAddressRefresher {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterAddressRefresher.class);
private class DummyMasterRegistry extends MasterRegistry {
private final AtomicInteger getMastersCallCounter = new AtomicInteger(0);
private final List<Long> callTimeStamps = new ArrayList<>();
DummyMasterRegistry(Configuration conf) throws IOException {
super(conf);
}
@Override
CompletableFuture<List<ServerName>> getMasters() {
getMastersCallCounter.incrementAndGet();
callTimeStamps.add(EnvironmentEdgeManager.currentTime());
return CompletableFuture.completedFuture(new ArrayList<>());
}
public int getMastersCount() {
return getMastersCallCounter.get();
}
public List<Long> getCallTimeStamps() {
return callTimeStamps;
}
}
@Test
public void testPeriodicMasterEndPointRefresh() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Refresh every 1 second.
conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1);
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
// Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
Waiter.waitFor(
conf, 5000, (Waiter.Predicate<Exception>) () -> registry.getMastersCount() > 3);
}
}
@Test
public void testDurationBetweenRefreshes() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Disable periodic refresh
conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE);
// A minimum duration of 1s between refreshes
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1);
try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) {
// Issue a ton of manual refreshes.
for (int i = 0; i < 10000; i++) {
registry.masterAddressRefresher.refreshNow();
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
// Overall wait time is 10000 ms, so the number of requests should be <=10
List<Long> callTimeStamps = registry.getCallTimeStamps();
// Actual calls to getMasters() should be much lower than the refresh count.
Assert.assertTrue(
String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20);
Assert.assertTrue(callTimeStamps.size() > 0);
// Verify that the delta between subsequent RPCs is at least 1sec as configured.
for (int i = 1; i < callTimeStamps.size() - 1; i++) {
long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1);
// Few ms cushion to account for any env jitter.
Assert.assertTrue(callTimeStamps.toString(), delta > 990);
}
}
}
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@ -26,6 +28,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -41,6 +45,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@Category({ MediumTests.class, ClientTests.class })
public class TestMasterRegistry {
@ -126,4 +131,51 @@ public class TestMasterRegistry {
}
}
}
/**
* Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
* event of errors.
*/
@Test
public void testDynamicMasterConfigurationRefresh() throws Exception {
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
String clusterId = activeMaster.getClusterId();
// Add a non-working master
ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs);
// Set the hedging fan out so that all masters are queried.
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
// Do not limit the number of refreshes during the test run.
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
try (MasterRegistry registry = new MasterRegistry(conf)) {
final Set<ServerName> masters = registry.getParsedMasterServers();
assertTrue(masters.contains(badServer));
// Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails.
assertEquals(registry.getClusterId().get(), clusterId);
// Wait for new set of masters to be populated.
TEST_UTIL.waitFor(5000,
(Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters));
// new set of masters should not include the bad server
final Set<ServerName> newMasters = registry.getParsedMasterServers();
// Bad one should be out.
assertEquals(3, newMasters.size());
assertFalse(newMasters.contains(badServer));
// Kill the active master
activeMaster.stopMaster();
TEST_UTIL.waitFor(10000,
() -> TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 2);
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000);
// Wait until the killed master de-registered. This should also trigger another refresh.
TEST_UTIL.waitFor(10000, () -> registry.getMasters().get().size() == 2);
TEST_UTIL.waitFor(20000, () -> registry.getParsedMasterServers().size() == 2);
final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
assertEquals(2, newMasters2.size());
assertFalse(newMasters2.contains(activeMaster.getServerName()));
} finally {
// Reset the state, add a killed master.
TEST_UTIL.getMiniHBaseCluster().startMaster();
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@ -46,7 +47,8 @@ public class AlwaysStandByHMaster extends HMaster {
private static final Logger LOG =
LoggerFactory.getLogger(AlwaysStandByMasterManager.class);
AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master)
throws InterruptedIOException {
super(watcher, sn, master);
}
@ -94,8 +96,8 @@ public class AlwaysStandByHMaster extends HMaster {
super(conf);
}
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
return new AlwaysStandByMasterManager(zk, sn, server);
}
}

View File

@ -23,6 +23,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -76,17 +80,18 @@ public class TestActiveMasterManager {
}
@Test public void testRestartMaster() throws IOException, KeeperException {
ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null, true);
try (ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null, true)) {
try {
ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
} catch(KeeperException.NoNodeException nne) {}
} catch (KeeperException.NoNodeException nne) {
}
// Create the master node with a dummy address
ServerName master = ServerName.valueOf("localhost", 1, System.currentTimeMillis());
// Should not have a master yet
DummyMaster dummyMaster = new DummyMaster(zk,master);
DummyMaster dummyMaster = new DummyMaster(zk, master);
ClusterStatusTracker clusterStatusTracker =
dummyMaster.getClusterStatusTracker();
ActiveMasterManager activeMasterManager =
@ -104,7 +109,7 @@ public class TestActiveMasterManager {
assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
// Now pretend master restart
DummyMaster secondDummyMaster = new DummyMaster(zk,master);
DummyMaster secondDummyMaster = new DummyMaster(zk, master);
ActiveMasterManager secondActiveMasterManager =
secondDummyMaster.getActiveMasterManager();
assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
@ -114,6 +119,7 @@ public class TestActiveMasterManager {
assertMaster(zk, activeMasterManager.getActiveMasterServerName().get());
assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName().get());
}
}
/**
* Unit tests that uses ZooKeeper but does not use the master-side methods
@ -122,12 +128,13 @@ public class TestActiveMasterManager {
*/
@Test
public void testActiveMasterManagerFromZK() throws Exception {
ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null, true);
try (ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null, true)) {
try {
ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
ZKUtil.deleteNode(zk, zk.getZNodePaths().clusterStateZNode);
} catch(KeeperException.NoNodeException nne) {}
} catch (KeeperException.NoNodeException nne) {
}
// Create the master node with a dummy address
ServerName firstMasterAddress =
@ -136,7 +143,7 @@ public class TestActiveMasterManager {
ServerName.valueOf("localhost", 2, System.currentTimeMillis());
// Should not have a master yet
DummyMaster ms1 = new DummyMaster(zk,firstMasterAddress);
DummyMaster ms1 = new DummyMaster(zk, firstMasterAddress);
ActiveMasterManager activeMasterManager =
ms1.getActiveMasterManager();
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
@ -158,7 +165,7 @@ public class TestActiveMasterManager {
// Wait for this guy to figure out there is another active master
// Wait for 1 second at most
int sleeps = 0;
while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
while (!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
Thread.sleep(10);
sleeps++;
}
@ -190,7 +197,7 @@ public class TestActiveMasterManager {
// Now we expect the secondary manager to have and be the active master
// Wait for 1 second at most
sleeps = 0;
while(!t.isActiveMaster && sleeps < 100) {
while (!t.isActiveMaster && sleeps < 100) {
Thread.sleep(10);
sleeps++;
}
@ -204,6 +211,40 @@ public class TestActiveMasterManager {
ZKUtil.deleteNode(zk, zk.getZNodePaths().masterAddressZNode);
}
}
@Test
public void testBackupMasterUpdates() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
try (ZKWatcher zk = new ZKWatcher(conf, "testBackupMasterUpdates", null, true)) {
ServerName sn1 = ServerName.valueOf("localhost", 1, -1);
DummyMaster master1 = new DummyMaster(zk, sn1);
ActiveMasterManager activeMasterManager = master1.getActiveMasterManager();
activeMasterManager.blockUntilBecomingActiveMaster(100,
Mockito.mock(MonitoredTask.class));
assertEquals(sn1, activeMasterManager.getActiveMasterServerName().get());
assertEquals(0, activeMasterManager.getBackupMasters().size());
// Add backup masters
List<String> backupZNodes = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
ServerName backupSn = ServerName.valueOf("localhost", 1000 + i, -1);
String backupZn = ZNodePaths.joinZNode(
zk.getZNodePaths().backupMasterAddressesZNode, backupSn.toString());
backupZNodes.add(backupZn);
MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234);
TEST_UTIL.waitFor(10000,
() -> activeMasterManager.getBackupMasters().size() == backupZNodes.size());
}
// Remove backup masters
int numBackups = backupZNodes.size();
for (String backupZNode: backupZNodes) {
ZKUtil.deleteNode(zk, backupZNode);
final int currentBackups = --numBackups;
TEST_UTIL.waitFor(10000,
() -> activeMasterManager.getBackupMasters().size() == currentBackups);
}
}
}
/**
* Assert there is an active master and that it has the specified address.
@ -212,12 +253,11 @@ public class TestActiveMasterManager {
* @throws KeeperException unexpected Zookeeper exception
* @throws IOException if an IO problem is encountered
*/
private void assertMaster(ZKWatcher zk,
ServerName expectedAddress)
throws KeeperException, IOException {
private void assertMaster(ZKWatcher zk, ServerName expectedAddress) throws
KeeperException, IOException {
ServerName readAddress = MasterAddressTracker.getMasterAddress(zk);
assertNotNull(readAddress);
assertTrue(expectedAddress.equals(readAddress));
assertEquals(expectedAddress, readAddress);
}
public static class WaitToBeMasterThread extends Thread {
@ -226,7 +266,7 @@ public class TestActiveMasterManager {
DummyMaster dummyMaster;
boolean isActiveMaster;
public WaitToBeMasterThread(ZKWatcher zk, ServerName address) {
public WaitToBeMasterThread(ZKWatcher zk, ServerName address) throws InterruptedIOException {
this.dummyMaster = new DummyMaster(zk,address);
this.manager = this.dummyMaster.getActiveMasterManager();
isActiveMaster = false;
@ -274,7 +314,7 @@ public class TestActiveMasterManager {
private ClusterStatusTracker clusterStatusTracker;
private ActiveMasterManager activeMasterManager;
public DummyMaster(ZKWatcher zk, ServerName master) {
public DummyMaster(ZKWatcher zk, ServerName master) throws InterruptedIOException {
this.clusterStatusTracker =
new ClusterStatusTracker(zk, this);
clusterStatusTracker.start();

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -32,6 +33,8 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -52,10 +55,19 @@ public class TestMasterAddressTracker {
private static final Logger LOG = LoggerFactory.getLogger(TestMasterAddressTracker.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Cleaned up after each unit test.
private static ZKWatcher zk;
@Rule
public TestName name = new TestName();
@After
public void cleanUp() {
if (zk != null) {
zk.close();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
@ -86,9 +98,10 @@ public class TestMasterAddressTracker {
*/
private MasterAddressTracker setupMasterTracker(final ServerName sn, final int infoPort)
throws Exception {
ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
zk = new ZKWatcher(TEST_UTIL.getConfiguration(),
name.getMethodName(), null);
ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().baseZNode);
ZKUtil.createAndFailSilent(zk, zk.getZNodePaths().backupMasterAddressesZNode);
// Should not have a master yet
MasterAddressTracker addressTracker = new MasterAddressTracker(zk, null);
@ -164,6 +177,31 @@ public class TestMasterAddressTracker {
assertEquals("Should receive 0 for backup not found.", 0, addressTracker.getMasterInfoPort());
}
@Test
public void testBackupMasters() throws Exception {
final ServerName sn = ServerName.valueOf("localhost", 5678, System.currentTimeMillis());
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1111);
assertTrue(addressTracker.hasMaster());
ServerName activeMaster = addressTracker.getMasterAddress();
assertEquals(activeMaster, sn);
// No current backup masters
List<ServerName> backupMasters = addressTracker.getBackupMasters();
assertEquals(0, backupMasters.size());
ServerName backupMaster1 = ServerName.valueOf("localhost", 2222, -1);
ServerName backupMaster2 = ServerName.valueOf("localhost", 3333, -1);
String backupZNode1 = ZNodePaths.joinZNode(
zk.getZNodePaths().backupMasterAddressesZNode, backupMaster1.toString());
String backupZNode2 = ZNodePaths.joinZNode(
zk.getZNodePaths().backupMasterAddressesZNode, backupMaster2.toString());
// Add a backup master
MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222);
MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333);
backupMasters = addressTracker.getBackupMasters();
assertEquals(2, backupMasters.size());
assertTrue(backupMasters.contains(backupMaster1));
assertTrue(backupMasters.contains(backupMaster2));
}
public static class NodeCreationListener extends ZKListener {
private static final Logger LOG = LoggerFactory.getLogger(NodeCreationListener.class);

View File

@ -20,6 +20,10 @@ package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
@ -278,4 +282,58 @@ public class MasterAddressTracker extends ZKNodeTracker {
return false;
}
public List<ServerName> getBackupMasters() throws InterruptedIOException {
return getBackupMastersAndRenewWatch(watcher);
}
/**
* Retrieves the list of registered backup masters and renews a watch on the znode for children
* updates.
* @param zkw Zookeeper watcher to use
* @return List of backup masters.
* @throws InterruptedIOException if there is any issue fetching the required data from Zookeeper.
*/
public static List<ServerName> getBackupMastersAndRenewWatch(
ZKWatcher zkw) throws InterruptedIOException {
// Build Set of backup masters from ZK nodes
List<String> backupMasterStrings = Collections.emptyList();
try {
backupMasterStrings = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode);
} catch (KeeperException e) {
LOG.warn(zkw.prefix("Unable to list backup servers"), e);
}
List<ServerName> backupMasters = Collections.emptyList();
if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
backupMasters = new ArrayList<>(backupMasterStrings.size());
for (String s: backupMasterStrings) {
try {
byte [] bytes;
try {
bytes = ZKUtil.getData(zkw, ZNodePaths.joinZNode(
zkw.getZNodePaths().backupMasterAddressesZNode, s));
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (bytes != null) {
ServerName sn;
try {
sn = ProtobufUtil.parseServerNameFrom(bytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse, skipping registering backup server", e);
continue;
}
backupMasters.add(sn);
}
} catch (KeeperException e) {
LOG.warn(zkw.prefix("Unable to get information about " +
"backup servers"), e);
}
}
backupMasters.sort(Comparator.comparing(ServerName::getServerName));
}
return backupMasters;
}
}