HBASE-17718 Difference between RS's servername and its ephemeral node cause SSH stop working
This patch reverts HBASE-9593 -- i.e. registering in zk before we register with master putting it back to how it was where we register in zk AFTER we report for duty with the master (because then we'll register in zk with the name the master gave us). It then fixes the problem reported in HBASE-9593 in an alternate fashion by checking for a RS znode if we failed a connect on assign; if none found, we remove a server from online servers list. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Make move method available to tests. M hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Correct method name changing moveFromOnelineToDeadServers to moveFromOnlineToDeadServers Add actual fix which is call to checkForRSznode if exception trying to open a region; if none found, call expire on the server so it gets removed from the list of online servers. This patch exposes sloppyness in the waitForRegionServers around our current case where Master is hosting regions but ONLY hbase:meta; in this case we need to wait on at least another server to report in beyond Master (we weren't but stuff was 'working' because of the early registration of RS nodes in zk). M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Make 'killed' available to tests. Put registry of ephemeral node back to where it was originally, so it is AFTER we get response from Master on registering for duty so we can put our znode up in zk with the name the Master gave us rather than local name (which could be unknown to the Master). private boolean stopping = false; M hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSKilledWhenInitializing.java Cleanup and test of new cleanup.
This commit is contained in:
parent
de25fcbd93
commit
7fa7156f2c
|
@ -85,7 +85,6 @@ import org.apache.hadoop.hbase.exceptions.MergeRegionException;
|
|||
import org.apache.hadoop.hbase.executor.ExecutorType;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
|
||||
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.http.InfoServer;
|
||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
|
@ -158,6 +157,7 @@ import org.apache.hadoop.hbase.util.Addressing;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CompressionTest;
|
||||
import org.apache.hadoop.hbase.util.EncryptionTest;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
|
||||
import org.apache.hadoop.hbase.util.HasThread;
|
||||
import org.apache.hadoop.hbase.util.IdLock;
|
||||
|
@ -166,7 +166,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
import org.apache.hadoop.hbase.util.ZKDataMigrator;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
|
||||
|
@ -178,7 +177,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
|||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
|
@ -1544,7 +1542,8 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
});
|
||||
}
|
||||
|
||||
void move(final byte[] encodedRegionName,
|
||||
@VisibleForTesting // Public so can be accessed by tests.
|
||||
public void move(final byte[] encodedRegionName,
|
||||
final byte[] destServerName) throws HBaseIOException {
|
||||
RegionState regionState = assignmentManager.getRegionStates().
|
||||
getRegionState(Bytes.toString(encodedRegionName));
|
||||
|
|
|
@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.master;
|
|||
|
||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -83,6 +84,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
|||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* The ServerManager class manages info about region servers.
|
||||
* <p>
|
||||
|
@ -552,7 +555,7 @@ public class ServerManager {
|
|||
}
|
||||
|
||||
try {
|
||||
List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.znodePaths.rsZNode);
|
||||
List<String> servers = getRegionServersInZK(zkw);
|
||||
if (servers == null || servers.isEmpty() || (servers.size() == 1
|
||||
&& servers.contains(sn.toString()))) {
|
||||
LOG.info("ZK shows there is only the master self online, exiting now");
|
||||
|
@ -574,6 +577,11 @@ public class ServerManager {
|
|||
}
|
||||
}
|
||||
|
||||
private List<String> getRegionServersInZK(final ZooKeeperWatcher zkw)
|
||||
throws KeeperException {
|
||||
return ZKUtil.listChildrenNoWatch(zkw, zkw.znodePaths.rsZNode);
|
||||
}
|
||||
|
||||
/*
|
||||
* Expire the passed server. Add it to list of dead servers and queue a
|
||||
* shutdown processing.
|
||||
|
@ -597,7 +605,7 @@ public class ServerManager {
|
|||
" but server shutdown already in progress");
|
||||
return;
|
||||
}
|
||||
moveFromOnelineToDeadServers(serverName);
|
||||
moveFromOnlineToDeadServers(serverName);
|
||||
|
||||
// If cluster is going down, yes, servers are going to be expiring; don't
|
||||
// process as a dead server
|
||||
|
@ -626,7 +634,7 @@ public class ServerManager {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void moveFromOnelineToDeadServers(final ServerName sn) {
|
||||
public void moveFromOnlineToDeadServers(final ServerName sn) {
|
||||
synchronized (onlineServers) {
|
||||
if (!this.onlineServers.containsKey(sn)) {
|
||||
LOG.warn("Expiration of " + sn + " but server not online");
|
||||
|
@ -751,10 +759,56 @@ public class ServerManager {
|
|||
OpenRegionResponse response = admin.openRegion(null, request);
|
||||
return ResponseConverter.getRegionOpeningState(response);
|
||||
} catch (ServiceException se) {
|
||||
checkForRSznode(server, se);
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check for an odd state, where we think an RS is up but it is not. Do it on OPEN.
|
||||
* This is only case where the check makes sense.
|
||||
*
|
||||
* <p>We are checking for instance of HBASE-9593 where a RS registered but died before it put
|
||||
* up its znode in zk. In this case, the RS made it into the list of online servers but it
|
||||
* is not actually UP. We do the check here where there is an evident problem rather
|
||||
* than do some crazy footwork where we'd have master check zk after a RS had reported
|
||||
* for duty with provisional state followed by a confirmed state; that'd be a mess.
|
||||
* Real fix is HBASE-17733.
|
||||
*/
|
||||
private void checkForRSznode(final ServerName serverName, final ServiceException se) {
|
||||
if (se.getCause() == null) return;
|
||||
if (!(se.getCause() instanceof ConnectException)) return;
|
||||
if (!isServerOnline(serverName)) return;
|
||||
// We think this server is online. Check it has a znode up. Currently, a RS
|
||||
// registers an ephereral znode in zk. If not present, something is up. Maybe
|
||||
// HBASE-9593 where RS crashed AFTER reportForDuty but BEFORE it put up an ephemeral
|
||||
// znode.
|
||||
List<String> servers = null;
|
||||
try {
|
||||
servers = getRegionServersInZK(this.master.getZooKeeper());
|
||||
} catch (KeeperException ke) {
|
||||
LOG.warn("Failed to list regionservers", ke);
|
||||
// ZK is malfunctioning, don't hang here
|
||||
}
|
||||
boolean found = false;
|
||||
if (servers != null) {
|
||||
for (String serverNameAsStr: servers) {
|
||||
ServerName sn = ServerName.valueOf(serverNameAsStr);
|
||||
if (sn.equals(serverName)) {
|
||||
// Found a server up in zk.
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
LOG.warn("Online server " + serverName.toString() + " has no corresponding " +
|
||||
"ephemeral znode (Did it die before registering in zk?); " +
|
||||
"calling expire to clean it up!");
|
||||
expireServer(serverName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an OPEN RPC to the specified server to open the specified region.
|
||||
* <p>
|
||||
|
@ -779,6 +833,7 @@ public class ServerManager {
|
|||
OpenRegionResponse response = admin.openRegion(null, request);
|
||||
return ResponseConverter.getRegionOpeningStateList(response);
|
||||
} catch (ServiceException se) {
|
||||
checkForRSznode(server, se);
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
|
@ -946,6 +1001,27 @@ public class ServerManager {
|
|||
return admin;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate min necessary to start. This is not an absolute. It is just
|
||||
* a friction that will cause us hang around a bit longer waiting on
|
||||
* RegionServers to check-in.
|
||||
*/
|
||||
private int getMinToStart() {
|
||||
// One server should be enough to get us off the ground.
|
||||
int requiredMinToStart = 1;
|
||||
if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
|
||||
if (!BaseLoadBalancer.userTablesOnMaster(master.getConfiguration())) {
|
||||
// If Master is carrying regions but NOT user-space regions (the current default),
|
||||
// since the Master shows as a 'server', we need at least one more server to check
|
||||
// in before we can start up so up defaultMinToStart to 2.
|
||||
requiredMinToStart = 2;
|
||||
}
|
||||
}
|
||||
int minToStart = this.master.getConfiguration().getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
|
||||
// Ensure we are never less than requiredMinToStart else stuff won't work.
|
||||
return minToStart == -1 || minToStart < requiredMinToStart? requiredMinToStart: minToStart;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the region servers to report in.
|
||||
* We will wait until one of this condition is met:
|
||||
|
@ -965,30 +1041,15 @@ public class ServerManager {
|
|||
getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
|
||||
final long timeout = this.master.getConfiguration().
|
||||
getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
|
||||
int defaultMinToStart = 1;
|
||||
if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
|
||||
// If we assign regions to master, we'd like to start
|
||||
// at least another region server so that we don't
|
||||
// assign all regions to master if other region servers
|
||||
// don't come up in time.
|
||||
defaultMinToStart = 2;
|
||||
}
|
||||
int minToStart = this.master.getConfiguration().
|
||||
getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
|
||||
if (minToStart < 1) {
|
||||
LOG.warn(String.format(
|
||||
"The value of '%s' (%d) can not be less than 1, ignoring.",
|
||||
WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
|
||||
minToStart = 1;
|
||||
}
|
||||
// Min is not an absolute; just a friction making us wait longer on server checkin.
|
||||
int minToStart = getMinToStart();
|
||||
int maxToStart = this.master.getConfiguration().
|
||||
getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
|
||||
if (maxToStart < minToStart) {
|
||||
LOG.warn(String.format(
|
||||
"The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
|
||||
WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
|
||||
WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
|
||||
maxToStart = Integer.MAX_VALUE;
|
||||
LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
|
||||
WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
|
||||
WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
|
||||
maxToStart = Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
@ -998,16 +1059,25 @@ public class ServerManager {
|
|||
long lastCountChange = startTime;
|
||||
int count = countOfRegionServers();
|
||||
int oldCount = 0;
|
||||
while (!this.master.isStopped() && count < maxToStart
|
||||
&& (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
|
||||
// This while test is a little hard to read. We try to comment it in below but in essence:
|
||||
// Wait if Master is not stopped and the number of regionservers that have checked-in is
|
||||
// less than the maxToStart. Both of these conditions will be true near universally.
|
||||
// Next, we will keep cycling if ANY of the following three conditions are true:
|
||||
// 1. The time since a regionserver registered is < interval (means servers are actively checking in).
|
||||
// 2. We are under the total timeout.
|
||||
// 3. The count of servers is < minimum expected AND we are within timeout (this just puts up
|
||||
// a little friction making us wait a bit longer if < minimum servers).
|
||||
while (!this.master.isStopped() && count < maxToStart &&
|
||||
(((lastCountChange + interval) > now) ||
|
||||
(timeout > slept) ||
|
||||
((count < minToStart) && (timeout > slept)))) {
|
||||
// Log some info at every interval time or if there is a change
|
||||
if (oldCount != count || lastLogTime+interval < now){
|
||||
if (oldCount != count || lastLogTime + interval < now) {
|
||||
lastLogTime = now;
|
||||
String msg =
|
||||
"Waiting for region servers count to settle; currently"+
|
||||
" checked in " + count + ", slept for " + slept + " ms," +
|
||||
" expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
|
||||
", timeout of "+timeout+" ms, interval of "+interval+" ms.";
|
||||
"Waiting for RegionServer count=" + count + " to settle; waited "+
|
||||
slept + "ms, expecting minimum=" + minToStart + "server(s) (max="+ getStrForMax(maxToStart) + "server(s)), " +
|
||||
"timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg);
|
||||
}
|
||||
|
@ -1025,13 +1095,16 @@ public class ServerManager {
|
|||
}
|
||||
}
|
||||
|
||||
LOG.info("Finished waiting for region servers count to settle;" +
|
||||
" checked in " + count + ", slept for " + slept + " ms," +
|
||||
" expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
|
||||
" master is "+ (this.master.isStopped() ? "stopped.": "running")
|
||||
LOG.info("Finished waiting for RegionServer count=" + count + " to settle, slept for " + slept + "ms," +
|
||||
" expecting minimum=" + minToStart + " server(s) (max=" + getStrForMax(maxToStart) + " server(s),"+
|
||||
" Master is "+ (this.master.isStopped() ? "stopped.": "running")
|
||||
);
|
||||
}
|
||||
|
||||
private String getStrForMax(final int max) {
|
||||
return max == Integer.MAX_VALUE? "NO_LIMIT": Integer.toString(max);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A copy of the internal list of online servers.
|
||||
*/
|
||||
|
|
|
@ -327,7 +327,7 @@ public class HRegionServer extends HasThread implements
|
|||
// space regions.
|
||||
private boolean stopping = false;
|
||||
|
||||
private volatile boolean killed = false;
|
||||
volatile boolean killed = false;
|
||||
|
||||
protected final Configuration conf;
|
||||
|
||||
|
@ -965,8 +965,6 @@ public class HRegionServer extends HasThread implements
|
|||
try {
|
||||
if (!isStopped() && !isAborted()) {
|
||||
ShutdownHook.install(conf, fs, this, Thread.currentThread());
|
||||
// Set our ephemeral znode up in zookeeper now we have a name.
|
||||
createMyEphemeralNode();
|
||||
// Initialize the RegionServerCoprocessorHost now that our ephemeral
|
||||
// node was created, in case any coprocessors want to use ZooKeeper
|
||||
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
|
||||
|
@ -1446,6 +1444,8 @@ public class HRegionServer extends HasThread implements
|
|||
}
|
||||
this.conf.set(key, value);
|
||||
}
|
||||
// Set our ephemeral znode up in zookeeper now we have a name.
|
||||
createMyEphemeralNode();
|
||||
|
||||
if (updateRootDir) {
|
||||
// initialize file system by the config fs.defaultFS and hbase.rootdir from master
|
||||
|
@ -2297,6 +2297,7 @@ public class HRegionServer extends HasThread implements
|
|||
* logs but it does close socket in case want to bring up server on old
|
||||
* hostname+port immediately.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected void kill() {
|
||||
this.killed = true;
|
||||
abort("Simulated kill");
|
||||
|
|
|
@ -133,7 +133,7 @@ public class TestMasterProcedureEvents {
|
|||
while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10);
|
||||
|
||||
// Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
|
||||
master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
|
||||
master.getServerManager().moveFromOnlineToDeadServers(hrs.getServerName());
|
||||
|
||||
// check event wait/wake
|
||||
testProcedureEventWaitWake(master, master.getServerCrashProcessingEnabledEvent(),
|
||||
|
|
|
@ -113,7 +113,7 @@ public class TestServerCrashProcedure {
|
|||
// Now, reenable processing else we can't get a lock on the ServerCrashProcedure.
|
||||
master.setServerCrashProcessingEnabled(true);
|
||||
// Do some of the master processing of dead servers so when SCP runs, it has expected 'state'.
|
||||
master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName());
|
||||
master.getServerManager().moveFromOnlineToDeadServers(hrs.getServerName());
|
||||
// Enable test flags and then queue the crash procedure.
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||
|
|
|
@ -21,114 +21,164 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.LocalHBaseCluster;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.rules.TestRule;
|
||||
|
||||
/**
|
||||
* Tests region server termination during startup.
|
||||
* Tests that a regionserver that dies after reporting for duty gets removed
|
||||
* from list of online regions. See HBASE-9593.
|
||||
*/
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
@Category({RegionServerTests.class, MediumTests.class})
|
||||
public class TestRSKilledWhenInitializing {
|
||||
private static boolean masterActive = false;
|
||||
private static AtomicBoolean firstRS = new AtomicBoolean(true);
|
||||
private static final Log LOG = LogFactory.getLog(TestRSKilledWhenInitializing.class);
|
||||
@Rule public TestName testName = new TestName();
|
||||
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().
|
||||
withTimeout(this.getClass()).withLookingForStuckThread(true).build();
|
||||
|
||||
// This boolean needs to be globally available. It is used below in our
|
||||
// mocked up regionserver so it knows when to die.
|
||||
private static AtomicBoolean masterActive = new AtomicBoolean(false);
|
||||
// Ditto for this variable. It also is used in the mocked regionserver class.
|
||||
private static final AtomicReference<ServerName> killedRS = new AtomicReference<ServerName>();
|
||||
|
||||
private static final int NUM_MASTERS = 1;
|
||||
private static final int NUM_RS = 2;
|
||||
|
||||
/**
|
||||
* Test verifies whether a region server is removing from online servers list in master if it went
|
||||
* down after registering with master.
|
||||
* down after registering with master. Test will TIMEOUT if an error!!!!
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 180000)
|
||||
public void testRSTerminationAfterRegisteringToMasterBeforeCreatingEphemeralNod() throws Exception {
|
||||
|
||||
final int NUM_MASTERS = 1;
|
||||
final int NUM_RS = 2;
|
||||
firstRS.set(true);
|
||||
@Test
|
||||
public void testRSTerminationAfterRegisteringToMasterBeforeCreatingEphemeralNode()
|
||||
throws Exception {
|
||||
// Create config to use for this cluster
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
|
||||
|
||||
// Start the cluster
|
||||
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
TEST_UTIL.createRootDir();
|
||||
final LocalHBaseCluster cluster =
|
||||
new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS, HMaster.class, MockedRegionServer.class);
|
||||
final MasterThread master = cluster.getMasters().get(0);
|
||||
master.start();
|
||||
new LocalHBaseCluster(conf, NUM_MASTERS, NUM_RS, HMaster.class,
|
||||
RegisterAndDieRegionServer.class);
|
||||
final MasterThread master = startMaster(cluster.getMasters().get(0));
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!master.getMaster().isInitialized()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
if (System.currentTimeMillis() > startTime + 30000) {
|
||||
throw new RuntimeException("Master not active after 30 seconds");
|
||||
}
|
||||
masterActive.set(true);
|
||||
// Now start regionservers.
|
||||
// First RS to report for duty will kill itself when it gets a response.
|
||||
// See below in the RegisterAndDieRegionServer handleReportForDutyResponse.
|
||||
for (int i = 0; i < NUM_RS; i++) {
|
||||
cluster.getRegionServers().get(i).start();
|
||||
}
|
||||
masterActive = true;
|
||||
cluster.getRegionServers().get(0).start();
|
||||
cluster.getRegionServers().get(1).start();
|
||||
Thread.sleep(10000);
|
||||
// Now wait on master to see NUM_RS + 1 servers as being online, NUM_RS and itself.
|
||||
// Then wait until the killed RS gets removed from zk and triggers Master to remove
|
||||
// it from list of online RS.
|
||||
List<ServerName> onlineServersList =
|
||||
master.getMaster().getServerManager().getOnlineServersList();
|
||||
while (onlineServersList.size() > 2) {
|
||||
while (onlineServersList.size() < NUM_RS + 1) {
|
||||
// Spin till we see NUM_RS + Master in online servers list.
|
||||
onlineServersList = master.getMaster().getServerManager().getOnlineServersList();
|
||||
}
|
||||
LOG.info(onlineServersList);
|
||||
assertEquals(NUM_RS + 1, onlineServersList.size());
|
||||
// Steady state. How many regions open?
|
||||
// Wait until killedRS is set
|
||||
while (killedRS.get() == null) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
final int regionsOpenCount = master.getMaster().getAssignmentManager().getNumRegionsOpened();
|
||||
// Find non-meta region (namespace?) and assign to the killed server. That'll trigger cleanup.
|
||||
Map<HRegionInfo, ServerName> assigments =
|
||||
master.getMaster().getAssignmentManager().getRegionStates().getRegionAssignments();
|
||||
HRegionInfo hri = null;
|
||||
for (Map.Entry<HRegionInfo, ServerName> e: assigments.entrySet()) {
|
||||
if (e.getKey().isMetaRegion()) continue;
|
||||
hri = e.getKey();
|
||||
break;
|
||||
}
|
||||
// Try moving region to the killed server. It will fail. As by-product, we will
|
||||
// remove the RS from Master online list because no corresponding znode.
|
||||
LOG.info("Move " + hri.getEncodedName() + " to " + killedRS.get());
|
||||
master.getMaster().move(hri.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(killedRS.get().toString()));
|
||||
while (onlineServersList.size() > NUM_RS) {
|
||||
Thread.sleep(100);
|
||||
onlineServersList = master.getMaster().getServerManager().getOnlineServersList();
|
||||
}
|
||||
assertEquals(onlineServersList.size(), 2);
|
||||
cluster.shutdown();
|
||||
// Just for kicks, ensure namespace was put back on the old server after above failed move.
|
||||
assertEquals(regionsOpenCount,
|
||||
master.getMaster().getAssignmentManager().getNumRegionsOpened());
|
||||
} finally {
|
||||
masterActive = false;
|
||||
firstRS.set(true);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
cluster.shutdown();
|
||||
cluster.join();
|
||||
TEST_UTIL.shutdownMiniDFSCluster();
|
||||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
TEST_UTIL.cleanupTestDir();
|
||||
}
|
||||
}
|
||||
|
||||
public static class MockedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
||||
private MasterThread startMaster(MasterThread master) {
|
||||
master.start();
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (!master.getMaster().isInitialized()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignored) {
|
||||
LOG.info("Interrupted: ignoring");
|
||||
}
|
||||
if (System.currentTimeMillis() > startTime + 30000) {
|
||||
throw new RuntimeException("Master not active after 30 seconds");
|
||||
}
|
||||
}
|
||||
return master;
|
||||
}
|
||||
|
||||
public MockedRegionServer(Configuration conf, CoordinatedStateManager cp)
|
||||
throws IOException, InterruptedException {
|
||||
/**
|
||||
* A RegionServer that reports for duty and then immediately dies if it is the first to receive
|
||||
* the response to a reportForDuty. When it dies, it clears its ephemeral znode which the master
|
||||
* notices and so removes the region from its set of online regionservers.
|
||||
*/
|
||||
static class RegisterAndDieRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
|
||||
public RegisterAndDieRegionServer(Configuration conf, CoordinatedStateManager cp)
|
||||
throws IOException, InterruptedException {
|
||||
super(conf, cp);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleReportForDutyResponse(RegionServerStartupResponse c) throws IOException {
|
||||
if (firstRS.getAndSet(false)) {
|
||||
InetSocketAddress address = super.getRpcServer().getListenerAddress();
|
||||
if (address == null) {
|
||||
throw new IOException("Listener channel is closed");
|
||||
}
|
||||
for (NameStringPair e : c.getMapEntriesList()) {
|
||||
String key = e.getName();
|
||||
// The hostname the master sees us as.
|
||||
if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
|
||||
String hostnameFromMasterPOV = e.getValue();
|
||||
assertEquals(address.getHostName(), hostnameFromMasterPOV);
|
||||
}
|
||||
}
|
||||
while (!masterActive) {
|
||||
protected void handleReportForDutyResponse(RegionServerStartupResponse c)
|
||||
throws IOException {
|
||||
if (killedRS.compareAndSet(null, getServerName())) {
|
||||
// Make sure Master is up so it will see the removal of the ephemeral znode for this RS.
|
||||
while (!masterActive.get()) {
|
||||
Threads.sleep(100);
|
||||
}
|
||||
super.kill();
|
||||
|
@ -137,4 +187,4 @@ public class TestRSKilledWhenInitializing {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue