HBASE-17718 Difference between RS's servername and its ephemeral node cause SSH stop working; AMENDMENT.
Make the test tighter by extending ServerListener so the test can detect when the Master is in the waiting-on-regionservers state, and by making more assertions about state. Fix an error where we would move on from waiting-on-regionservers once we had waited the max time, even if fewer than the minimum number of RegionServers had checked in.
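The heart of the amendment is a new no-op waiting() callback on ServerListener, which ServerManager#waitForRegionServers fires just before entering its wait loop; the test registers a listener and spins until the callback fires before starting RegionServers. Below is a minimal, self-contained sketch of that pattern. The toy classes are stand-ins written for illustration only; just the names ServerListener, waiting(), and registerListener() come from the patch itself.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Toy stand-ins for the HBase types this patch touches.
    interface ServerListener {
      // New hook added by this patch: fired when the Master starts waiting on
      // RegionServers to check in. The default body means existing
      // implementations need not change.
      default void waiting() {}
    }

    class ToyServerManager {
      private final List<ServerListener> listeners = new ArrayList<>();

      void registerListener(ServerListener listener) {
        listeners.add(listener);
      }

      // Mirrors the change to waitForRegionServers(): notify listeners that we
      // are about to enter the waiting-on-RegionServers loop.
      void waitForRegionServers() {
        for (ServerListener listener : listeners) {
          listener.waiting();
        }
        // ... wait loop elided ...
      }
    }

    public class WaitingHookSketch {
      public static void main(String[] args) {
        ToyServerManager serverManager = new ToyServerManager();
        final AtomicBoolean waiting = new AtomicBoolean(false);
        // A test registers a listener and spins until the Master reaches the
        // waiting-on-RegionServers state, as startMaster() does in the test diff.
        serverManager.registerListener(new ServerListener() {
          @Override
          public void waiting() {
            waiting.set(true);
          }
        });
        serverManager.waitForRegionServers();
        System.out.println("master waiting on regionservers: " + waiting.get());
      }
    }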
parent e239e8d2d3
commit 6a57050c24
@@ -305,7 +305,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   MemoryBoundedLogMessageBuffer rsFatals;
 
   // flag set after we become the active master (used for testing)
-  private volatile boolean isActiveMaster = false;
+  private volatile boolean activeMaster = false;
 
   // flag set after we complete initialization once active,
   // it is not private since it's used in unit tests
@@ -597,7 +597,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   @Override
   protected void waitForMasterActive(){
     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
-    while (!(tablesOnMaster && isActiveMaster)
+    while (!(tablesOnMaster && activeMaster)
         && !isStopped() && !isAborted()) {
       sleeper.sleep();
     }
@@ -733,7 +733,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   private void finishActiveMasterInitialization(MonitoredTask status)
       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
 
-    isActiveMaster = true;
+    activeMaster = true;
     Thread zombieDetector = new Thread(new InitializationMonitor(this),
         "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
     zombieDetector.start();
@@ -2555,7 +2555,7 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   @Override
   public boolean isActiveMaster() {
-    return isActiveMaster;
+    return activeMaster;
   }
 
   /**

@@ -22,20 +22,25 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ServerName;
 
 /**
- * Get notification of server events. The invocations are inline
- * so make sure your implementation is fast else you'll slow hbase.
+ * Get notification of server registration events. The invocations are inline
+ * so make sure your implementation is fast or else you'll slow hbase.
  */
 @InterfaceAudience.Private
 public interface ServerListener {
+  /**
+   * Started waiting on RegionServers to check-in.
+   */
+  default void waiting() {};
+
   /**
    * The server has joined the cluster.
    * @param serverName The remote servers name.
    */
-  void serverAdded(final ServerName serverName);
+  default void serverAdded(final ServerName serverName) {};
 
   /**
    * The server was removed from the cluster.
    * @param serverName The remote servers name.
    */
-  void serverRemoved(final ServerName serverName);
+  default void serverRemoved(final ServerName serverName) {};
 }

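A side note on the switch from abstract to default methods above: implementers now override only the events they care about, which is why the DrainingServerTracker hunk later in this patch can drop its no-op serverRemoved() override. A minimal illustration with toy names (not HBase API):

    interface Listener {
      default void serverAdded() {}
      default void serverRemoved() {}
    }

    public class DefaultMethodDemo {
      public static void main(String[] args) {
        // Overrides only the one event it cares about; serverRemoved() is an
        // inherited no-op rather than boilerplate.
        Listener listener = new Listener() {
          @Override
          public void serverAdded() {
            System.out.println("serverAdded handled");
          }
        };
        listener.serverAdded();
        listener.serverRemoved();
      }
    }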
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
@@ -772,7 +773,16 @@ public class ServerManager {
    */
   private void checkForRSznode(final ServerName serverName, final ServiceException se) {
     if (se.getCause() == null) return;
-    if (!(se.getCause() instanceof ConnectException)) return;
+    Throwable t = se.getCause();
+    if (t instanceof ConnectException) {
+      // If this, proceed to do cleanup.
+    } else {
+      // Look for FailedServerException
+      if (!(t instanceof IOException)) return;
+      if (t.getCause() == null) return;
+      if (!(t.getCause() instanceof FailedServerException)) return;
+      // Ok, found FailedServerException -- continue.
+    }
     if (!isServerOnline(serverName)) return;
     // We think this server is online. Check it has a znode up. Currently, a RS
     // registers an ephereral znode in zk. If not present, something is up. Maybe
@@ -1030,20 +1040,19 @@ public class ServerManager {
    *
    * @throws InterruptedException
    */
-  public void waitForRegionServers(MonitoredTask status)
-  throws InterruptedException {
+  public void waitForRegionServers(MonitoredTask status) throws InterruptedException {
     final long interval = this.master.getConfiguration().
         getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
     final long timeout = this.master.getConfiguration().
         getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
     // Min is not an absolute; just a friction making us wait longer on server checkin.
     int minToStart = getMinToStart();
     int maxToStart = this.master.getConfiguration().
         getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
     if (maxToStart < minToStart) {
       LOG.warn(String.format("The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
           WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
           WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
       maxToStart = Integer.MAX_VALUE;
     }
 
@@ -1060,19 +1069,19 @@ public class ServerManager {
     // Next, we will keep cycling if ANY of the following three conditions are true:
     // 1. The time since a regionserver registered is < interval (means servers are actively checking in).
     // 2. We are under the total timeout.
-    // 3. The count of servers is < minimum expected AND we are within timeout (this just puts up
-    // a little friction making us wait a bit longer if < minimum servers).
+    // 3. The count of servers is < minimum.
+    for (ServerListener listener: this.listeners) {
+      listener.waiting();
+    }
     while (!this.master.isStopped() && count < maxToStart &&
-        (((lastCountChange + interval) > now) ||
-        (timeout > slept) ||
-        ((count < minToStart) && (timeout > slept)))) {
+        ((lastCountChange + interval) > now || timeout > slept || count < minToStart)) {
       // Log some info at every interval time or if there is a change
       if (oldCount != count || lastLogTime + interval < now) {
         lastLogTime = now;
         String msg =
-          "Waiting for RegionServer count=" + count + " to settle; waited "+
-            slept + "ms, expecting minimum=" + minToStart + "server(s) (max="+ getStrForMax(maxToStart) + "server(s)), " +
-            "timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
+          "Waiting on RegionServer count=" + count + " to settle; waited="+
+            slept + "ms, expecting min=" + minToStart + " server(s), max="+ getStrForMax(maxToStart) +
+            " server(s), " + "timeout=" + timeout + "ms, lastChange=" + (lastCountChange - now) + "ms";
         LOG.info(msg);
         status.setStatus(msg);
       }
@@ -1089,11 +1098,9 @@ public class ServerManager {
         lastCountChange = now;
       }
     }
-    LOG.info("Finished waiting for RegionServer count=" + count + " to settle, slept for " + slept + "ms," +
-      " expecting minimum=" + minToStart + " server(s) (max=" + getStrForMax(maxToStart) + " server(s),"+
-      " Master is "+ (this.master.isStopped() ? "stopped.": "running")
-    );
+    LOG.info("Finished wait on RegionServer count=" + count + "; waited=" + slept + "ms," +
+      " expected min=" + minToStart + " server(s), max=" + getStrForMax(maxToStart) + " server(s),"+
+      " master is "+ (this.master.isStopped() ? "stopped.": "running"));
   }
 
   private String getStrForMax(final int max) {

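To see the bug called out in the commit message: in the old loop predicate above, the "below minimum" clause was AND-ed with timeout > slept, making it redundant, so once the max wait elapsed (and no server had just checked in) the Master moved on even with fewer than minToStart servers registered. A small self-contained demonstration, using hypothetical values for the variables named in the diff:

    public class WaitPredicateDemo {
      public static void main(String[] args) {
        // Hypothetical snapshot: max wait elapsed, no recent check-in, and the
        // cluster is still below the minimum server count.
        long now = 10_000, lastCountChange = 0, interval = 1_500;
        long timeout = 4_500, slept = 5_000;
        int count = 1, minToStart = 3;

        // Old predicate: the "below minimum" clause can never extend the wait
        // past the timeout because it is AND-ed with (timeout > slept).
        boolean oldKeepWaiting = ((lastCountChange + interval) > now)
            || (timeout > slept)
            || ((count < minToStart) && (timeout > slept));

        // New predicate: count < minToStart alone keeps the loop alive.
        boolean newKeepWaiting = (lastCountChange + interval) > now
            || timeout > slept
            || count < minToStart;

        System.out.println("old keeps waiting: " + oldKeepWaiting); // false -- the bug
        System.out.println("new keeps waiting: " + newKeepWaiting); // true
      }
    }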
@@ -70,22 +70,14 @@ public class DrainingServerTracker extends ZooKeeperListener {
   public void start() throws KeeperException, IOException {
     watcher.registerListener(this);
     // Add a ServerListener to check if a server is draining when it's added.
-    serverManager.registerListener(
-        new ServerListener() {
-          @Override
-          public void serverAdded(ServerName sn) {
-            if (drainingServers.contains(sn)){
-              serverManager.addServerToDrainList(sn);
-            }
-          }
-
-          @Override
-          public void serverRemoved(ServerName serverName) {
-            // no-op
-          }
-        }
-    );
+    serverManager.registerListener(new ServerListener() {
+      @Override
+      public void serverAdded(ServerName sn) {
+        if (drainingServers.contains(sn)){
+          serverManager.addServerToDrainList(sn);
+        }
+      }
+    });
     List<String> servers =
       ZKUtil.listChildrenAndWatchThem(watcher, watcher.znodePaths.drainingZNode);
     add(servers);

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -92,52 +94,57 @@ public class TestRSKilledWhenInitializing {
         RegisterAndDieRegionServer.class);
     final MasterThread master = startMaster(cluster.getMasters().get(0));
     try {
-      masterActive.set(true);
-      // Now start regionservers.
-      // First RS to report for duty will kill itself when it gets a response.
-      // See below in the RegisterAndDieRegionServer handleReportForDutyResponse.
+      // Master is up waiting on RegionServers to check in. Now start RegionServers.
       for (int i = 0; i < NUM_RS; i++) {
         cluster.getRegionServers().get(i).start();
       }
-      // Now wait on master to see NUM_RS + 1 servers as being online, NUM_RS and itself.
-      // Then wait until the killed RS gets removed from zk and triggers Master to remove
-      // it from list of online RS.
-      List<ServerName> onlineServersList =
-        master.getMaster().getServerManager().getOnlineServersList();
-      while (onlineServersList.size() < NUM_RS + 1) {
-        // Spin till we see NUM_RS + Master in online servers list.
+      // Now wait on master to see NUM_RS + 1 servers as being online, thats NUM_RS plus
+      // the Master itself (because Master hosts hbase:meta and checks in as though it a RS).
+      List<ServerName> onlineServersList = null;
+      do {
         onlineServersList = master.getMaster().getServerManager().getOnlineServersList();
-      }
-      LOG.info(onlineServersList);
-      assertEquals(NUM_RS + 1, onlineServersList.size());
-      // Steady state. How many regions open?
-      // Wait until killedRS is set
+      } while (onlineServersList.size() < (NUM_RS + 1));
+      // Wait until killedRS is set. Means RegionServer is starting to go down.
       while (killedRS.get() == null) {
-        Threads.sleep(10);
+        Threads.sleep(1);
       }
-      final int regionsOpenCount = master.getMaster().getAssignmentManager().getNumRegionsOpened();
+      // Wait on the RegionServer to fully die.
+      while (cluster.getLiveRegionServers().size() > NUM_RS) {
+        Threads.sleep(1);
+      }
+      // Make sure Master is fully up before progressing. Could take a while if regions
+      // being reassigned.
+      while (!master.getMaster().isInitialized()) {
+        Threads.sleep(1);
+      }
+
+      // Now in steady state. How many regions open? Master should have too many regionservers
+      // showing still. The downed RegionServer should still be showing as registered.
+      assertTrue(master.getMaster().getServerManager().isServerOnline(killedRS.get()));
       // Find non-meta region (namespace?) and assign to the killed server. That'll trigger cleanup.
-      Map<HRegionInfo, ServerName> assigments =
-        master.getMaster().getAssignmentManager().getRegionStates().getRegionAssignments();
+      Map<HRegionInfo, ServerName> assignments = null;
+      do {
+        assignments = master.getMaster().getAssignmentManager().getRegionStates().getRegionAssignments();
+      } while (assignments == null || assignments.size() < 2);
       HRegionInfo hri = null;
-      for (Map.Entry<HRegionInfo, ServerName> e: assigments.entrySet()) {
+      for (Map.Entry<HRegionInfo, ServerName> e: assignments.entrySet()) {
         if (e.getKey().isMetaRegion()) continue;
         hri = e.getKey();
         break;
       }
       // Try moving region to the killed server. It will fail. As by-product, we will
       // remove the RS from Master online list because no corresponding znode.
+      assertEquals(NUM_RS + 1, master.getMaster().getServerManager().getOnlineServersList().size());
       LOG.info("Move " + hri.getEncodedName() + " to " + killedRS.get());
       master.getMaster().move(hri.getEncodedNameAsBytes(),
         Bytes.toBytes(killedRS.get().toString()));
-      while (onlineServersList.size() > NUM_RS) {
+      // Wait until the RS no longer shows as registered in Master.
+      while (onlineServersList.size() > (NUM_RS + 1)) {
         Thread.sleep(100);
         onlineServersList = master.getMaster().getServerManager().getOnlineServersList();
       }
-      // Just for kicks, ensure namespace was put back on the old server after above failed move.
-      assertEquals(regionsOpenCount,
-        master.getMaster().getAssignmentManager().getNumRegionsOpened());
     } finally {
+      // Shutdown is messy with complaints about fs being closed. Why? TODO.
       cluster.shutdown();
       cluster.join();
       TEST_UTIL.shutdownMiniDFSCluster();
@@ -146,19 +153,32 @@ public class TestRSKilledWhenInitializing {
     }
   }
 
+  /**
+   * Start Master. Get as far as the state where Master is waiting on
+   * RegionServers to check in, then return.
+   */
   private MasterThread startMaster(MasterThread master) {
     master.start();
-    long startTime = System.currentTimeMillis();
-    while (!master.getMaster().isInitialized()) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException ignored) {
-        LOG.info("Interrupted: ignoring");
-      }
-      if (System.currentTimeMillis() > startTime + 30000) {
-        throw new RuntimeException("Master not active after 30 seconds");
-      }
+    // It takes a while until ServerManager creation to happen inside Master startup.
+    while (master.getMaster().getServerManager() == null) {
+      continue;
     }
+    // Set a listener for the waiting-on-RegionServers state. We want to wait
+    // until this condition before we leave this method and start regionservers.
+    final AtomicBoolean waiting = new AtomicBoolean(false);
+    if (master.getMaster().getServerManager() == null) throw new NullPointerException("SM");
+    master.getMaster().getServerManager().registerListener(new ServerListener() {
+      @Override
+      public void waiting() {
+        waiting.set(true);
+      }
+    });
+    // Wait until the Master gets to place where it is waiting on RegionServers to check in.
+    while (!waiting.get()) {
+      continue;
+    }
+    // Set the global master-is-active; gets picked up by regionservers later.
+    masterActive.set(true);
     return master;
   }