HBASE-24765: Dynamic master discovery (#2314)
This patch adds the ability to discover newly added masters dynamically on the master registry side. The trigger for the re-fetch is either periodic (5 mins) or any registry RPC failure. Master server information is cached in masters to avoid repeated ZK lookups. Updates the client side connection metrics to maintain a counter per RPC type so that clients have visibility into counts grouped by RPC method name. I didn't add the method to ZK registry interface since there is a design discussion going on in splittable meta doc. We can add it later if needed. Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> (cherry picked from commit 275a38e1533eafa1d4bd1d50c13bcecd9a397ea8) (cherry picked from commit bb9121da77c7b881a3cc4c389029a610fc2b0925)
This commit is contained in:
parent
ebe9e68274
commit
3e1450d8b3
@ -0,0 +1,125 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
|
||||
/**
|
||||
* Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
|
||||
* uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
|
||||
* By default the refresh happens periodically (configured via
|
||||
* {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
|
||||
* {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two
|
||||
* should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class MasterAddressRefresher implements Closeable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
|
||||
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
|
||||
"hbase.client.master_registry.refresh_interval_secs";
|
||||
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
|
||||
public static final String MIN_SECS_BETWEEN_REFRESHES =
|
||||
"hbase.client.master_registry.min_secs_between_refreshes";
|
||||
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
|
||||
|
||||
private final ExecutorService pool;
|
||||
private final MasterRegistry registry;
|
||||
private final long periodicRefreshMs;
|
||||
private final long timeBetweenRefreshesMs;
|
||||
private final Object refreshMasters = new Object();
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
pool.shutdownNow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread that refreshes the master end points until it is interrupted via {@link #close()}.
|
||||
* Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
|
||||
*/
|
||||
private class RefreshThread implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
long lastRpcTs = 0;
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
|
||||
// have duplicate refreshes because once the thread is past the wait(), notify()s are
|
||||
// ignored until the thread is back to the waiting state.
|
||||
synchronized (refreshMasters) {
|
||||
refreshMasters.wait(periodicRefreshMs);
|
||||
}
|
||||
long currentTs = EnvironmentEdgeManager.currentTime();
|
||||
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
|
||||
continue;
|
||||
}
|
||||
lastRpcTs = currentTs;
|
||||
LOG.debug("Attempting to refresh master address end points.");
|
||||
Set<ServerName> newMasters = new HashSet<>(registry.getMasters());
|
||||
registry.populateMasterStubs(newMasters);
|
||||
LOG.debug("Finished refreshing master end points. {}", newMasters);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Error populating latest list of masters.", e);
|
||||
}
|
||||
}
|
||||
LOG.info("Master end point refresher loop exited.");
|
||||
}
|
||||
}
|
||||
|
||||
MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
|
||||
pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||
.setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
|
||||
periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
|
||||
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
|
||||
timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
|
||||
MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
|
||||
Preconditions.checkArgument(periodicRefreshMs > 0);
|
||||
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
|
||||
this.registry = registry;
|
||||
pool.submit(new RefreshThread());
|
||||
}
|
||||
|
||||
/**
|
||||
* Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh.
|
||||
* See class comment for details.
|
||||
*/
|
||||
void refreshNow() {
|
||||
synchronized (refreshMasters) {
|
||||
refreshMasters.notify();
|
||||
}
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.DNS.getMasterHostname;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.RpcController;
|
||||
@ -40,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
|
||||
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
|
||||
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
@ -51,10 +53,11 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMastersRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMastersResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMastersResponseEntry;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNumLiveRSRequest;
|
||||
@ -69,13 +72,15 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNumLiveRSRespo
|
||||
public class MasterRegistry implements ConnectionRegistry {
|
||||
private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
|
||||
|
||||
private ImmutableMap<String, ClientMetaService.Interface> masterAddr2Stub;
|
||||
private volatile ImmutableMap<String, ClientMetaService.Interface> masterAddr2Stub;
|
||||
|
||||
// RPC client used to talk to the masters.
|
||||
private RpcClient rpcClient;
|
||||
private RpcControllerFactory rpcControllerFactory;
|
||||
private int rpcTimeoutMs;
|
||||
|
||||
protected MasterAddressRefresher masterAddressRefresher;
|
||||
|
||||
@Override
|
||||
public void init(Connection connection) throws IOException {
|
||||
Configuration conf = connection.getConfiguration();
|
||||
@ -87,13 +92,15 @@ public class MasterRegistry implements ConnectionRegistry {
|
||||
rpcClient = RpcClientFactory.createClient(conf, null);
|
||||
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
|
||||
populateMasterStubs(parseMasterAddrs(conf));
|
||||
masterAddressRefresher = new MasterAddressRefresher(conf, this);
|
||||
}
|
||||
|
||||
private interface Callable <T extends Message> {
|
||||
protected interface Callable <T extends Message> {
|
||||
T call(ClientMetaService.Interface stub, RpcController controller) throws IOException;
|
||||
}
|
||||
|
||||
private <T extends Message> T doCall(Callable<T> callable) throws MasterRegistryFetchException {
|
||||
protected <T extends Message> T doCall(Callable<T> callable)
|
||||
throws MasterRegistryFetchException {
|
||||
Exception lastException = null;
|
||||
Set<String> masters = masterAddr2Stub.keySet();
|
||||
List<ClientMetaService.Interface> stubs = new ArrayList<>(masterAddr2Stub.values());
|
||||
@ -102,14 +109,16 @@ public class MasterRegistry implements ConnectionRegistry {
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
try {
|
||||
T resp = callable.call(stub, controller);
|
||||
if (controller.failed()) {
|
||||
lastException = controller.getFailed();
|
||||
continue;
|
||||
if (!controller.failed()) {
|
||||
return resp;
|
||||
}
|
||||
return resp;
|
||||
lastException = controller.getFailed();
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
}
|
||||
if (ClientExceptionsUtil.isConnectionException(lastException)) {
|
||||
masterAddressRefresher.refreshNow();
|
||||
}
|
||||
}
|
||||
// rpcs to all masters failed.
|
||||
throw new MasterRegistryFetchException(masters, lastException);
|
||||
@ -117,19 +126,37 @@ public class MasterRegistry implements ConnectionRegistry {
|
||||
|
||||
@Override
|
||||
public ServerName getActiveMaster() throws IOException {
|
||||
GetActiveMasterResponse resp = doCall(new Callable<GetActiveMasterResponse>() {
|
||||
GetMastersResponseEntry activeMaster = null;
|
||||
for (GetMastersResponseEntry entry: getMastersInternal().getMasterServersList()) {
|
||||
if (entry.getIsActive()) {
|
||||
activeMaster = entry;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (activeMaster == null) {
|
||||
throw new HBaseIOException("No active master found");
|
||||
}
|
||||
return ProtobufUtil.toServerName(activeMaster.getServerName());
|
||||
}
|
||||
|
||||
List<ServerName> getMasters() throws IOException {
|
||||
List<ServerName> result = new ArrayList<>();
|
||||
for (GetMastersResponseEntry entry: getMastersInternal().getMasterServersList()) {
|
||||
result.add(ProtobufUtil.toServerName(entry.getServerName()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private GetMastersResponse getMastersInternal() throws IOException {
|
||||
return doCall(new Callable<GetMastersResponse>() {
|
||||
@Override
|
||||
public GetActiveMasterResponse call(
|
||||
public GetMastersResponse call(
|
||||
ClientMetaService.Interface stub, RpcController controller) throws IOException {
|
||||
BlockingRpcCallback<GetActiveMasterResponse> cb = new BlockingRpcCallback<>();
|
||||
stub.getActiveMaster(controller, GetActiveMasterRequest.getDefaultInstance(), cb);
|
||||
BlockingRpcCallback<GetMastersResponse> cb = new BlockingRpcCallback<>();
|
||||
stub.getMasters(controller, GetMastersRequest.getDefaultInstance(), cb);
|
||||
return cb.get();
|
||||
}
|
||||
});
|
||||
if (!resp.hasServerName() || resp.getServerName() == null) {
|
||||
throw new HBaseIOException("No active master found");
|
||||
}
|
||||
return ProtobufUtil.toServerName(resp.getServerName());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -230,4 +257,10 @@ public class MasterRegistry implements ConnectionRegistry {
|
||||
}
|
||||
masterAddr2Stub = builder.build();
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
ImmutableSet<String> getParsedMasterServers() {
|
||||
return masterAddr2Stub.keySet();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -56,6 +56,7 @@ public class MetricsConnection implements StatisticTrackable {
|
||||
/** Set this key to {@code true} to enable metrics collection of client requests. */
|
||||
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
|
||||
|
||||
private static final String CNT_BASE = "rpcCount_";
|
||||
private static final String DRTN_BASE = "rpcCallDurationMs_";
|
||||
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
|
||||
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
|
||||
@ -303,6 +304,8 @@ public class MetricsConnection implements StatisticTrackable {
|
||||
LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
@VisibleForTesting protected final ConcurrentMap<String, Counter> rpcCounters =
|
||||
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
|
||||
|
||||
public MetricsConnection(final ConnectionManager.HConnectionImplementation conn) {
|
||||
this.scope = conn.toString();
|
||||
@ -450,8 +453,7 @@ public class MetricsConnection implements StatisticTrackable {
|
||||
}
|
||||
|
||||
/** Update call stats for non-critical-path methods */
|
||||
private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
|
||||
final String methodName = method.getService().getName() + "_" + method.getName();
|
||||
private void updateRpcGeneric(String methodName, CallStats stats) {
|
||||
getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
|
||||
.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
|
||||
getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
|
||||
@ -466,6 +468,9 @@ public class MetricsConnection implements StatisticTrackable {
|
||||
if (callsPerServer > 0) {
|
||||
concurrentCallsPerServerHist.update(callsPerServer);
|
||||
}
|
||||
// Update the counter that tracks RPCs by type.
|
||||
final String methodName = method.getService().getName() + "_" + method.getName();
|
||||
getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
|
||||
// this implementation is tied directly to protobuf implementation details. would be better
|
||||
// if we could dispatch based on something static, ie, request Message type.
|
||||
if (method.getService() == ClientService.getDescriptor()) {
|
||||
@ -518,7 +523,7 @@ public class MetricsConnection implements StatisticTrackable {
|
||||
}
|
||||
}
|
||||
// Fallback to dynamic registry lookup for DDL methods.
|
||||
updateRpcGeneric(method, stats);
|
||||
updateRpcGeneric(methodName, stats);
|
||||
}
|
||||
|
||||
public void incrCacheDroppingExceptions(Object exception) {
|
||||
|
@ -17,6 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
@ -67,6 +71,57 @@ public class MasterAddressTracker extends ZooKeeperNodeTracker {
|
||||
super(watcher, watcher.getMasterAddressZNode(), abortable);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param watcher ZooKeeperWatcher instance to use for querying ZK.
|
||||
* @return current list of backup masters.
|
||||
*/
|
||||
public static List<ServerName> getBackupMastersAndRenewWatch(
|
||||
ZooKeeperWatcher watcher) {
|
||||
// Build Set of backup masters from ZK nodes
|
||||
List<String> backupMasterStrings;
|
||||
try {
|
||||
backupMasterStrings = ZKUtil.listChildrenAndWatchForNewChildren(
|
||||
watcher, watcher.backupMasterAddressesZNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn(watcher.prefix("Unable to list backup servers"), e);
|
||||
backupMasterStrings = null;
|
||||
}
|
||||
|
||||
List<ServerName> backupMasters = new ArrayList<>();
|
||||
if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
|
||||
for (String s: backupMasterStrings) {
|
||||
try {
|
||||
byte [] bytes;
|
||||
try {
|
||||
bytes = ZKUtil.getData(watcher, ZKUtil.joinZNode(
|
||||
watcher.backupMasterAddressesZNode, s));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("Thread interrupted.");
|
||||
}
|
||||
if (bytes != null) {
|
||||
ServerName sn;
|
||||
try {
|
||||
sn = ServerName.parseFrom(bytes);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed parse, skipping registering backup server", e);
|
||||
continue;
|
||||
}
|
||||
backupMasters.add(sn);
|
||||
}
|
||||
} catch (KeeperException | InterruptedIOException e) {
|
||||
LOG.warn(watcher.prefix("Unable to get information about " +
|
||||
"backup servers"), e);
|
||||
}
|
||||
}
|
||||
Collections.sort(backupMasters, new Comparator<ServerName>() {
|
||||
@Override
|
||||
public int compare(ServerName s1, ServerName s2) {
|
||||
return s1.getServerName().compareTo(s2.getServerName());
|
||||
}});
|
||||
}
|
||||
return backupMasters;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the address of the current master if one is available. Returns null
|
||||
* if no current master.
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.yammer.metrics.util.RatioGauge;
|
||||
@ -129,6 +130,11 @@ public class TestMetricsConnection {
|
||||
.build(),
|
||||
MetricsConnection.newCallStats());
|
||||
}
|
||||
for (String method: new String[]{"Get", "Scan", "Mutate"}) {
|
||||
final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method;
|
||||
final long metricVal = METRICS.rpcCounters.get(metricKey).count();
|
||||
assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
|
||||
}
|
||||
for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
|
||||
METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker,
|
||||
METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -922,12 +922,15 @@ message GetClusterIdResponse {
|
||||
optional string cluster_id = 1;
|
||||
}
|
||||
|
||||
/** Request and response to get the currently active master name for this cluster */
|
||||
message GetActiveMasterRequest {
|
||||
/** Request and response to get the current list of all registers master servers */
|
||||
message GetMastersRequest {
|
||||
}
|
||||
message GetActiveMasterResponse {
|
||||
/** Not set if an active master could not be determined. */
|
||||
optional ServerName server_name = 1;
|
||||
message GetMastersResponseEntry {
|
||||
required ServerName server_name = 1;
|
||||
required bool is_active = 2;
|
||||
}
|
||||
message GetMastersResponse {
|
||||
repeated GetMastersResponseEntry master_servers = 1;
|
||||
}
|
||||
|
||||
/** Request and response to get the current list of meta region locations */
|
||||
@ -955,9 +958,10 @@ service ClientMetaService {
|
||||
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);
|
||||
|
||||
/**
|
||||
* Get active master server name for this cluster.
|
||||
* Get registered list of master servers in this cluster. List includes both active and backup
|
||||
* masters.
|
||||
*/
|
||||
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);
|
||||
rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);
|
||||
|
||||
/**
|
||||
* Get current meta replicas' region locations.
|
||||
|
@ -17,7 +17,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -35,9 +38,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Handles everything on master-side related to master election.
|
||||
* Handles everything on master-side related to master election. Keeps track of
|
||||
* currently active master and registered backup masters.
|
||||
*
|
||||
* <p>Listens and responds to ZooKeeper notifications on the master znode,
|
||||
* <p>Listens and responds to ZooKeeper notifications on the master znodes,
|
||||
* both <code>nodeCreated</code> and <code>nodeDeleted</code>.
|
||||
*
|
||||
* <p>Contains blocking methods which will hold up backup masters, waiting
|
||||
@ -64,18 +68,23 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
||||
// Active master's server name. Invalidated anytime active master changes (based on ZK
|
||||
// notifications) and lazily fetched on-demand.
|
||||
// ServerName is immutable, so we don't need heavy synchronization around it.
|
||||
private volatile ServerName activeMasterServerName;
|
||||
volatile ServerName activeMasterServerName;
|
||||
// Registered backup masters. List is kept up to date based on ZK change notifications to
|
||||
// backup znode.
|
||||
private volatile ImmutableList<ServerName> backupMasters;
|
||||
|
||||
/**
|
||||
* @param watcher ZK watcher
|
||||
* @param sn ServerName
|
||||
* @param master In an instance of a Master.
|
||||
*/
|
||||
ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master) {
|
||||
ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master)
|
||||
throws InterruptedIOException {
|
||||
super(watcher);
|
||||
watcher.registerListener(this);
|
||||
this.sn = sn;
|
||||
this.master = master;
|
||||
updateBackupMasters();
|
||||
}
|
||||
|
||||
// will be set after jetty server is started
|
||||
@ -89,8 +98,18 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDeleted(String path) {
|
||||
public void nodeChildrenChanged(String path) {
|
||||
if (path.equals(watcher.backupMasterAddressesZNode)) {
|
||||
try {
|
||||
updateBackupMasters();
|
||||
} catch (InterruptedIOException ioe) {
|
||||
LOG.error("Error updating backup masters", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void nodeDeleted(String path) {
|
||||
// We need to keep track of the cluster's shutdown status while
|
||||
// we wait on the current master. We consider that, if the cluster
|
||||
// was already in a "shutdown" state when we started, that this master
|
||||
@ -101,7 +120,6 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
||||
if(path.equals(watcher.clusterStateZNode) && !master.isStopped()) {
|
||||
clusterShutDown.set(true);
|
||||
}
|
||||
|
||||
handle(path);
|
||||
}
|
||||
|
||||
@ -111,6 +129,11 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateBackupMasters() throws InterruptedIOException {
|
||||
backupMasters =
|
||||
ImmutableList.copyOf(MasterAddressTracker.getBackupMastersAndRenewWatch(watcher));
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the active master's ServerName from zookeeper.
|
||||
*/
|
||||
@ -320,4 +343,11 @@ public class ActiveMasterManager extends ZooKeeperListener {
|
||||
LOG.error(this.watcher.prefix("Error deleting our own master address node"), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return list of registered backup masters.
|
||||
*/
|
||||
public List<ServerName> getBackupMasters() {
|
||||
return backupMasters;
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Service;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
@ -27,7 +29,6 @@ import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
@ -86,7 +87,6 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.http.InfoServer;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
@ -189,12 +189,8 @@ import org.mortbay.jetty.servlet.ServletHolder;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.protobuf.Descriptors;
|
||||
import com.google.protobuf.Service;
|
||||
|
||||
/**
|
||||
* HMaster is the "master server" for HBase. An HBase cluster has one active
|
||||
* master. If many masters are started, all compete. Whichever wins goes on to
|
||||
* run the cluster. All others park themselves in their constructor until
|
||||
* master or cluster shutdown or until the active master loses its lease in
|
||||
* zookeeper. Thereafter, all running master jostle to take over master role.
|
||||
@ -2548,56 +2544,14 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||
*/
|
||||
public ClusterStatus getClusterStatusWithoutCoprocessor() throws InterruptedIOException {
|
||||
// Build Set of backup masters from ZK nodes
|
||||
List<String> backupMasterStrings;
|
||||
try {
|
||||
backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
|
||||
this.zooKeeper.backupMasterAddressesZNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
|
||||
backupMasterStrings = null;
|
||||
}
|
||||
|
||||
List<ServerName> backupMasters = null;
|
||||
if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
|
||||
backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
|
||||
for (String s: backupMasterStrings) {
|
||||
try {
|
||||
byte [] bytes;
|
||||
try {
|
||||
bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
|
||||
this.zooKeeper.backupMasterAddressesZNode, s));
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
if (bytes != null) {
|
||||
ServerName sn;
|
||||
try {
|
||||
sn = ServerName.parseFrom(bytes);
|
||||
} catch (DeserializationException e) {
|
||||
LOG.warn("Failed parse, skipping registering backup server", e);
|
||||
continue;
|
||||
}
|
||||
backupMasters.add(sn);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
|
||||
"backup servers"), e);
|
||||
}
|
||||
}
|
||||
Collections.sort(backupMasters, new Comparator<ServerName>() {
|
||||
@Override
|
||||
public int compare(ServerName s1, ServerName s2) {
|
||||
return s1.getServerName().compareTo(s2.getServerName());
|
||||
}});
|
||||
}
|
||||
|
||||
List<ServerName> backupMasters = getBackupMasters();
|
||||
String clusterId = fileSystemManager != null ?
|
||||
fileSystemManager.getClusterId().toString() : null;
|
||||
fileSystemManager.getClusterId().toString() : null;
|
||||
Set<RegionState> regionsInTransition = assignmentManager != null ?
|
||||
assignmentManager.getRegionStates().getRegionsInTransition() : null;
|
||||
assignmentManager.getRegionStates().getRegionsInTransition() : null;
|
||||
String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
|
||||
boolean balancerOn = loadBalancerTracker != null ?
|
||||
loadBalancerTracker.isBalancerOn() : false;
|
||||
loadBalancerTracker.isBalancerOn() : false;
|
||||
Map<ServerName, ServerLoad> onlineServers = null;
|
||||
Set<ServerName> deadServers = null;
|
||||
if (serverManager != null) {
|
||||
@ -2605,8 +2559,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||
onlineServers = serverManager.getOnlineServers();
|
||||
}
|
||||
return new ClusterStatus(VersionInfo.getVersion(), clusterId,
|
||||
onlineServers, deadServers, serverName, backupMasters,
|
||||
regionsInTransition, coprocessors, balancerOn);
|
||||
onlineServers, deadServers, serverName, backupMasters,
|
||||
regionsInTransition, coprocessors, balancerOn);
|
||||
}
|
||||
|
||||
List<ServerName> getBackupMasters() {
|
||||
return activeMasterManager.getBackupMasters();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,14 +97,15 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableReques
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMastersRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMastersResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMastersResponseEntry;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
@ -1804,12 +1805,19 @@ public class MasterRpcServices extends RSRpcServices
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
|
||||
GetActiveMasterRequest request) throws ServiceException {
|
||||
GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder();
|
||||
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
|
||||
throws ServiceException {
|
||||
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
|
||||
// Active master
|
||||
ServerName serverName = master.getActiveMaster();
|
||||
if (serverName != null) {
|
||||
resp.setServerName(ProtobufUtil.toServerName(serverName));
|
||||
resp.addMasterServers(GetMastersResponseEntry.newBuilder()
|
||||
.setServerName(ProtobufUtil.toServerName(serverName)).setIsActive(true).build());
|
||||
}
|
||||
// Backup masters
|
||||
for (ServerName backupMaster: master.getBackupMasters()) {
|
||||
resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
|
||||
ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
|
@ -0,0 +1,176 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ClientTests.class, SmallTests.class})
|
||||
public class TestMasterAddressRefresher {
|
||||
|
||||
static class DummyConnection implements Connection {
|
||||
private final Configuration conf;
|
||||
|
||||
DummyConnection(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table getTable(TableName tableName) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Admin getAdmin() throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static class DummyMasterRegistry extends MasterRegistry {
|
||||
|
||||
private final AtomicInteger getMastersCallCounter = new AtomicInteger(0);
|
||||
private final List<Long> callTimeStamps = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void init(Connection connection) throws IOException {
|
||||
super.init(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
List<ServerName> getMasters() {
|
||||
getMastersCallCounter.incrementAndGet();
|
||||
callTimeStamps.add(EnvironmentEdgeManager.currentTime());
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
public int getMastersCount() {
|
||||
return getMastersCallCounter.get();
|
||||
}
|
||||
|
||||
public List<Long> getCallTimeStamps() {
|
||||
return callTimeStamps;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPeriodicMasterEndPointRefresh() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Refresh every 1 second.
|
||||
conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1);
|
||||
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
|
||||
final DummyMasterRegistry registry = new DummyMasterRegistry();
|
||||
registry.init(new DummyConnection(conf));
|
||||
// Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made.
|
||||
Waiter.waitFor(
|
||||
conf, 5000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return registry.getMastersCount() > 3;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDurationBetweenRefreshes() throws IOException {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Disable periodic refresh
|
||||
conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE);
|
||||
// A minimum duration of 1s between refreshes
|
||||
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1);
|
||||
DummyMasterRegistry registry = new DummyMasterRegistry();
|
||||
registry.init(new DummyConnection(conf));
|
||||
// Issue a ton of manual refreshes.
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
registry.masterAddressRefresher.refreshNow();
|
||||
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
// Overall wait time is 10000 ms, so the number of requests should be <=10
|
||||
List<Long> callTimeStamps = registry.getCallTimeStamps();
|
||||
// Actual calls to getMasters() should be much lower than the refresh count.
|
||||
Assert.assertTrue(
|
||||
String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20);
|
||||
Assert.assertTrue(callTimeStamps.size() > 0);
|
||||
// Verify that the delta between subsequent RPCs is at least 1sec as configured.
|
||||
for (int i = 1; i < callTimeStamps.size() - 1; i++) {
|
||||
long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1);
|
||||
// Few ms cushion to account for any env jitter.
|
||||
Assert.assertTrue(callTimeStamps.toString(), delta > 990);
|
||||
}
|
||||
}
|
||||
}
|
@ -19,9 +19,15 @@ package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.protobuf.RpcController;
|
||||
import java.io.IOException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@ -33,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
@ -41,6 +48,10 @@ import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
|
||||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestMasterRegistry {
|
||||
|
||||
@ -59,6 +70,20 @@ public class TestMasterRegistry {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private static class ExceptionInjectorRegistry extends MasterRegistry {
|
||||
@Override
|
||||
public String getClusterId() throws IOException {
|
||||
GetClusterIdResponse resp = doCall(new Callable<GetClusterIdResponse>() {
|
||||
@Override
|
||||
public GetClusterIdResponse call(ClientMetaService.Interface stub, RpcController controller)
|
||||
throws IOException {
|
||||
throw new SocketTimeoutException("Injected exception.");
|
||||
}
|
||||
});
|
||||
return resp.getClusterId();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a string of dummy master addresses in host:port format. Every other hostname won't
|
||||
* have a port number.
|
||||
@ -130,4 +155,82 @@ public class TestMasterRegistry {
|
||||
registry.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that the list of masters configured in the MasterRegistry is dynamically refreshed in the
|
||||
* event of errors.
|
||||
*/
|
||||
@Test
|
||||
public void testDynamicMasterConfigurationRefresh() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConnection().getConfiguration();
|
||||
String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
|
||||
HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
|
||||
// Add a non-working master
|
||||
ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
|
||||
conf.set(HConstants.MASTER_ADDRS_KEY, badServer.toShortString() + "," + currentMasterAddrs);
|
||||
// Do not limit the number of refreshes during the test run.
|
||||
conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0);
|
||||
final ExceptionInjectorRegistry registry = new ExceptionInjectorRegistry();
|
||||
try {
|
||||
registry.init(TEST_UTIL.getConnection());
|
||||
final ImmutableSet<String> masters = registry.getParsedMasterServers();
|
||||
assertTrue(masters.contains(badServer.toString()));
|
||||
// Make a registry RPC, this should trigger a refresh since one of the RPC fails.
|
||||
try {
|
||||
registry.getClusterId();
|
||||
} catch (MasterRegistryFetchException e) {
|
||||
// Expected.
|
||||
}
|
||||
|
||||
// Wait for new set of masters to be populated.
|
||||
TEST_UTIL.waitFor(5000,
|
||||
new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return !registry.getParsedMasterServers().equals(masters);
|
||||
}
|
||||
});
|
||||
// new set of masters should not include the bad server
|
||||
final ImmutableSet<String> newMasters = registry.getParsedMasterServers();
|
||||
// Bad one should be out.
|
||||
assertEquals(3, newMasters.size());
|
||||
assertFalse(newMasters.contains(badServer.toString()));
|
||||
// Kill the active master
|
||||
activeMaster.stopMaster();
|
||||
TEST_UTIL.waitFor(10000,
|
||||
new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() {
|
||||
return TEST_UTIL.getMiniHBaseCluster().getLiveMasterThreads().size() == 2;
|
||||
}
|
||||
});
|
||||
TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000);
|
||||
// Make a registry RPC, this should trigger a refresh since one of the RPC fails.
|
||||
try {
|
||||
registry.getClusterId();
|
||||
} catch (MasterRegistryFetchException e) {
|
||||
// Expected.
|
||||
}
|
||||
// Wait until the killed master de-registered.
|
||||
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return registry.getMasters().size() == 2;
|
||||
}
|
||||
});
|
||||
TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return registry.getParsedMasterServers().size() == 2;
|
||||
}
|
||||
});
|
||||
final ImmutableSet<String> newMasters2 = registry.getParsedMasterServers();
|
||||
assertEquals(2, newMasters2.size());
|
||||
assertFalse(newMasters2.contains(activeMaster.getServerName().toString()));
|
||||
} finally {
|
||||
registry.close();
|
||||
// Reset the state, add a killed master.
|
||||
TEST_UTIL.getMiniHBaseCluster().startMaster();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
/**
|
||||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
@ -21,10 +21,12 @@ package org.apache.hadoop.hbase.master;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -33,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ChoreService;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
@ -70,43 +73,41 @@ public class TestActiveMasterManager {
|
||||
}
|
||||
|
||||
@Test public void testRestartMaster() throws IOException, KeeperException {
|
||||
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"testActiveMasterManagerFromZK", null, true);
|
||||
try {
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
ZKUtil.deleteNode(zk, zk.clusterStateZNode);
|
||||
} catch(KeeperException.NoNodeException nne) {}
|
||||
try (ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"testActiveMasterManagerFromZK", null, true)) {
|
||||
try {
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
ZKUtil.deleteNode(zk, zk.clusterStateZNode);
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
}
|
||||
|
||||
// Create the master node with a dummy address
|
||||
ServerName master = ServerName.valueOf("localhost", 1, System.currentTimeMillis());
|
||||
// Should not have a master yet
|
||||
DummyMaster dummyMaster = new DummyMaster(zk,master);
|
||||
ClusterStatusTracker clusterStatusTracker =
|
||||
dummyMaster.getClusterStatusTracker();
|
||||
ActiveMasterManager activeMasterManager =
|
||||
dummyMaster.getActiveMasterManager();
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertNull(activeMasterManager.getActiveMasterServerName());
|
||||
// Create the master node with a dummy address
|
||||
ServerName master = ServerName.valueOf("localhost", 1, System.currentTimeMillis());
|
||||
// Should not have a master yet
|
||||
DummyMaster dummyMaster = new DummyMaster(zk, master);
|
||||
ClusterStatusTracker clusterStatusTracker =
|
||||
dummyMaster.getClusterStatusTracker();
|
||||
ActiveMasterManager activeMasterManager =
|
||||
dummyMaster.getActiveMasterManager();
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
|
||||
// First test becoming the active master uninterrupted
|
||||
MonitoredTask status = Mockito.mock(MonitoredTask.class);
|
||||
clusterStatusTracker.setClusterUp();
|
||||
// First test becoming the active master uninterrupted
|
||||
MonitoredTask status = Mockito.mock(MonitoredTask.class);
|
||||
clusterStatusTracker.setClusterUp();
|
||||
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100, status);
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
assertMaster(zk, activeMasterManager.getActiveMasterServerName());
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100, status);
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
|
||||
// Now pretend master restart
|
||||
DummyMaster secondDummyMaster = new DummyMaster(zk,master);
|
||||
ActiveMasterManager secondActiveMasterManager =
|
||||
secondDummyMaster.getActiveMasterManager();
|
||||
assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100, status);
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
assertMaster(zk, activeMasterManager.getActiveMasterServerName());
|
||||
assertMaster(zk, secondActiveMasterManager.getActiveMasterServerName());
|
||||
// Now pretend master restart
|
||||
DummyMaster secondDummyMaster = new DummyMaster(zk, master);
|
||||
ActiveMasterManager secondActiveMasterManager =
|
||||
secondDummyMaster.getActiveMasterManager();
|
||||
assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100, status);
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, master);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -116,86 +117,126 @@ public class TestActiveMasterManager {
|
||||
*/
|
||||
@Test
|
||||
public void testActiveMasterManagerFromZK() throws Exception {
|
||||
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"testActiveMasterManagerFromZK", null, true);
|
||||
try {
|
||||
try (ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
"testActiveMasterManagerFromZK", null, true)) {
|
||||
try {
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
ZKUtil.deleteNode(zk, zk.clusterStateZNode);
|
||||
} catch (KeeperException.NoNodeException nne) {
|
||||
}
|
||||
|
||||
// Create the master node with a dummy address
|
||||
ServerName firstMasterAddress =
|
||||
ServerName.valueOf("localhost", 1, System.currentTimeMillis());
|
||||
ServerName secondMasterAddress =
|
||||
ServerName.valueOf("localhost", 2, System.currentTimeMillis());
|
||||
|
||||
// Should not have a master yet
|
||||
DummyMaster ms1 = new DummyMaster(zk, firstMasterAddress);
|
||||
ActiveMasterManager activeMasterManager =
|
||||
ms1.getActiveMasterManager();
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
|
||||
// First test becoming the active master uninterrupted
|
||||
ClusterStatusTracker clusterStatusTracker =
|
||||
ms1.getClusterStatusTracker();
|
||||
clusterStatusTracker.setClusterUp();
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100,
|
||||
Mockito.mock(MonitoredTask.class));
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, firstMasterAddress);
|
||||
|
||||
// New manager will now try to become the active master in another thread
|
||||
WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
|
||||
t.start();
|
||||
// Wait for this guy to figure out there is another active master
|
||||
// Wait for 1 second at most
|
||||
int sleeps = 0;
|
||||
while (!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
|
||||
Thread.sleep(10);
|
||||
sleeps++;
|
||||
}
|
||||
|
||||
// Both should see that there is an active master
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
// But secondary one should not be the active master
|
||||
assertFalse(t.isActiveMaster);
|
||||
|
||||
// Close the first server and delete it's master node
|
||||
ms1.stop("stopping first server");
|
||||
|
||||
// Use a listener to capture when the node is actually deleted
|
||||
NodeDeletionListener listener = new NodeDeletionListener(zk, zk.getMasterAddressZNode());
|
||||
zk.registerListener(listener);
|
||||
|
||||
LOG.info("Deleting master node");
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
ZKUtil.deleteNode(zk, zk.clusterStateZNode);
|
||||
} catch(KeeperException.NoNodeException nne) {}
|
||||
|
||||
// Create the master node with a dummy address
|
||||
ServerName firstMasterAddress =
|
||||
ServerName.valueOf("localhost", 1, System.currentTimeMillis());
|
||||
ServerName secondMasterAddress =
|
||||
ServerName.valueOf("localhost", 2, System.currentTimeMillis());
|
||||
// Wait for the node to be deleted
|
||||
LOG.info("Waiting for active master manager to be notified");
|
||||
listener.waitForDeletion();
|
||||
LOG.info("Master node deleted");
|
||||
|
||||
// Should not have a master yet
|
||||
DummyMaster ms1 = new DummyMaster(zk,firstMasterAddress);
|
||||
ActiveMasterManager activeMasterManager =
|
||||
ms1.getActiveMasterManager();
|
||||
assertFalse(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertNull(activeMasterManager.getActiveMasterServerName());
|
||||
// Now we expect the secondary manager to have and be the active master
|
||||
// Wait for 1 second at most
|
||||
sleeps = 0;
|
||||
while (!t.isActiveMaster && sleeps < 100) {
|
||||
Thread.sleep(10);
|
||||
sleeps++;
|
||||
}
|
||||
LOG.debug("Slept " + sleeps + " times");
|
||||
|
||||
// First test becoming the active master uninterrupted
|
||||
ClusterStatusTracker clusterStatusTracker =
|
||||
ms1.getClusterStatusTracker();
|
||||
clusterStatusTracker.setClusterUp();
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100,
|
||||
Mockito.mock(MonitoredTask.class));
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertMaster(zk, firstMasterAddress);
|
||||
assertMaster(zk, activeMasterManager.getActiveMasterServerName());
|
||||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
assertTrue(t.isActiveMaster);
|
||||
|
||||
// New manager will now try to become the active master in another thread
|
||||
WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
|
||||
t.start();
|
||||
// Wait for this guy to figure out there is another active master
|
||||
// Wait for 1 second at most
|
||||
int sleeps = 0;
|
||||
while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
|
||||
Thread.sleep(10);
|
||||
sleeps++;
|
||||
LOG.info("Deleting master node");
|
||||
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
}
|
||||
}
|
||||
|
||||
// Both should see that there is an active master
|
||||
assertTrue(activeMasterManager.clusterHasActiveMaster.get());
|
||||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
// But secondary one should not be the active master
|
||||
assertFalse(t.isActiveMaster);
|
||||
// Verify the active master ServerName is populated in standby master.
|
||||
assertEquals(firstMasterAddress, t.manager.getActiveMasterServerName());
|
||||
|
||||
// Close the first server and delete it's master node
|
||||
ms1.stop("stopping first server");
|
||||
|
||||
// Use a listener to capture when the node is actually deleted
|
||||
NodeDeletionListener listener = new NodeDeletionListener(zk, zk.getMasterAddressZNode());
|
||||
zk.registerListener(listener);
|
||||
|
||||
LOG.info("Deleting master node");
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
|
||||
// Wait for the node to be deleted
|
||||
LOG.info("Waiting for active master manager to be notified");
|
||||
listener.waitForDeletion();
|
||||
LOG.info("Master node deleted");
|
||||
|
||||
// Now we expect the secondary manager to have and be the active master
|
||||
// Wait for 1 second at most
|
||||
sleeps = 0;
|
||||
while(!t.isActiveMaster && sleeps < 100) {
|
||||
Thread.sleep(10);
|
||||
sleeps++;
|
||||
@Test
|
||||
public void testBackupMasterUpdates() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
try (ZooKeeperWatcher zk = new ZooKeeperWatcher(
|
||||
conf, "testBackupMasterUpdates", null, true)) {
|
||||
ServerName sn1 = ServerName.valueOf("localhost", 1, -1);
|
||||
DummyMaster master1 = new DummyMaster(zk, sn1);
|
||||
final ActiveMasterManager activeMasterManager = master1.getActiveMasterManager();
|
||||
activeMasterManager.blockUntilBecomingActiveMaster(100,
|
||||
Mockito.mock(MonitoredTask.class));
|
||||
assertEquals(sn1, activeMasterManager.getActiveMasterServerName());
|
||||
assertEquals(0, activeMasterManager.getBackupMasters().size());
|
||||
// Add backup masters
|
||||
final List<String> backupZNodes = new ArrayList<>();
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
ServerName backupSn = ServerName.valueOf("localhost", 1000 + i, -1);
|
||||
String backupZn = ZKUtil.joinZNode(zk.backupMasterAddressesZNode, backupSn.toString());
|
||||
backupZNodes.add(backupZn);
|
||||
MasterAddressTracker.setMasterAddress(zk, backupZn, backupSn, 1234);
|
||||
TEST_UTIL.waitFor(10000,
|
||||
new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return activeMasterManager.getBackupMasters().size() == backupZNodes.size();
|
||||
}
|
||||
});
|
||||
}
|
||||
// Remove backup masters
|
||||
int numBackups = backupZNodes.size();
|
||||
for (String backupZNode: backupZNodes) {
|
||||
ZKUtil.deleteNode(zk, backupZNode);
|
||||
final int currentBackups = --numBackups;
|
||||
TEST_UTIL.waitFor(10000,
|
||||
new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
return activeMasterManager.getBackupMasters().size() == currentBackups;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
LOG.debug("Slept " + sleeps + " times");
|
||||
|
||||
assertTrue(t.manager.clusterHasActiveMaster.get());
|
||||
assertTrue(t.isActiveMaster);
|
||||
assertEquals(secondMasterAddress, t.manager.getActiveMasterServerName());
|
||||
|
||||
LOG.info("Deleting master node");
|
||||
|
||||
ZKUtil.deleteNode(zk, zk.getMasterAddressZNode());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -206,8 +247,8 @@ public class TestActiveMasterManager {
|
||||
* @throws IOException if an IO problem is encountered
|
||||
*/
|
||||
private void assertMaster(ZooKeeperWatcher zk,
|
||||
ServerName expectedAddress)
|
||||
throws KeeperException, IOException {
|
||||
ServerName expectedAddress)
|
||||
throws KeeperException, IOException {
|
||||
ServerName readAddress = MasterAddressTracker.getMasterAddress(zk);
|
||||
assertNotNull(readAddress);
|
||||
assertTrue(expectedAddress.equals(readAddress));
|
||||
@ -219,7 +260,8 @@ public class TestActiveMasterManager {
|
||||
DummyMaster dummyMaster;
|
||||
boolean isActiveMaster;
|
||||
|
||||
public WaitToBeMasterThread(ZooKeeperWatcher zk, ServerName address) {
|
||||
public WaitToBeMasterThread(ZooKeeperWatcher zk, ServerName address)
|
||||
throws InterruptedIOException {
|
||||
this.dummyMaster = new DummyMaster(zk,address);
|
||||
this.manager = this.dummyMaster.getActiveMasterManager();
|
||||
isActiveMaster = false;
|
||||
@ -267,13 +309,13 @@ public class TestActiveMasterManager {
|
||||
private ClusterStatusTracker clusterStatusTracker;
|
||||
private ActiveMasterManager activeMasterManager;
|
||||
|
||||
public DummyMaster(ZooKeeperWatcher zk, ServerName master) {
|
||||
public DummyMaster(ZooKeeperWatcher zk, ServerName master) throws InterruptedIOException {
|
||||
this.clusterStatusTracker =
|
||||
new ClusterStatusTracker(zk, this);
|
||||
new ClusterStatusTracker(zk, this);
|
||||
clusterStatusTracker.start();
|
||||
|
||||
this.activeMasterManager =
|
||||
new ActiveMasterManager(zk, master, this);
|
||||
new ActiveMasterManager(zk, master, this);
|
||||
zk.registerListener(activeMasterManager);
|
||||
}
|
||||
|
||||
@ -338,4 +380,4 @@ public class TestActiveMasterManager {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -45,8 +45,6 @@ import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ClientMetaService;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetActiveMasterResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterIdResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
|
||||
@ -111,25 +109,6 @@ public class TestClientMetaServiceRPCs {
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies the active master ServerName as seen by all masters.
|
||||
*/
|
||||
@Test public void TestActiveMaster() throws Exception {
|
||||
HBaseRpcController rpcController = getRpcController();
|
||||
ServerName activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerName();
|
||||
int rpcCount = 0;
|
||||
for (JVMClusterUtil.MasterThread masterThread:
|
||||
TEST_UTIL.getMiniHBaseCluster().getMasterThreads()) {
|
||||
ClientMetaService.BlockingInterface stub =
|
||||
getMasterStub(masterThread.getMaster().getServerName());
|
||||
GetActiveMasterResponse resp =
|
||||
stub.getActiveMaster(rpcController, GetActiveMasterRequest.getDefaultInstance());
|
||||
assertEquals(activeMaster, ProtobufUtil.toServerName(resp.getServerName()));
|
||||
rpcCount++;
|
||||
}
|
||||
assertEquals(MASTER_COUNT, rpcCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the meta region locations RPC returns consistent results across all masters.
|
||||
*/
|
||||
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
@ -1138,10 +1139,16 @@ public class TestMasterFailover {
|
||||
|
||||
// Check that ClusterStatus reports the correct active and backup masters
|
||||
assertNotNull(active);
|
||||
final HMaster finalActive = active;
|
||||
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
ClusterStatus status = finalActive.getClusterStatus();
|
||||
return status.getBackupMastersSize() == 1 && status.getBackupMasters().size() == 1;
|
||||
}
|
||||
});
|
||||
status = active.getClusterStatus();
|
||||
assertTrue(status.getMaster().equals(activeName));
|
||||
assertEquals(1, status.getBackupMastersSize());
|
||||
assertEquals(1, status.getBackupMasters().size());
|
||||
|
||||
// kill the active master
|
||||
LOG.debug("\n\nStopping the active master " + active.getServerName() + "\n");
|
||||
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@ -45,10 +47,19 @@ public class TestMasterAddressTracker {
|
||||
private static final Log LOG = LogFactory.getLog(TestMasterAddressTracker.class);
|
||||
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
// Cleaned up after each unit test.
|
||||
private static ZooKeeperWatcher zk;
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@After
|
||||
public void cleanUp() {
|
||||
if (zk != null) {
|
||||
zk.close();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
@ -79,9 +90,10 @@ public class TestMasterAddressTracker {
|
||||
*/
|
||||
private MasterAddressTracker setupMasterTracker(final ServerName sn, final int infoPort)
|
||||
throws Exception {
|
||||
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
name.getMethodName(), null);
|
||||
ZKUtil.createAndFailSilent(zk, zk.baseZNode);
|
||||
ZKUtil.createAndFailSilent(zk, zk.backupMasterAddressesZNode);
|
||||
|
||||
// Should not have a master yet
|
||||
MasterAddressTracker addressTracker = new MasterAddressTracker(zk, null);
|
||||
@ -155,6 +167,29 @@ public class TestMasterAddressTracker {
|
||||
assertEquals("Should receive 0 for backup not found.", 0, addressTracker.getMasterInfoPort());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackupMasters() throws Exception {
|
||||
final ServerName sn = ServerName.valueOf("localhost", 5678, System.currentTimeMillis());
|
||||
final MasterAddressTracker addressTracker = setupMasterTracker(sn, 1111);
|
||||
assertTrue(addressTracker.hasMaster());
|
||||
ServerName activeMaster = addressTracker.getMasterAddress();
|
||||
assertEquals(activeMaster, sn);
|
||||
// No current backup masters
|
||||
List<ServerName> backupMasters = MasterAddressTracker.getBackupMastersAndRenewWatch(zk);
|
||||
assertEquals(0, backupMasters.size());
|
||||
ServerName backupMaster1 = ServerName.valueOf("localhost", 2222, -1);
|
||||
ServerName backupMaster2 = ServerName.valueOf("localhost", 3333, -1);
|
||||
String backupZNode1 = ZKUtil.joinZNode(zk.backupMasterAddressesZNode, backupMaster1.toString());
|
||||
String backupZNode2 = ZKUtil.joinZNode(zk.backupMasterAddressesZNode, backupMaster2.toString());
|
||||
// Add a backup master
|
||||
MasterAddressTracker.setMasterAddress(zk, backupZNode1, backupMaster1, 2222);
|
||||
MasterAddressTracker.setMasterAddress(zk, backupZNode2, backupMaster2, 3333);
|
||||
backupMasters = MasterAddressTracker.getBackupMastersAndRenewWatch(zk);
|
||||
assertEquals(2, backupMasters.size());
|
||||
assertTrue(backupMasters.contains(backupMaster1));
|
||||
assertTrue(backupMasters.contains(backupMaster2));
|
||||
}
|
||||
|
||||
public static class NodeCreationListener extends ZooKeeperListener {
|
||||
private static final Log LOG = LogFactory.getLog(NodeCreationListener.class);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user